This is an automated email from the ASF dual-hosted git repository.

adulceanu pushed a commit to branch issues/OAK-9922
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 625fd1c7716d4db05247fc611ca32e770137e3c8
Author: Lucas Weitzendorf <lweitzend...@adobe.com>
AuthorDate: Fri Oct 14 13:35:51 2022 +0200

    OAK-9922 Parallel Compaction
    Address review comments
---
 .../plugins/index/counter/ApproximateCounter.java  |   9 +-
 .../plugins/index/counter/NodeCounterEditor.java   |   1 +
 .../oak/plugins/index/counter/jmx/NodeCounter.java |   2 +-
 .../index/property/PropertyIndexInfoProvider.java  |   2 +-
 .../strategy/ContentMirrorStoreStrategy.java       |   2 +-
 .../strategy/UniqueEntryStoreStrategy.java         |   2 +-
 .../strategy/ContentMirrorStoreStrategyTest.java   |   2 +-
 .../apache/jackrabbit/oak/run/CompactCommand.java  |   4 +-
 .../oak/segment/aws/tool/AwsCompact.java           |   2 +-
 .../oak/segment/azure/tool/AzureCompact.java       |   2 +-
 oak-segment-tar/pom.xml                            |   2 +
 .../oak/segment/CheckpointCompactor.java           |   4 +-
 .../jackrabbit/oak/segment/ClassicCompactor.java   |  11 +-
 .../jackrabbit/oak/segment/CompactorUtils.java     |  35 +++
 .../oak/segment/DefaultSegmentWriterBuilder.java   |   9 +-
 .../jackrabbit/oak/segment/ParallelCompactor.java  |  19 +-
 .../apache/jackrabbit/oak/segment/RecordCache.java |  32 +--
 .../oak/segment/SegmentBufferWriterPool.java       |   4 +
 .../segment/file/AbstractCompactionStrategy.java   |   8 +-
 .../jackrabbit/oak/segment/file/FileStore.java     |  11 +-
 .../oak/segment/file/FileStoreBuilder.java         |  14 +-
 .../oak/segment/file/GCNodeWriteMonitor.java       |  13 +-
 .../jackrabbit/oak/segment/file/PriorityCache.java | 194 +++++++++--------
 .../jackrabbit/oak/segment/memory/MemoryStore.java |  18 +-
 .../jackrabbit/oak/segment/tool/Compact.java       |   2 +-
 .../segment/AbstractCompactorExternalBlobTest.java |   3 +-
 .../oak/segment/AbstractCompactorTest.java         |   9 +-
 .../jackrabbit/oak/segment/NodeRecordTest.java     |   2 +-
 .../segment/ParallelCompactorExternalBlobTest.java |  20 +-
 .../oak/segment/ParallelCompactorTest.java         |  20 +-
 .../oak/segment/RecordCacheStatsTest.java          |   7 +-
 .../jackrabbit/oak/segment/RecordCacheTest.java    |  54 ++++-
 .../segment/file/ConcurrentPriorityCacheTest.java  | 236 +++++++++++++++++++++
 .../oak/segment/file/PriorityCacheTest.java        |  20 +-
 .../oak/plugins/index}/ApproximateCounter.java     |   2 +-
 .../oak/plugins/index}/ApproximateCounterTest.java |   9 +-
 36 files changed, 581 insertions(+), 205 deletions(-)

diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
index bfad0a2561..a62070a6c8 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
@@ -18,17 +18,18 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.counter;
 
-import java.util.Random;
-import java.util.UUID;
-
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
+import java.util.Random;
+import java.util.UUID;
+
 /**
- * An approximate counter algorithm.
+ * Moved to oak-store-spi
  */
+@Deprecated
 public class ApproximateCounter {
     
     public static final String COUNT_PROPERTY_PREFIX = ":count_";
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
index cde5acec08..6df3524341 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
@@ -22,6 +22,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
 import org.apache.jackrabbit.oak.plugins.index.property.Multiplexers;
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
index fa11e76f27..5815abb350 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
@@ -35,7 +35,7 @@ import 
org.apache.jackrabbit.oak.plugins.index.property.Multiplexers;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 
 /**
  * A mechanism to retrieve node counter data.
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
index 8cde72574a..ff0fe00b56 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexInfo;
 import org.apache.jackrabbit.oak.plugins.index.IndexInfoProvider;
-import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
index 3f2346d93e..aec9ec35fd 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
@@ -29,7 +29,7 @@ import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
-import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditor;
 import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry;
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
index 0ff377d9b4..22909b3849 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
@@ -32,7 +32,7 @@ import org.apache.jackrabbit.oak.spi.query.Filter;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
diff --git 
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
index d9f8ba5ae3..4d2f779935 100644
--- 
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
+++ 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
@@ -26,7 +26,7 @@ import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.KEY_COUNT_P
 import static 
org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditor.COUNT_PROPERTY_NAME;
 import static 
org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditor.DEFAULT_RESOLUTION;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter.COUNT_PROPERTY_PREFIX;
+import static 
org.apache.jackrabbit.oak.plugins.index.ApproximateCounter.COUNT_PROPERTY_PREFIX;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
index bb7f601b7d..bdecf2928a 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
@@ -61,10 +61,10 @@ class CompactCommand implements Command {
                         "the repository into smaller parts and compacts them 
concurrently. If not specified, \"parallel\" compactor is used.")
                 .withRequiredArg().ofType(String.class);
         OptionSpec<Integer> nThreads = parser.accepts("threads", "Specify the 
number of threads used" +
-                "for compaction. This is only applicable to the \"parallel\" 
compactor. Defaults to the number of available processors.")
+                "for compaction. This is only applicable to the \"parallel\" 
compactor. Defaults to 1.")
                 .withRequiredArg()
                 .ofType(Integer.class)
-                .defaultsTo(-1);
+                .defaultsTo(1);
         OptionSpec<String> targetPath = parser.accepts("target-path", 
"Path/URI to TAR/remote segment store where " +
                 "resulting archives will be written")
                 .withRequiredArg()
diff --git 
a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
 
b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
index 2e6e0cb051..1c77b9bab6 100644
--- 
a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
+++ 
b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
@@ -75,7 +75,7 @@ public class AwsCompact {
 
         private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
 
-        private int concurrency = -1;
+        private int concurrency = 1;
 
         private Builder() {
             // Prevent external instantiation.
diff --git 
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
index 087cd355d9..a8e103c3c8 100644
--- 
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
+++ 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
@@ -81,7 +81,7 @@ public class AzureCompact {
 
         private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
 
-        private int concurrency = -1;
+        private int concurrency = 1;
 
         private String persistentCachePath;
 
diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml
index 39773eb1c1..006bc57b4c 100644
--- a/oak-segment-tar/pom.xml
+++ b/oak-segment-tar/pom.xml
@@ -415,6 +415,8 @@
             <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-core</artifactId>
             <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
index c84aa24eaa..e22d06e962 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
@@ -19,7 +19,6 @@
 package org.apache.jackrabbit.oak.segment;
 
 import static java.util.Objects.requireNonNull;
-
 import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
 import static org.apache.jackrabbit.guava.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.guava.common.collect.Maps.newLinkedHashMap;
@@ -27,7 +26,7 @@ import static 
org.apache.jackrabbit.oak.commons.PathUtils.elements;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.segment.ClassicCompactor.getStableIdBytes;
+import static 
org.apache.jackrabbit.oak.segment.CompactorUtils.getStableIdBytes;
 
 import java.io.IOException;
 import java.util.Date;
@@ -35,6 +34,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
index bb599fccfb..61a2fb67c3 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
@@ -123,15 +123,6 @@ public class ClassicCompactor implements Compactor {
         return new CompactDiff(onto, canceller).diff(before, after);
     }
 
-    @Nullable
-    protected static Buffer getStableIdBytes(@NotNull NodeState state) {
-        if (state instanceof SegmentNodeState) {
-            return ((SegmentNodeState) state).getStableIdBytes();
-        } else {
-            return null;
-        }
-    }
-
     protected SegmentNodeState writeNodeState(NodeState nodeState, Buffer 
stableIdBytes) throws IOException {
         RecordId nodeId = writer.writeNode(nodeState, stableIdBytes);
         compactionMonitor.onNode();
@@ -175,7 +166,7 @@ public class ClassicCompactor implements Compactor {
             } else if (success) {
                 NodeState nodeState = builder.getNodeState();
                 checkState(modCount == 0 || !(nodeState instanceof 
SegmentNodeState));
-                return writeNodeState(nodeState, getStableIdBytes(after));
+                return writeNodeState(nodeState, 
CompactorUtils.getStableIdBytes(after));
             } else {
                 return null;
             }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CompactorUtils.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CompactorUtils.java
new file mode 100644
index 0000000000..ef0bae81c7
--- /dev/null
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CompactorUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+final class CompactorUtils {
+    @Nullable
+    static Buffer getStableIdBytes(@NotNull NodeState state) {
+        if (state instanceof SegmentNodeState) {
+            return ((SegmentNodeState) state).getStableIdBytes();
+        } else {
+            return null;
+        }
+    }
+}
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
index 4d70c23c2c..5097d40680 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
@@ -34,8 +34,8 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Builder for building {@link DefaultSegmentWriter} instances.
- * The returned instances are thread safe if {@link #withWriterPool(PoolType)}
- * was specified and <em>not</em> thread sage if {@link #withoutWriterPool()}
+ * The returned instances are thread-safe if {@link #withWriterPool(PoolType)}
+ * was specified and <em>not</em> thread-safe if {@link #withoutWriterPool()}
  * was specified (default).
  * <p>
  * <em>Default:</em> calling one of the {@code build()} methods without 
previously
@@ -103,6 +103,11 @@ public final class DefaultSegmentWriterBuilder {
         return this;
     }
 
+    @NotNull
+    public DefaultSegmentWriterBuilder withWriterPool() {
+        return withWriterPool(PoolType.GLOBAL);
+    }
+
     /**
      * Create a {@code SegmentWriter} backed by a {@link 
SegmentBufferWriterPool}.
      * The returned instance is thread safe.
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
index 613266706e..5eb050b1c2 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
@@ -19,7 +19,7 @@
 package org.apache.jackrabbit.oak.segment;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
 import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
@@ -35,12 +35,16 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.segment.ClassicCompactor.getStableIdBytes;
+import static 
org.apache.jackrabbit.oak.segment.CompactorUtils.getStableIdBytes;
 
 /**
  * This compactor implementation leverages the tree structure of the 
repository for concurrent compaction.
@@ -116,25 +120,18 @@ public class ParallelCompactor extends 
CheckpointCompactor {
     private class CompactionTree implements NodeStateDiff {
         @NotNull
         private final NodeState before;
-
         @NotNull
         private final NodeState after;
-
         @NotNull
         private final NodeState onto;
-
         @NotNull
         private final HashMap<String, CompactionTree> modifiedChildren = new 
HashMap<>();
-
         @NotNull
         private final List<Property> modifiedProperties = new ArrayList<>();
-
         @NotNull
         private final List<String> removedChildNames = new ArrayList<>();
-
         @NotNull
         private final List<String> removedPropertyNames = new ArrayList<>();
-
         /**
          * Stores result of asynchronous compaction.
          */
@@ -364,7 +361,7 @@ public class ParallelCompactor extends CheckpointCompactor {
     ) throws IOException {
         if (numWorkers <= 0) {
             gcListener.info("using sequential compaction.");
-            return compactor.compact(before, after, onto, canceller);
+            return super.compactWithDelegate(before, after, onto, canceller);
         } else if (executorService == null || executorService.isShutdown()) {
             executorService = Executors.newFixedThreadPool(numWorkers);
         }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
index f6effba94f..cdf5fe55f4 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
@@ -19,9 +19,10 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -87,7 +88,7 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
         if (size <= 0) {
             return Empty.emptyFactory();
         } else {
-            return Default.defaultFactory(size, checkNotNull(weigher));
+            return Default.defaultFactory(size, requireNonNull(weigher));
         }
     }
 
@@ -107,7 +108,8 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
     }
 
     private static class Empty<T> extends RecordCache<T> {
-        private final AtomicLong missCount = new AtomicLong();
+        @NotNull
+        private final LongAdder missCount = new LongAdder();
 
         static final <T> Supplier<RecordCache<T>> emptyFactory() {
             return Empty::new;
@@ -115,7 +117,7 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
 
         @Override
         public @NotNull CacheStats getStats() {
-            return new CacheStats(0, missCount.get(), 0, 0, 0, 0);
+            return new CacheStats(0, missCount.sum(), 0, 0, 0, 0);
         }
 
         @Override
@@ -123,7 +125,7 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
 
         @Override
         public RecordId get(@NotNull T key) {
-            missCount.incrementAndGet();
+            missCount.increment();
             return null;
         }
 
@@ -144,34 +146,32 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
 
         @NotNull
         private final Weigher<K, RecordId> weigher;
-
         @NotNull
-        private final AtomicLong weight = new AtomicLong();
-
+        private final LongAdder weight = new LongAdder();
         @NotNull
-        private final AtomicLong loadCount = new AtomicLong();
+        private final LongAdder loadCount = new LongAdder();
 
         @Override
         public @NotNull CacheStats getStats() {
             CacheStats internalStats = cache.stats();
             // any addition to the cache counts as load by our definition
             return new CacheStats(internalStats.hitCount(), 
internalStats.missCount(),
-                    loadCount.get(), 0, 0,  internalStats.evictionCount());
+                    loadCount.sum(), 0, 0,  internalStats.evictionCount());
         }
 
         static <K> Supplier<RecordCache<K>> defaultFactory(final int size, 
@NotNull final Weigher<K, RecordId> weigher) {
-            return () -> new Default<>(size, checkNotNull(weigher));
+            return () -> new Default<>(size, requireNonNull(weigher));
         }
 
         Default(final int size, @NotNull final Weigher<K, RecordId> weigher) {
             this.cache = CacheBuilder.newBuilder()
-                    .maximumSize(size)
+                    .maximumSize(size * 4L / 3)
                     .initialCapacity(size)
                     .concurrencyLevel(4)
                     .recordStats()
                     .removalListener((RemovalListener<K, RecordId>) removal -> 
{
                         int removedWeight = weigher.weigh(removal.getKey(), 
removal.getValue());
-                        weight.addAndGet(-removedWeight);
+                        weight.add(-removedWeight);
                     })
                     .build();
             this.weigher = weigher;
@@ -180,8 +180,8 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
         @Override
         public void put(@NotNull K key, @NotNull RecordId value) {
             cache.put(key, value);
-            loadCount.incrementAndGet();
-            weight.addAndGet(weigher.weigh(key, value));
+            loadCount.increment();
+            weight.add(weigher.weigh(key, value));
         }
 
         @Override
@@ -196,7 +196,7 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
 
         @Override
         public long estimateCurrentWeight() {
-            return weight.get();
+            return weight.sum();
         }
     }
 }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
index 1ee0361744..984f66ae02 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
@@ -170,6 +170,7 @@ public abstract class SegmentBufferWriterPool implements 
WriteOperationHandler {
             }
         }
 
+        @NotNull
         private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull 
GCGeneration gcGeneration) {
             SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, 
gcGeneration);
             SegmentBufferWriter writer = writers.get(key);
@@ -301,6 +302,7 @@ public abstract class SegmentBufferWriterPool implements 
WriteOperationHandler {
          * a fresh writer at any time. Callers need to return a writer before
          * borrowing it again. Failing to do so leads to undefined behaviour.
          */
+        @NotNull
         private SegmentBufferWriter borrowWriter(@NotNull Object key, @NotNull 
GCGeneration gcGeneration) {
             poolMonitor.enter();
             try {
@@ -340,10 +342,12 @@ public abstract class SegmentBufferWriterPool implements 
WriteOperationHandler {
         return gcGeneration.get();
     }
 
+    @NotNull
     protected SegmentBufferWriter newWriter(@NotNull GCGeneration 
gcGeneration) {
         return new SegmentBufferWriter(idProvider, reader, getWriterId(), 
gcGeneration);
     }
 
+    @NotNull
     protected String getWriterId() {
         if (++writerId > 9999) {
             writerId = 0;
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
index f5f6199b85..cdde3c7918 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
@@ -29,7 +29,13 @@ import static 
org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
 
 import org.apache.jackrabbit.guava.common.base.Function;
 
-import org.apache.jackrabbit.oak.segment.*;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.Compactor;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.ClassicCompactor;
+import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
+import org.apache.jackrabbit.oak.segment.ParallelCompactor;
 import 
org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
 import org.apache.jackrabbit.oak.segment.file.cancel.Cancellation;
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
index d7c1a9f430..18375e28e7 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
@@ -39,7 +39,14 @@ import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.guava.common.io.Closer;
 import 
org.apache.jackrabbit.guava.common.util.concurrent.UncheckedExecutionException;
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.*;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
@@ -139,7 +146,7 @@ public class FileStore extends AbstractFileStore {
 
         this.segmentWriter = defaultSegmentWriterBuilder("sys")
                 .withGeneration(() -> getGcGeneration().nonGC())
-                .withWriterPool(SegmentBufferWriterPool.PoolType.GLOBAL)
+                .withWriterPool()
                 .with(builder.getCacheManager()
                         .withAccessTracking("WRITE", statsProvider))
                 .build(this);
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
index 9889a808fd..9cefe70b94 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
@@ -625,21 +625,11 @@ public class FileStoreBuilder {
         }
 
         void evictOldGeneration(final int newGeneration) {
-            evictCaches(new Predicate<Integer>() {
-                @Override
-                public boolean apply(Integer generation) {
-                    return generation < newGeneration;
-                }
-            });
+            evictCaches(generation -> generation < newGeneration);
         }
 
         void evictGeneration(final int newGeneration) {
-            evictCaches(new Predicate<Integer>() {
-                @Override
-                public boolean apply(Integer generation) {
-                    return generation == newGeneration;
-                }
-            });
+            evictCaches(generation -> generation == newGeneration);
         }
     }
 }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
index 565ff3a49e..4962880ac4 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
@@ -24,6 +24,7 @@ import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * Monitors the compaction cycle and keeps a compacted nodes counter, in order
@@ -59,12 +60,12 @@ public class GCNodeWriteMonitor {
     /**
      * Number of compacted properties
      */
-    private final AtomicLong properties = new AtomicLong();
+    private final LongAdder properties = new LongAdder();
 
     /**
      * Number of compacted binaries
      */
-    private final AtomicLong binaries = new AtomicLong();
+    private final LongAdder binaries = new LongAdder();
 
     private volatile boolean running = false;
 
@@ -93,8 +94,8 @@ public class GCNodeWriteMonitor {
             gcMonitor.info("unable to estimate number of nodes for compaction, 
missing gc history.");
         }
         nodes.set(0);
-        properties.set(0);
-        binaries.set(0);
+        properties.reset();
+        binaries.reset();
         start = System.currentTimeMillis();
         running = true;
     }
@@ -108,11 +109,11 @@ public class GCNodeWriteMonitor {
     }
 
     public void onProperty() {
-        properties.incrementAndGet();
+        properties.increment();
     }
 
     public void onBinary() {
-        binaries.incrementAndGet();
+        binaries.increment();
     }
 
     public void finished() {
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
index 35b2de936b..ce4de28e6b 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
@@ -29,6 +29,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.jackrabbit.oak.segment.CacheWeights;
@@ -48,7 +49,7 @@ import org.apache.jackrabbit.guava.common.cache.Weigher;
  * this cache is successfully looked up its cost is incremented by one, unless 
it has reached
  * its maximum cost of {@link Byte#MAX_VALUE} already.
  * <p>
- * Additionally this cache tracks a generation for mappings. Mappings of later 
generations
+ * Additionally, this cache tracks a generation for mappings. Mappings of 
later generations
  * always take precedence over mappings of earlier generations. That is, 
putting a mapping of
  * a later generation into the cache can cause any mapping of an earlier 
generation to be evicted
  * regardless of its cost.
@@ -66,12 +67,12 @@ public class PriorityCache<K, V> {
     private final AtomicInteger[] costs;
     private final AtomicInteger[] evictions;
 
-    private final AtomicLong hitCount = new AtomicLong();
-    private final AtomicLong missCount = new AtomicLong();
-    private final AtomicLong loadCount = new AtomicLong();
-    private final AtomicLong loadExceptionCount = new AtomicLong();
-    private final AtomicLong evictionCount = new AtomicLong();
-    private final AtomicLong size = new AtomicLong();
+    private final LongAdder hitCount = new LongAdder();
+    private final LongAdder missCount = new LongAdder();
+    private final LongAdder loadCount = new LongAdder();
+    private final LongAdder loadExceptionCount = new LongAdder();
+    private final LongAdder evictionCount = new LongAdder();
+    private final LongAdder size = new LongAdder();
 
     private static class Segment extends ReentrantLock {}
 
@@ -143,7 +144,7 @@ public class PriorityCache<K, V> {
      *                  smaller than {@code 32 - numberOfTrailingZeros(size)}.
      */
     PriorityCache(int size, int rehash) {
-        this(size, rehash, CacheWeights.<K, V> noopWeigher());
+        this(size, rehash, CacheWeights.noopWeigher());
     }
 
     /**
@@ -165,7 +166,9 @@ public class PriorityCache<K, V> {
      * @param rehash      Number of rehashes. Must be greater or equal to 
{@code 0} and
      *                    smaller than {@code 32 - 
numberOfTrailingZeros(size)}.
      * @param weigher     Needed to provide an estimation of the cache weight 
in memory
-     * @param numSegments Number of separately locked segments
+     * @param numSegments Number of separately locked segments. The 
implementation assumes an equal
+     *                    number of entries in each segment, requiring 
numSegments to divide size.
+     *                    Powers of 2 are a safe choice, see @param size.
      */
     public PriorityCache(int size, int rehash, @NotNull Weigher<K, V> weigher, 
int numSegments) {
         checkArgument(bitCount(size) == 1);
@@ -177,7 +180,9 @@ public class PriorityCache<K, V> {
         this.weigher = requireNonNull(weigher);
 
         numSegments = Math.min(numSegments, size);
-        checkArgument((size % numSegments) == 0);
+        checkArgument((size % numSegments) == 0,
+                "Cache size is not a multiple of its segment count.");
+
         segments = new Segment[numSegments];
         for (int s = 0; s < numSegments; s++) {
             segments[s] = new Segment();
@@ -217,7 +222,7 @@ public class PriorityCache<K, V> {
      * @return  the number of mappings in this cache.
      */
     public long size() {
-        return size.get();
+        return size.sum();
     }
 
     /**
@@ -231,74 +236,80 @@ public class PriorityCache<K, V> {
     public boolean put(@NotNull K key, @NotNull V value, int generation, byte 
initialCost) {
         int hashCode = key.hashCode();
         byte cheapest = initialCost;
-        int index;
-        boolean eviction;
+        int index = -1;
+        boolean eviction = false;
 
         Segment lockedSegment = null;
 
-        for (int k = 0; k <= rehash; k++) {
-            int i = project(hashCode, k);
-            Segment segment = getSegment(i);
-            if (segment != lockedSegment) {
-                if (lockedSegment != null) {
-                    lockedSegment.unlock();
+        try {
+            for (int k = 0; k <= rehash; k++) {
+                int i = project(hashCode, k);
+                Segment segment = getSegment(i);
+                if (segment != lockedSegment) {
+                    if (lockedSegment != null) {
+                        lockedSegment.unlock();
+                    }
+                    lockedSegment = segment;
+                    lockedSegment.lock();
                 }
-                lockedSegment = segment;
-                lockedSegment.lock();
-            }
 
-            Entry<?, ?> entry = entries[i];
-            if (entry == Entry.NULL) {
-                // Empty slot -> use this index
-                index = i;
-                eviction = false;
-            } else if (entry.generation <= generation && 
key.equals(entry.key)) {
-                // Key exists and generation is greater or equal -> use this 
index and boost the cost
-                index = i;
-                initialCost = entry.cost;
-                if (initialCost < Byte.MAX_VALUE) {
-                    initialCost++;
-                }
-                eviction = false;
-            } else if (entry.generation < generation) {
-                // Old generation -> use this index
-                index = i;
-                eviction = false;
-            } else if (entry.cost < cheapest) {
-                // Candidate slot, keep on searching for even cheaper slots
-                cheapest = entry.cost;
-                index = i;
-                eviction = true;
-                if (k < rehash) {
-                    continue;
+                Entry<?, ?> entry = entries[i];
+                if (entry == Entry.NULL) {
+                    // Empty slot -> use this index
+                    index = i;
+                    eviction = false;
+                    break;
+                } else if (entry.generation <= generation && 
key.equals(entry.key)) {
+                    // Key exists and generation is greater or equal -> use 
this index and boost the cost
+                    index = i;
+                    initialCost = entry.cost;
+                    if (initialCost < Byte.MAX_VALUE) {
+                        initialCost++;
+                    }
+                    eviction = false;
+                    break;
+                } else if (entry.generation < generation) {
+                    // Old generation -> use this index
+                    index = i;
+                    eviction = false;
+                    break;
+                } else if (entry.cost < cheapest) {
+                    // Candidate slot, keep on searching for even cheaper slots
+                    cheapest = entry.cost;
+                    index = i;
+                    eviction = true;
                 }
-            } else {
-                continue;
             }
 
-            Entry<?, ?> old = entries[index];
-            Entry<?, ?> newE = new Entry<>(key, value, generation, 
initialCost);
-            entries[index] = newE;
-            loadCount.incrementAndGet();
-            costs[initialCost - Byte.MIN_VALUE].incrementAndGet();
-            if (old != Entry.NULL) {
-                costs[old.cost - Byte.MIN_VALUE].decrementAndGet();
-                if (eviction) {
-                    evictions[old.cost - Byte.MIN_VALUE].incrementAndGet();
-                    evictionCount.incrementAndGet();
+            if (index >= 0) {
+                Entry<?, ?> oldEntry = entries[index];
+                Entry<?, ?> newEntry = new Entry<>(key, value, generation, 
initialCost);
+                entries[index] = newEntry;
+                loadCount.increment();
+                costs[initialCost - Byte.MIN_VALUE].incrementAndGet();
+
+                if (oldEntry != Entry.NULL) {
+                    costs[oldEntry.cost - Byte.MIN_VALUE].decrementAndGet();
+                    if (eviction) {
+                        evictions[oldEntry.cost - 
Byte.MIN_VALUE].incrementAndGet();
+                        evictionCount.increment();
+                    }
+                    weight.addAndGet(-weighEntry(oldEntry));
+                } else {
+                    size.increment();
                 }
-                weight.addAndGet(-weighEntry(old)) ;
-            } else {
-                size.incrementAndGet();
+
+                weight.addAndGet(weighEntry(newEntry));
+                return true;
             }
-            weight.addAndGet(weighEntry(newE));
-            lockedSegment.unlock();
-            return true;
-        }
 
-        requireNonNull(lockedSegment).unlock();
-        loadExceptionCount.incrementAndGet();
-        return false;
+            loadExceptionCount.increment();
+            return false;
+        } finally {
+            if (lockedSegment != null) {
+                lockedSegment.unlock();
+            }
+        }
     }
 
     /**
@@ -316,21 +327,23 @@ public class PriorityCache<K, V> {
             int i = project(hashCode, k);
             Segment segment = getSegment(i);
             segment.lock();
-            Entry<?, ?> entry = entries[i];
-            if (generation == entry.generation && key.equals(entry.key)) {
-                if (entry.cost < Byte.MAX_VALUE) {
-                    costs[entry.cost - Byte.MIN_VALUE].decrementAndGet();
-                    entry.cost++;
-                    costs[entry.cost - Byte.MIN_VALUE].incrementAndGet();
+
+            try {
+                Entry<?, ?> entry = entries[i];
+                if (generation == entry.generation && key.equals(entry.key)) {
+                    if (entry.cost < Byte.MAX_VALUE) {
+                        costs[entry.cost - Byte.MIN_VALUE].decrementAndGet();
+                        entry.cost++;
+                        costs[entry.cost - Byte.MIN_VALUE].incrementAndGet();
+                    }
+                    hitCount.increment();
+                    return (V) entry.value;
                 }
-                hitCount.incrementAndGet();
-                V value = (V) entry.value;
+            } finally {
                 segment.unlock();
-                return value;
             }
-            segment.unlock();
         }
-        missCount.incrementAndGet();
+        missCount.increment();
         return null;
     }
 
@@ -344,16 +357,19 @@ public class PriorityCache<K, V> {
         int entriesPerSegment = entries.length / numSegments;
         for (int s = 0; s < numSegments; s++) {
             segments[s].lock();
-            for (int i = 0; i < entriesPerSegment; i++) {
-                int j = i + s * entriesPerSegment;
-                Entry<?, ?> entry = entries[j];
-                if (entry != Entry.NULL && purge.apply(entry.generation)) {
-                    entries[j] = Entry.NULL;
-                    size.decrementAndGet();
-                    weight.addAndGet(-weighEntry(entry));
+            try {
+                for (int i = 0; i < entriesPerSegment; i++) {
+                    int j = i + s * entriesPerSegment;
+                    Entry<?, ?> entry = entries[j];
+                    if (entry != Entry.NULL && purge.apply(entry.generation)) {
+                        entries[j] = Entry.NULL;
+                        size.decrement();
+                        weight.addAndGet(-weighEntry(entry));
+                    }
                 }
+            } finally {
+                segments[s].unlock();
             }
-            segments[s].unlock();
         }
     }
 
@@ -386,8 +402,8 @@ public class PriorityCache<K, V> {
      */
     @NotNull
     public CacheStats getStats() {
-        return new CacheStats(hitCount.get(), missCount.get(), loadCount.get(),
-                loadExceptionCount.get(), 0, evictionCount.get());
+        return new CacheStats(hitCount.sum(), missCount.sum(), loadCount.sum(),
+                loadExceptionCount.sum(), 0, evictionCount.sum());
     }
 
     public long estimateCurrentWeight() {
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
index 916f4cab08..ddda4676a0 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
@@ -27,9 +27,19 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.jackrabbit.guava.common.collect.Maps;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.*;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.SegmentReader;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.SegmentTracker;
+import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
+import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
+import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.stats.NoopStats;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -62,9 +72,7 @@ public class MemoryStore implements SegmentStore {
         });
         this.revisions = new MemoryStoreRevisions();
         this.segmentReader = new CachingSegmentReader(this::getWriter, null, 
16, 2, NoopStats.INSTANCE);
-        this.segmentWriter = defaultSegmentWriterBuilder("sys")
-                .withWriterPool(SegmentBufferWriterPool.PoolType.GLOBAL)
-                .build(this);
+        this.segmentWriter = 
defaultSegmentWriterBuilder("sys").withWriterPool().build(this);
         revisions.bind(this);
         segmentWriter.flush();
     }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
index bda82108ed..a72360ddb0 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
@@ -80,7 +80,7 @@ public class Compact {
 
         private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
 
-        private int concurrency = -1;
+        private int concurrency = 1;
 
         private Builder() {
             // Prevent external instantiation.
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
index 9897b947ff..8ae3f8453d 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
@@ -28,7 +28,8 @@ import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.createBlob;
 import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.getCheckpoint;
 import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static 
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
index 8ce843c0da..77589480ef 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
@@ -27,10 +27,7 @@ import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.checkGenerati
 import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.getCheckpoint;
 import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static 
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -83,7 +80,7 @@ public abstract class AbstractCompactorTest {
         SegmentNodeState uncompacted1 = fileStore.getHead();
         SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, 
uncompacted1, EMPTY_NODE, Canceller.newCanceller());
         assertNotNull(compacted1);
-        assertFalse(uncompacted1 == compacted1);
+        assertNotSame(uncompacted1, compacted1);
         checkGeneration(compacted1, compactedGeneration);
 
         assertSameStableId(uncompacted1, compacted1);
@@ -100,7 +97,7 @@ public abstract class AbstractCompactorTest {
         SegmentNodeState uncompacted2 = fileStore.getHead();
         SegmentNodeState compacted2 = compactor.compact(uncompacted1, 
uncompacted2, compacted1, Canceller.newCanceller());
         assertNotNull(compacted2);
-        assertFalse(uncompacted2 == compacted2);
+        assertNotSame(uncompacted2, compacted2);
         checkGeneration(compacted2, compactedGeneration);
 
         
assertTrue(fileStore.getRevisions().setHead(uncompacted2.getRecordId(), 
compacted2.getRecordId()));
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
index 5ea1f24d79..ae6856a857 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
@@ -105,7 +105,7 @@ public class NodeRecordTest {
 
             SegmentWriter writer = defaultSegmentWriterBuilder("test")
                     .withGeneration(generation)
-                    .withWriterPool(SegmentBufferWriterPool.PoolType.GLOBAL)
+                    .withWriterPool()
                     .with(nodesOnlyCache())
                     .build(store);
 
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
index 661a037876..ae8b817525 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
@@ -23,10 +23,28 @@ import 
org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.jetbrains.annotations.NotNull;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
 
 import static 
org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
 
+@RunWith(Parameterized.class)
 public class ParallelCompactorExternalBlobTest extends 
AbstractCompactorExternalBlobTest {
+
+    private final int concurrency;
+
+    @Parameterized.Parameters
+    public static List<Integer> concurrencyLevels() {
+        return Arrays.asList(1, 2, 4, 8, 16);
+    }
+
+    public ParallelCompactorExternalBlobTest(int concurrency) {
+        this.concurrency = concurrency;
+    }
+
     @Override
     protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, 
@NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c")
@@ -40,6 +58,6 @@ public class ParallelCompactorExternalBlobTest extends 
AbstractCompactorExternal
                 writer,
                 fileStore.getBlobStore(),
                 GCNodeWriteMonitor.EMPTY,
-                4);
+                concurrency);
     }
 }
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
index 627a3acf4a..27bebb1441 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
@@ -23,10 +23,28 @@ import 
org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.jetbrains.annotations.NotNull;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
 
 import static 
org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
 
+@RunWith(Parameterized.class)
 public class ParallelCompactorTest extends AbstractCompactorTest {
+
+    private final int concurrency;
+
+    @Parameterized.Parameters
+    public static List<Integer> concurrencyLevels() {
+        return Arrays.asList(1, 2, 4, 8, 16);
+    }
+
+    public ParallelCompactorTest(int concurrency) {
+        this.concurrency = concurrency;
+    }
+
     @Override
     protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, 
@NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c")
@@ -40,6 +58,6 @@ public class ParallelCompactorTest extends 
AbstractCompactorTest {
                 writer,
                 fileStore.getBlobStore(),
                 GCNodeWriteMonitor.EMPTY,
-                4);
+                concurrency);
     }
 }
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
index e84dd1582f..1d7a9c322e 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
@@ -25,12 +25,11 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.jackrabbit.guava.common.cache.CacheStats;
+import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.jackrabbit.guava.common.base.Supplier;
-
 public class RecordCacheStatsTest {
     private static final String NAME = "cache stats";
     private static final int KEYS = 100;
@@ -38,7 +37,7 @@ public class RecordCacheStatsTest {
     private final Random rnd = new Random();
     private final MemoryStore store = new MemoryStore();
 
-    private final RecordCache<Integer> cache = newRecordCache(KEYS * 4 / 3);
+    private final RecordCache<Integer> cache = newRecordCache(KEYS);
     private final RecordCacheStats cacheStats =
             new RecordCacheStats(NAME, cache::getStats, cache::size, 
cache::estimateCurrentWeight);
 
@@ -56,7 +55,7 @@ public class RecordCacheStatsTest {
             cache.put(k, newRecordId());
         }
 
-        for (int k = 0; k < 100; k++) {
+        for (int k = 0; k < KEYS; k++) {
             if (cache.get(4 * k) != null) {
                 hits++;
             }
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
index e697f70836..fcc9c31966 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
@@ -26,9 +26,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.junit.Test;
@@ -59,11 +67,53 @@ public class RecordCacheTest {
         assertEquals(value, cache.get("key"));
     }
 
+    @Test
+    public void concurrentPutAndGet() throws ExecutionException, 
InterruptedException {
+        final int SIZE = 16384;
+
+        RecordCache<String> cache = newRecordCache(SIZE);
+        HashMap<String, RecordId> values = new HashMap<>(SIZE);
+        List<Integer> indices = new ArrayList<>(SIZE);
+
+        for (int k = 0; k < SIZE; k ++) {
+            String key = "key-" + k;
+            RecordId value = newRecordId(idProvider, rnd);
+            values.put(key, value);
+            indices.add(k);
+        }
+
+        Collections.shuffle(indices);
+        ExecutorService executor = Executors.newFixedThreadPool(16);
+        List<Future<String>> putFutures = new ArrayList<>(SIZE);
+        List<Future<Void>> getFutures = new ArrayList<>(SIZE);
+
+        for (int k = 0; k < SIZE; k ++) {
+            int idx = k;
+            putFutures.add(executor.submit(() -> {
+                String key = "key-" + idx;
+                cache.put(key, values.get(key));
+                return key;
+            }));
+        }
+
+        for (Future<String> future : putFutures) {
+            getFutures.add(executor.submit(() -> {
+                String key = future.get();
+                assertEquals(values.get(key), cache.get(key));
+                return null;
+            }));
+        }
+
+        for (Future<Void> future : getFutures) {
+            future.get();
+        }
+    }
+
     @Test
     public void invalidate() {
-        RecordCache<String> cache = newRecordCache(10);
+        RecordCache<String> cache = newRecordCache(100);
         Map<String, RecordId> keys = newLinkedHashMap();
-        for (int k = 0; k < 10; k ++) {
+        for (int k = 0; k < 100; k ++) {
             String key = "key-" + k;
             RecordId value = newRecordId(idProvider, rnd);
             keys.put(key, value);
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ConcurrentPriorityCacheTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ConcurrentPriorityCacheTest.java
new file mode 100644
index 0000000000..cab79e0edf
--- /dev/null
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ConcurrentPriorityCacheTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import org.apache.jackrabbit.oak.segment.CacheWeights;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static java.lang.Integer.valueOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeTrue;
+
+@RunWith(Parameterized.class)
+public class ConcurrentPriorityCacheTest {
+
+    private final int concurrency;
+
+    @Parameterized.Parameters
+    public static List<Integer> concurrencyLevels() {
+        return Arrays.asList(1, 2, 4, 8, 16, 32);
+    }
+
+    public ConcurrentPriorityCacheTest(int concurrency) {
+        this.concurrency = concurrency;
+    }
+
+    @Test
+    public void concurrentReadWrite() throws ExecutionException, 
InterruptedException {
+        final int SIZE = 16384;
+        PriorityCache<String, Integer> cache = new PriorityCache<>(SIZE, 10);
+
+        ExecutorService executor1 = Executors.newFixedThreadPool(concurrency);
+        ExecutorService executor2 = Executors.newFixedThreadPool(concurrency);
+
+        List<Future<Boolean>> putFutures = new ArrayList<>(SIZE);
+        List<Future<Void>> getFutures = new ArrayList<>(SIZE);
+
+        for (int k = 0; k < SIZE; k++) {
+            int idx = k;
+            putFutures.add(executor1.submit(
+                    () -> cache.put("key-" + idx, idx, 0, (byte) 0)));
+        }
+
+        for (int k = 0; k < SIZE; k++) {
+            int idx = k;
+            getFutures.add(executor2.submit(() -> {
+                if (putFutures.get(idx).get()) {
+                    assertEquals(valueOf(idx), cache.get("key-" + idx, 0));
+                    assertNull(cache.get("key-" + idx, 1));
+                } else {
+                    assertNull(cache.get("key-" + idx, 0));
+                }
+                return null;
+            }));
+        }
+
+        for (Future<Void> future : getFutures) {
+            future.get();
+        }
+    }
+
+    @Test
+    public void concurrentUpdateKey() throws ExecutionException, 
InterruptedException {
+        PriorityCache<String, Byte> cache = new PriorityCache<>(1, 5);
+
+        List<Byte> costs = new ArrayList<>(Byte.MAX_VALUE + 1);
+        for (int k = 0; k <= Byte.MAX_VALUE; k++) {
+            costs.add((byte) k);
+        }
+        Collections.shuffle(costs);
+
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<Future<Boolean>> futures = new ArrayList<>(Byte.MAX_VALUE + 1);
+
+        for (int k = 0; k <= Byte.MAX_VALUE; k++) {
+            byte cost = costs.get(k);
+            futures.add(executor.submit(() -> cache.put("key-" + cost, cost, 
0, cost)));
+        }
+
+        for (Future<Boolean> future : futures) {
+            future.get();
+        }
+
+        assertEquals(Byte.valueOf(Byte.MAX_VALUE), cache.get("key-" + 
Byte.MAX_VALUE, 0));
+    }
+
+    @Test
+    public void concurrentUpdateWithNewGeneration() throws ExecutionException, 
InterruptedException {
+        final int NUM_GENERATIONS = 256;
+        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 5);
+
+        List<Integer> generations = new ArrayList<>(NUM_GENERATIONS);
+        for (int k = 0; k < NUM_GENERATIONS; k++) {
+            generations.add(k);
+        }
+        Collections.shuffle(generations);
+
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<Future<Boolean>> futures = new ArrayList<>(NUM_GENERATIONS);
+
+        for (int k = 0; k < NUM_GENERATIONS; k++) {
+            int gen = generations.get(k);
+            futures.add(executor.submit(() -> cache.put("key", gen, gen, 
(byte) 0)));
+        }
+
+        for (Future<Boolean> future : futures) {
+            future.get();
+        }
+
+        assertEquals(Integer.valueOf(NUM_GENERATIONS - 1), cache.get("key", 
NUM_GENERATIONS-1));
+    }
+
+    @Test
+    public void concurrentGenerationPurge() throws ExecutionException, 
InterruptedException {
+        PriorityCache<String, Integer> cache = new PriorityCache<>(65536);
+
+        for (int gen = 4; gen >= 0; gen--) {
+            // Backward iteration prevents earlier generations from being 
replaced with later ones
+            for (int k = 0; k < 100; k++) {
+                assumeTrue("All test keys are in the cache",
+                        cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0));
+            }
+        }
+
+        assertEquals(500, cache.size());
+
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<Future<Void>> futures = new ArrayList<>(concurrency * 4);
+
+        for (int i = 0; i < concurrency; i++) {
+            futures.add(executor.submit(() -> {
+                cache.purgeGenerations(generation -> generation == 1);
+                return null;
+            }));
+            futures.add(executor.submit(() -> {
+                cache.purgeGenerations(generation -> generation == 4);
+                return null;
+            }));
+            futures.add(executor.submit(() -> {
+                cache.purgeGenerations(generation -> generation <= 1);
+                return null;
+            }));
+        }
+
+        for (Future<Void> future : futures) {
+            future.get();
+        }
+
+        assertEquals(200, cache.size());
+    }
+
+    @Test
+    public void concurrentEvictionCount() throws ExecutionException, 
InterruptedException {
+        Random rnd = new Random();
+        PriorityCache<String, Integer> cache = new PriorityCache<>(128, 2, 
CacheWeights.noopWeigher());
+
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<Future<Boolean>> futures = new ArrayList<>(256);
+
+        for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; b++) {
+            int value = b;
+            futures.add(executor.submit(() ->
+                    cache.put("k-" + value + "-" + rnd.nextInt(1000), value, 
0, (byte) value)));
+        }
+
+        int count = 0;
+        for (Future<Boolean> future : futures) {
+            if (future.get()) {
+                count++;
+            }
+        }
+
+        assertEquals(count, cache.size() + cache.getStats().evictionCount());
+    }
+
+    @Test
+    public void concurrentLoadExceptionCount() throws ExecutionException, 
InterruptedException {
+        Random rnd = new Random();
+        PriorityCache<String, Integer> cache = new PriorityCache<>(16);
+
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<Future<Boolean>> futures = new ArrayList<>(1000);
+
+        for (int i = 0; i < 1000; i++) {
+            int value = i;
+            futures.add(executor.submit(() ->
+                    cache.put("k-" + value + "-" + rnd.nextInt(1000), value, 
0, (byte) 0)));
+        }
+
+        int success = 0;
+        int failure = 0;
+
+        for (Future<Boolean> future : futures) {
+            if (future.get()) {
+                success++;
+            } else {
+                failure++;
+            }
+        }
+
+        assertEquals(0, cache.getStats().evictionCount());
+        assertEquals(success, cache.size());
+        assertEquals(failure, cache.getStats().loadExceptionCount());
+    }
+
+}
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
index 3d2be39e04..2ea349f1ef 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
@@ -81,7 +81,7 @@ public class PriorityCacheTest {
 
     @Test
     public void readWrite() {
-        PriorityCache<String, Integer> cache = new PriorityCache<String, 
Integer>(128, 0);
+        PriorityCache<String, Integer> cache = new PriorityCache<>(128, 0);
         for (int k = 0; k < 128; k++) {
             if (cache.put("key-" + k, k, 0, (byte) 0)) {
                 assertEquals(Integer.valueOf(k), cache.get("key-" + k, 0));
@@ -101,7 +101,7 @@ public class PriorityCacheTest {
 
     @Test
     public void updateKey() {
-        PriorityCache<String, Integer> cache = new PriorityCache<String, 
Integer>(1, 0);
+        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 0);
 
         assertTrue(cache.put("one", 1, 0, (byte) 0));
 
@@ -118,7 +118,7 @@ public class PriorityCacheTest {
 
     @Test
     public void updateWithNewGeneration() {
-        PriorityCache<String, Integer> cache = new PriorityCache<String, 
Integer>(1, 0);
+        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 0);
         assertTrue(cache.put("one", 1, 0, (byte) 0));
 
         // Cache is full but we can still put a key of a higher generation
@@ -134,24 +134,18 @@ public class PriorityCacheTest {
 
     @Test
     public void generationPurge() {
-        PriorityCache<String, Integer> cache = new PriorityCache<String, 
Integer>(65536);
+        PriorityCache<String, Integer> cache = new PriorityCache<>(65536);
 
         for (int gen = 4; gen >= 0; gen--) {
             // Backward iteration avoids earlier generations are replaced with 
later ones
             for (int k = 0; k < 100; k++) {
-                if (!cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0)) {
-                    assumeTrue("All test keys are in the cache", false);
-                }
+                assumeTrue("All test keys are in the cache",
+                        cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0));
             }
         }
 
         assertEquals(500, cache.size());
-        cache.purgeGenerations(new Predicate<Integer>() {
-            @Override
-            public boolean apply(Integer generation) {
-                return generation <= 2;
-            }
-        });
+        cache.purgeGenerations(generation -> generation <= 2);
         assertEquals(200, cache.size());
     }
 
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
 
b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounter.java
similarity index 99%
copy from 
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
copy to 
oak-store-spi/src/main/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounter.java
index bfad0a2561..a1006eeab9 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
+++ 
b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.jackrabbit.oak.plugins.index.counter;
+package org.apache.jackrabbit.oak.plugins.index;
 
 import java.util.Random;
 import java.util.UUID;
diff --git 
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounterTest.java
 
b/oak-store-spi/src/test/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounterTest.java
similarity index 96%
rename from 
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounterTest.java
rename to 
oak-store-spi/src/test/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounterTest.java
index 098dd52b2e..d4fe3b0375 100644
--- 
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounterTest.java
+++ 
b/oak-store-spi/src/test/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounterTest.java
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.jackrabbit.oak.plugins.index.counter;
+package org.apache.jackrabbit.oak.plugins.index;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.junit.Test;
 
 import java.util.Random;
 
-import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ApproximateCounterTest {
 

Reply via email to