This is an automated email from the ASF dual-hosted git repository.

adulceanu pushed a commit to branch issues/OAK-9922
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 4ec09454fe88515bafd896e4e973e210fdb9cd25
Author: Lucas Weitzendorf <lweitzend...@adobe.com>
AuthorDate: Wed Sep 7 13:11:32 2022 +0200

    OAK-9922 Parallel Compaction
---
 .../site/markdown/nodestore/segment/overview.md    |  27 +-
 .../apache/jackrabbit/oak/run/CompactCommand.java  |  52 +--
 .../oak/segment/aws/tool/AwsCompact.java           |  34 +-
 .../oak/segment/aws/tool/AwsToolUtils.java         |  19 +-
 .../oak/segment/azure/tool/AzureCompact.java       |  21 +-
 .../oak/segment/azure/tool/ToolUtils.java          |  26 +-
 oak-segment-tar/pom.xml                            |   2 -
 .../oak/segment/CheckpointCompactor.java           |  32 +-
 .../jackrabbit/oak/segment/ClassicCompactor.java   |  18 +-
 .../jackrabbit/oak/segment/ParallelCompactor.java  | 373 +++++++++++++++++++++
 .../apache/jackrabbit/oak/segment/RecordCache.java | 114 +++----
 .../oak/segment/SegmentBufferWriterPool.java       | 178 +++-------
 .../oak/segment/compaction/SegmentGCOptions.java   |  47 ++-
 .../segment/file/AbstractCompactionStrategy.java   |  31 +-
 .../jackrabbit/oak/segment/file/FileStore.java     |   2 +-
 .../oak/segment/file/GCNodeWriteMonitor.java       |  65 ++--
 .../jackrabbit/oak/segment/file/PriorityCache.java | 196 +++++++----
 .../jackrabbit/oak/segment/tool/Compact.java       |  22 +-
 ...java => AbstractCompactorExternalBlobTest.java} |  30 +-
 ...mpactorTest.java => AbstractCompactorTest.java} |  18 +-
 .../CheckpointCompactorExternalBlobTest.java       | 135 +-------
 .../oak/segment/CheckpointCompactorTest.java       | 111 ++----
 .../oak/segment/CompactionAndCleanupIT.java        |  13 +-
 ...actorTestUtils.java => CompactorTestUtils.java} |  20 +-
 .../segment/ParallelCompactorExternalBlobTest.java |  45 +++
 .../oak/segment/ParallelCompactorTest.java         |  45 +++
 .../oak/segment/RecordCacheStatsTest.java          |  13 +-
 .../oak/segment/SegmentBufferWriterPoolTest.java   |  42 +--
 28 files changed, 1053 insertions(+), 678 deletions(-)

diff --git a/oak-doc/src/site/markdown/nodestore/segment/overview.md 
b/oak-doc/src/site/markdown/nodestore/segment/overview.md
index 7a02190b16..53f10542ab 100644
--- a/oak-doc/src/site/markdown/nodestore/segment/overview.md
+++ b/oak-doc/src/site/markdown/nodestore/segment/overview.md
@@ -288,6 +288,22 @@ TarMK GC #2: compacting checkpoints/5c45ca7b-5863-4679-a7c5-6056a999a6cd/root.
 TarMK GC #2: compacting root.
 ```
 
+##### <a name="how-does-compaction-make-use-of-multithreading"/> How does compaction make use of multithreading?
+
+The parallel compactor adds an initial exploration phase to the compaction process, which scans and splits the content tree
+into multiple parts to be processed simultaneously. For this to be efficient, the tree is only expanded until a certain
+number of nodes is reached, which is defined relative to the number of threads (main thread + compaction workers).
+
+```
+TarMK GC #2: compacting with 8 threads.
+TarMK GC #2: exploring content tree to find subtrees for parallel compaction.
+TarMK GC #2: target node count for expansion is 7000, based on 7 available workers.
+TarMK GC #2: Found 3 nodes at depth 1, target is 7000.
+TarMK GC #2: Found 48 nodes at depth 2, target is 7000.
+TarMK GC #2: Found 663 nodes at depth 3, target is 7000.
+TarMK GC #2: Found 66944 nodes at depth 4, target is 7000.
+```
+
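
For illustration, a minimal sketch of enabling the parallel compactor programmatically when building a `FileStore`. It only uses the `SegmentGCOptions` setters (`setCompactorType`, `setConcurrency`) and the `FileStoreBuilder`/`compactFull` calls touched by this patch; the store path and thread count are placeholders:

```java
import java.io.File;

import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;

public class ParallelCompactionExample {

    public static void main(String[] args) throws Exception {
        File segmentStoreDir = new File(args[0]); // placeholder: path to an existing segment store

        // Offline GC options using the parallel compactor with 8 threads
        // (1 main thread + 7 compaction workers, matching the log output above).
        SegmentGCOptions gcOptions = SegmentGCOptions.defaultGCOptions()
                .setOffline()
                .setCompactorType(CompactorType.PARALLEL_COMPACTOR)
                .setConcurrency(8);

        try (FileStore store = FileStoreBuilder.fileStoreBuilder(segmentStoreDir)
                .withGCOptions(gcOptions)
                .build()) {
            // Full compaction; the exploration phase splits the content tree
            // into subtrees that are compacted concurrently.
            if (!store.compactFull()) {
                System.err.println("Compaction cancelled");
            }
        }
    }
}
```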
 ##### <a name="how-does-compaction-works-with-concurrent-writes"/> How does compaction work with concurrent writes?
 
 When compaction runs as part of online garbage collection, it has to work concurrently with the rest of the system.
@@ -807,24 +823,25 @@ This option is optional and is disabled by default.
 ### <a name="compact"/> Compact
 
 ```
-java -jar oak-run.jar compact [--force] [--mmap] [--compactor] SOURCE [--target-path DESTINATION] [--persistent-cache-path PERSISTENT_CACHE_PATH] [--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>]
+java -jar oak-run.jar compact [--force] [--mmap] [--compactor] [--threads] SOURCE [--target-path DESTINATION] [--persistent-cache-path PERSISTENT_CACHE_PATH] [--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>]
 ```
 
 The `compact` command performs offline compaction of the local/remote Segment Store at `SOURCE`. 
 `SOURCE` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store are the supported remote Segment Stores. 
 Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
-If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non 
-matching Segment Store version. *CAUTION*: this will upgrade the Segment Store to the 
+If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non-matching Segment Store version. *CAUTION*: this will upgrade the Segment Store to the 
 latest version, which is incompatible with older versions. *There is no way to downgrade 
 an accidentally upgraded Segment Store*.  
 
 The optional `--mmap [Boolean]` argument can be used to control the file access mode. Set
 to `true` for memory mapped access and `false` for file access. If not specified, memory 
-mapped access is used on 64 bit systems and file access is used on 32 bit systems. On
+mapped access is used on 64-bit systems and file access is used on 32-bit systems. On
 Windows, regular file access is always enforced and this option is ignored.
 
-The optional `--compactor [String]` argument can be used to pick the compactor type to be used. Valid choices are *classic* and *diff*. While the former is slower, it might be more stable, due to lack of optimisations employed by the *diff* compactor which compacts the checkpoints on top of each other. If not specified, *diff* compactor is used.
+The optional `--compactor [String]` argument can be used to pick the compactor type to be used. Valid choices are *classic*, *diff* and *parallel*. While *classic* is slower, it might be more stable, as it lacks the optimisations employed by the *diff* compactor, which compacts the checkpoints on top of each other, and by the *parallel* compactor, which additionally divides the repository into multiple parts that are processed in parallel. If not specified, the *parallel* compactor is used.
+
+The optional `--threads [Integer]` argument specifies the number of threads to use for compaction. This is only applicable to the *parallel* compactor. If not specified, it defaults to the number of available processors.
 
 In order to speed up offline compaction for remote Segment Stores, three new options were introduced for configuring the destination segment store where compacted archives will be written and also to configure a persistent disk cache for speeding up segments reading during compaction. All three options detailed below **apply only for remote Segment Stores**.
 
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
index 212c7a0435..bb7f601b7d 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
@@ -19,13 +19,12 @@ package org.apache.jackrabbit.oak.run;
 
 import java.io.File;
 
-import org.apache.jackrabbit.guava.common.base.StandardSystemProperty;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import org.apache.jackrabbit.guava.common.base.StandardSystemProperty;
 import org.apache.jackrabbit.oak.run.commons.Command;
 import org.apache.jackrabbit.oak.segment.azure.tool.AzureCompact;
-import org.apache.jackrabbit.oak.segment.azure.tool.AzureCompact.Builder;
 import 
org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.aws.tool.AwsCompact;
 import org.apache.jackrabbit.oak.segment.tool.Compact;
@@ -56,11 +55,16 @@ class CompactCommand implements Command {
                 .withOptionalArg()
                 .ofType(Boolean.class);
         OptionSpec<String> compactor = parser.accepts("compactor",
-                "Allow the user to control compactor type to be used. Valid 
choices are \"classic\" and \"diff\". " +
-                        "While the former is slower, it might be more stable, 
due to lack of optimisations employed " +
-                        "by the \"diff\" compactor which compacts the 
checkpoints on top of each other. If not " +
-                        "specified, \"diff\" compactor is used.")
+                "Allow the user to control compactor type to be used. Valid 
choices are \"classic\", \"diff\", \"parallel\". " +
+                        "While \"classic\" is slower, it might be more stable, 
due to lack of optimisations employed " +
+                        "by the \"diff\" compactor which compacts the 
checkpoints on top of each other and \"parallel\" compactor, which splits " +
+                        "the repository into smaller parts and compacts them 
concurrently. If not specified, \"parallel\" compactor is used.")
                 .withRequiredArg().ofType(String.class);
+        OptionSpec<Integer> nThreads = parser.accepts("threads", "Specify the number of threads used " +
+                "for compaction. This is only applicable to the \"parallel\" compactor. Defaults to the number of available processors.")
+                .withRequiredArg()
+                .ofType(Integer.class)
+                .defaultsTo(-1);
         OptionSpec<String> targetPath = parser.accepts("target-path", 
"Path/URI to TAR/remote segment store where " +
                 "resulting archives will be written")
                 .withRequiredArg()
@@ -72,8 +76,9 @@ class CompactCommand implements Command {
         OptionSpec<Integer> persistentCacheSizeGb = 
parser.accepts("persistent-cache-size-gb", "Size in GB (defaults to 50 GB) for "
                 + "the persistent disk cache")
                 .withRequiredArg()
-                .defaultsTo("50")
-                .ofType(Integer.class);
+                .ofType(Integer.class)
+                .defaultsTo(50);
+
 
         OptionSet options = parser.parse(args);
 
@@ -85,7 +90,7 @@ class CompactCommand implements Command {
             System.exit(-1);
         }
 
-        int code = 0;
+        int code;
 
         if (path.startsWith("az:")) {
             if (targetPath.value(options) == null) {
@@ -100,45 +105,48 @@ class CompactCommand implements Command {
                 System.exit(-1);
             }
 
-            Builder azureBuilder = AzureCompact.builder()
+            AzureCompact.Builder azureBuilder = AzureCompact.builder()
                     .withPath(path)
                     .withTargetPath(targetPath.value(options))
                     
.withPersistentCachePath(persistentCachePath.value(options))
                     
.withPersistentCacheSizeGb(persistentCacheSizeGb.value(options))
                     .withForce(isTrue(forceArg.value(options)))
-                    .withGCLogInterval(Long.getLong("compaction-progress-log", 
150000));
+                    .withGCLogInterval(Long.getLong("compaction-progress-log", 
150000))
+                    .withConcurrency(nThreads.value(options));
 
             if (options.has(compactor)) {
                 
azureBuilder.withCompactorType(CompactorType.fromDescription(compactor.value(options)));
             }
 
-            code = azureBuilder
-                    .build()
-                    .run();
+            code = azureBuilder.build().run();
         } else if (path.startsWith("aws:")) {
-            code = AwsCompact.builder()
+            AwsCompact.Builder awsBuilder = AwsCompact.builder()
                     .withPath(path)
                     .withForce(isTrue(forceArg.value(options)))
                     .withSegmentCacheSize(Integer.getInteger("cache", 256))
                     .withGCLogInterval(Long.getLong("compaction-progress-log", 
150000))
-                    .build()
-                    .run();
+                    .withConcurrency(nThreads.value(options));
+
+            if (options.has(compactor)) {
+                
awsBuilder.withCompactorType(CompactorType.fromDescription(compactor.value(options)));
+            }
+
+            code = awsBuilder.build().run();
         } else {
-            org.apache.jackrabbit.oak.segment.tool.Compact.Builder tarBuilder 
= Compact.builder()
+            Compact.Builder tarBuilder = Compact.builder()
                     .withPath(new File(path))
                     .withForce(isTrue(forceArg.value(options)))
                     .withMmap(mmapArg.value(options))
                     .withOs(StandardSystemProperty.OS_NAME.value())
                     .withSegmentCacheSize(Integer.getInteger("cache", 256))
-                    .withGCLogInterval(Long.getLong("compaction-progress-log", 
150000));
+                    .withGCLogInterval(Long.getLong("compaction-progress-log", 
150000))
+                    .withConcurrency(nThreads.value(options));
 
             if (options.has(compactor)) {
                 
tarBuilder.withCompactorType(CompactorType.fromDescription(compactor.value(options)));
             }
 
-            code = tarBuilder
-                    .build()
-                    .run();
+            code = tarBuilder.build().run();
         }
 
         System.exit(code);
diff --git 
a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
 
b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
index 0b7514f5c6..2e6e0cb051 100644
--- 
a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
+++ 
b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
@@ -34,6 +34,7 @@ import org.apache.jackrabbit.guava.common.io.Files;
 
 import org.apache.jackrabbit.oak.segment.SegmentCache;
 import 
org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import 
org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.JournalReader;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
@@ -72,6 +73,10 @@ public class AwsCompact {
 
         private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
 
+        private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
+
+        private int concurrency = -1;
+
         private Builder() {
             // Prevent external instantiation.
         }
@@ -128,6 +133,27 @@ public class AwsCompact {
             return this;
         }
 
+        /**
+         * The compactor type to be used by compaction. If not specified it defaults to
+         * the "parallel" compactor.
+         * @param compactorType the compactor type
+         * @return this builder
+         */
+        public Builder withCompactorType(CompactorType compactorType) {
+            this.compactorType = compactorType;
+            return this;
+        }
+
+        /**
+         * The number of threads to be used for compaction. This only applies to the "parallel" compactor.
+         * @param concurrency the number of threads
+         * @return this builder
+         */
+        public Builder withConcurrency(int concurrency) {
+            this.concurrency = concurrency;
+            return this;
+        }
+
         /**
          * Create an executable version of the {@link Compact} command.
          *
@@ -147,11 +173,17 @@ public class AwsCompact {
 
     private final long gcLogInterval;
 
+    private final CompactorType compactorType;
+
+    private final int concurrency;
+
     private AwsCompact(Builder builder) {
         this.path = builder.path;
         this.segmentCacheSize = builder.segmentCacheSize;
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
+        this.compactorType = builder.compactorType;
+        this.concurrency = builder.concurrency;
     }
 
     public int run() throws IOException {
@@ -173,7 +205,7 @@ public class AwsCompact {
         System.out.printf("    -> compacting\n");
 
         try (FileStore store = newFileStore(persistence, 
Files.createTempDir(), strictVersionCheck, segmentCacheSize,
-                gcLogInterval)) {
+                gcLogInterval, compactorType, concurrency)) {
             if (!store.compactFull()) {
                 System.out.printf("Compaction cancelled after %s.\n", 
printableStopwatch(watch));
                 return 1;
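
A hedged usage sketch of the extended `AwsCompact` builder: the source URI, force flag and thread count are placeholders, but the builder methods (`withCompactorType`, `withConcurrency`) are the ones added above:

```java
import java.io.IOException;

import org.apache.jackrabbit.oak.segment.aws.tool.AwsCompact;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;

public class AwsParallelCompactExample {

    public static void main(String[] args) throws IOException {
        int code = AwsCompact.builder()
                .withPath(args[0])            // placeholder "aws:..." URI of the source store
                .withForce(false)             // fail on a non-matching store version
                .withCompactorType(CompactorType.PARALLEL_COMPACTOR)
                .withConcurrency(4)           // threads used by the parallel compactor
                .build()
                .run();
        System.exit(code);
    }
}
```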
diff --git 
a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
 
b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
index f198ae69d9..e76d18bf86 100644
--- 
a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
+++ 
b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
@@ -34,6 +34,7 @@ import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.segment.aws.AwsContext;
 import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
 import org.apache.jackrabbit.oak.segment.aws.Configuration;
+import 
org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
 import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
@@ -69,14 +70,26 @@ public class AwsToolUtils {
     }
 
     public static FileStore newFileStore(SegmentNodeStorePersistence 
persistence, File directory,
-            boolean strictVersionCheck, int segmentCacheSize, long 
gcLogInterval)
-            throws IOException, InvalidFileStoreVersionException, 
URISyntaxException {
+                                         boolean strictVersionCheck, int 
segmentCacheSize, long gcLogInterval)
+            throws IOException, InvalidFileStoreVersionException {
+        return newFileStore(persistence, directory, strictVersionCheck, 
segmentCacheSize,
+                gcLogInterval, CompactorType.PARALLEL_COMPACTOR, 1);
+    }
+
+    public static FileStore newFileStore(SegmentNodeStorePersistence 
persistence, File directory,
+                                         boolean strictVersionCheck, int 
segmentCacheSize, long gcLogInterval,
+                                         CompactorType compactorType, int 
gcConcurrency)
+            throws IOException, InvalidFileStoreVersionException {
         FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory)
                 .withCustomPersistence(persistence)
                 .withMemoryMapping(false)
                 .withStrictVersionCheck(strictVersionCheck)
                 .withSegmentCacheSize(segmentCacheSize)
-                
.withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval));
+                .withGCOptions(defaultGCOptions()
+                        .setOffline()
+                        .setGCLogInterval(gcLogInterval)
+                        .setCompactorType(compactorType)
+                        .setConcurrency(gcConcurrency));
 
         return builder.build();
     }
diff --git 
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
index 7ca8188eb1..087cd355d9 100644
--- 
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
+++ 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
@@ -79,7 +79,9 @@ public class AzureCompact {
 
         private int segmentCacheSize = 2048;
 
-        private CompactorType compactorType = 
CompactorType.CHECKPOINT_COMPACTOR;
+        private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
+
+        private int concurrency = -1;
 
         private String persistentCachePath;
 
@@ -159,7 +161,7 @@ public class AzureCompact {
 
         /**
          * The compactor type to be used by compaction. If not specified it 
defaults to
-         * "diff" compactor
+         * "parallel" compactor
          * @param compactorType the compactor type
          * @return this builder
          */
@@ -168,6 +170,16 @@ public class AzureCompact {
             return this;
         }
 
+        /**
+         * The number of threads to be used for compaction. This only applies to the "parallel" compactor.
+         * @param concurrency the number of threads
+         * @return this builder
+         */
+        public Builder withConcurrency(int concurrency) {
+            this.concurrency = concurrency;
+            return this;
+        }
+
         /**
          * The path where segments in the persistent cache will be stored.
          *
@@ -215,6 +227,8 @@ public class AzureCompact {
 
     private final CompactorType compactorType;
 
+    private final int concurrency;
+
     private String persistentCachePath;
 
     private Integer persistentCacheSizeGb;
@@ -226,6 +240,7 @@ public class AzureCompact {
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
         this.compactorType = builder.compactorType;
+        this.concurrency = builder.concurrency;
         this.persistentCachePath = builder.persistentCachePath;
         this.persistentCacheSizeGb = builder.persistentCacheSizeGb;
     }
@@ -254,7 +269,7 @@ public class AzureCompact {
         System.out.printf("    -> compacting\n");
 
         try (FileStore store = newFileStore(splitPersistence, 
Files.createTempDir(), strictVersionCheck, segmentCacheSize,
-                gcLogInterval, compactorType)) {
+                gcLogInterval, compactorType, concurrency)) {
             if (!store.compactFull()) {
                 System.out.printf("Compaction cancelled after %s.\n", 
printableStopwatch(watch));
                 return 1;
diff --git 
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
index cfc3d4c9d4..325d648fef 100644
--- 
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
+++ 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
@@ -86,14 +86,26 @@ public class ToolUtils {
     }
 
     public static FileStore newFileStore(SegmentNodeStorePersistence 
persistence, File directory,
-            boolean strictVersionCheck, int segmentCacheSize, long 
gcLogInterval, CompactorType compactorType)
-            throws IOException, InvalidFileStoreVersionException, 
URISyntaxException, StorageException {
-        FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory)
-                
.withCustomPersistence(persistence).withMemoryMapping(false).withStrictVersionCheck(strictVersionCheck)
-                .withSegmentCacheSize(segmentCacheSize)
-                
.withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval).setCompactorType(compactorType));
+                                         boolean strictVersionCheck, int 
segmentCacheSize, long gcLogInterval, CompactorType compactorType)
+            throws IOException, InvalidFileStoreVersionException {
+        return newFileStore(persistence, directory, strictVersionCheck,
+                segmentCacheSize, gcLogInterval, compactorType, 1);
+    }
 
-        return builder.build();
+    public static FileStore newFileStore(SegmentNodeStorePersistence 
persistence, File directory,
+            boolean strictVersionCheck, int segmentCacheSize, long 
gcLogInterval, CompactorType compactorType, int gcConcurrency)
+            throws IOException, InvalidFileStoreVersionException {
+        return FileStoreBuilder.fileStoreBuilder(directory)
+                .withCustomPersistence(persistence)
+                .withMemoryMapping(false)
+                .withStrictVersionCheck(strictVersionCheck)
+                .withSegmentCacheSize(segmentCacheSize)
+                .withGCOptions(defaultGCOptions()
+                        .setOffline()
+                        .setGCLogInterval(gcLogInterval)
+                        .setCompactorType(compactorType)
+                        .setConcurrency(gcConcurrency))
+                .build();
     }
 
     public static SegmentNodeStorePersistence 
newSegmentNodeStorePersistence(SegmentStoreType storeType,
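
A sketch of calling the new `ToolUtils.newFileStore` overload. It assumes a `SegmentNodeStorePersistence` obtained elsewhere (for example via `newSegmentNodeStorePersistence`); the cache size, log interval and concurrency are placeholder values:

```java
import java.io.File;
import java.io.IOException;

import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;

public class ToolUtilsExample {

    // Opens a FileStore over the given persistence, configured for offline GC
    // with the parallel compactor and 4 GC threads.
    static FileStore openForParallelCompaction(SegmentNodeStorePersistence persistence, File directory)
            throws IOException, InvalidFileStoreVersionException {
        return ToolUtils.newFileStore(persistence, directory,
                true,      // strictVersionCheck
                1024,      // segmentCacheSize (MB)
                150_000,   // gcLogInterval
                CompactorType.PARALLEL_COMPACTOR,
                4);        // gcConcurrency
    }
}
```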
diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml
index 006bc57b4c..39773eb1c1 100644
--- a/oak-segment-tar/pom.xml
+++ b/oak-segment-tar/pom.xml
@@ -415,8 +415,6 @@
             <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-core</artifactId>
             <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
index 51677e8623..c84aa24eaa 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
@@ -18,6 +18,8 @@
 
 package org.apache.jackrabbit.oak.segment;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
 import static org.apache.jackrabbit.guava.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.guava.common.collect.Maps.newLinkedHashMap;
@@ -25,6 +27,7 @@ import static 
org.apache.jackrabbit.oak.commons.PathUtils.elements;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static 
org.apache.jackrabbit.oak.segment.ClassicCompactor.getStableIdBytes;
 
 import java.io.IOException;
 import java.util.Date;
@@ -57,13 +60,13 @@ import org.jetbrains.annotations.Nullable;
  */
 public class CheckpointCompactor implements Compactor {
     @NotNull
-    private final GCMonitor gcListener;
+    protected final GCMonitor gcListener;
 
     @NotNull
     private final Map<NodeState, NodeState> cpCache = newHashMap();
 
     @NotNull
-    private final ClassicCompactor compactor;
+    protected final ClassicCompactor compactor;
 
     @NotNull
     private final NodeWriter nodeWriter;
@@ -139,14 +142,7 @@ public class CheckpointCompactor implements Compactor {
             childBuilder.setChildNode(getName(path), state);
         }
 
-        return nodeWriter.writeNode(builder.getNodeState(), 
getStableIdBytes(uncompacted));
-    }
-
-    @Nullable
-    private static Buffer getStableIdBytes(@NotNull NodeState node) {
-        return node instanceof SegmentNodeState
-            ? ((SegmentNodeState) node).getStableIdBytes()
-            : null;
+        return nodeWriter.writeNode(builder.getNodeState(), 
requireNonNull(getStableIdBytes(uncompacted)));
     }
 
     @NotNull
@@ -232,6 +228,19 @@ public class CheckpointCompactor implements Compactor {
             }
         }
 
+    /**
+     * Delegate compaction to another, usually simpler, implementation.
+     */
+    @Nullable
+    protected SegmentNodeState compactWithDelegate(
+            @NotNull NodeState before,
+            @NotNull NodeState after,
+            @NotNull NodeState onto,
+            Canceller canceller
+    ) throws IOException {
+        return compactor.compact(before, after, onto, canceller);
+    }
+
     /**
      * Compact {@code after} against {@code before} on top of {@code onto} 
unless
      * {@code after} has been compacted before and is found in the cache. In 
this
@@ -248,7 +257,7 @@ public class CheckpointCompactor implements Compactor {
         gcListener.info("compacting {}.", path);
         NodeState compacted = cpCache.get(after);
         if (compacted == null) {
-            compacted = compactor.compact(before, after, onto, canceller);
+            compacted = compactWithDelegate(before, after, onto, canceller);
             if (compacted == null) {
                 return null;
             } else {
@@ -260,5 +269,4 @@ public class CheckpointCompactor implements Compactor {
             return new Result(compacted, before, onto);
         }
     }
-
 }
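
To illustrate the new `compactWithDelegate` extension point, a hypothetical subclass (not part of this patch) that only logs before delegating; `ParallelCompactor`, added later in this commit, overrides the same hook. The constructor signature is assumed to mirror the one used by `ParallelCompactor`'s `super()` call:

```java
package org.apache.jackrabbit.oak.segment;

import java.io.IOException;

import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

// Hypothetical subclass: logs each delegated step, then falls back to the default
// ClassicCompactor-based delegate provided by CheckpointCompactor.
public class LoggingCheckpointCompactor extends CheckpointCompactor {

    public LoggingCheckpointCompactor(@NotNull GCMonitor gcListener, @NotNull SegmentReader reader,
            @NotNull SegmentWriter writer, @Nullable BlobStore blobStore,
            @NotNull GCNodeWriteMonitor compactionMonitor) {
        super(gcListener, reader, writer, blobStore, compactionMonitor);
    }

    @Override
    @Nullable
    protected SegmentNodeState compactWithDelegate(@NotNull NodeState before, @NotNull NodeState after,
            @NotNull NodeState onto, Canceller canceller) throws IOException {
        gcListener.info("delegating compaction of a subtree."); // gcListener is protected now
        return super.compactWithDelegate(before, after, onto, canceller);
    }
}
```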
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
index 74c8fb59da..bb599fccfb 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
@@ -124,7 +124,7 @@ public class ClassicCompactor implements Compactor {
     }
 
     @Nullable
-    private static Buffer getStableIdBytes(NodeState state) {
+    protected static Buffer getStableIdBytes(@NotNull NodeState state) {
         if (state instanceof SegmentNodeState) {
             return ((SegmentNodeState) state).getStableIdBytes();
         } else {
@@ -132,6 +132,12 @@ public class ClassicCompactor implements Compactor {
         }
     }
 
+    protected SegmentNodeState writeNodeState(NodeState nodeState, Buffer 
stableIdBytes) throws IOException {
+        RecordId nodeId = writer.writeNode(nodeState, stableIdBytes);
+        compactionMonitor.onNode();
+        return new SegmentNodeState(reader, writer, blobStore, nodeId);
+    }
+
     private class CompactDiff implements NodeStateDiff {
         @NotNull
         private MemoryNodeBuilder builder;
@@ -162,15 +168,14 @@ public class ClassicCompactor implements Compactor {
 
         @Nullable
         SegmentNodeState diff(@NotNull NodeState before, @NotNull NodeState 
after) throws IOException {
-            boolean success = after.compareAgainstBaseState(before, new 
CancelableDiff(this, () -> canceller.check().isCancelled()));
+            boolean success = after.compareAgainstBaseState(before,
+                    new CancelableDiff(this, () -> 
canceller.check().isCancelled()));
             if (exception != null) {
                 throw new IOException(exception);
             } else if (success) {
                 NodeState nodeState = builder.getNodeState();
                 checkState(modCount == 0 || !(nodeState instanceof 
SegmentNodeState));
-                RecordId nodeId = writer.writeNode(nodeState, 
getStableIdBytes(after));
-                compactionMonitor.onNode();
-                return new SegmentNodeState(reader, writer, blobStore, nodeId);
+                return writeNodeState(nodeState, getStableIdBytes(after));
             } else {
                 return null;
             }
@@ -242,7 +247,7 @@ public class ClassicCompactor implements Compactor {
     }
 
     @NotNull
-    private  PropertyState compact(@NotNull PropertyState property) {
+    protected PropertyState compact(@NotNull PropertyState property) {
         compactionMonitor.onProperty();
         String name = property.getName();
         Type<?> type = property.getType();
@@ -260,5 +265,4 @@ public class ClassicCompactor implements Compactor {
             return createProperty(name, property.getValue(type), type);
         }
     }
-
 }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
new file mode 100644
index 0000000000..613266706e
--- /dev/null
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
+import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
+import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static 
org.apache.jackrabbit.oak.segment.ClassicCompactor.getStableIdBytes;
+
+/**
+ * This compactor implementation leverages the tree structure of the 
repository for concurrent compaction.
+ * It explores the tree breadth-first until the target node count is reached. 
Every node at this depth will be
+ * an entry point for asynchronous compaction. After the exploration phase, 
the main thread will collect
+ * these compaction results and write their parents' node state to disk.
+ */
+public class ParallelCompactor extends CheckpointCompactor {
+    /**
+     * Expand repository tree until there are this many nodes for each worker 
to compact. Tradeoff
+     * between low efficiency of many small tasks and high risk of at least 
one of the subtrees being
+     * significantly larger than totalSize / numWorkers (unequal work 
distribution).
+     */
+    private static final int MIN_NODES_PER_WORKER = 1000;
+
+    /**
+     * Stop expansion if tree size grows beyond this many nodes per worker at 
the latest.
+     */
+    private static final int MAX_NODES_PER_WORKER = 10_000;
+
+    private final int numWorkers;
+
+    private final long totalSizeEstimate;
+
+    /**
+     * Manages workers for asynchronous compaction.
+     */
+    @Nullable
+    private ExecutorService executorService;
+
+    /**
+     * Create a new instance based on the passed arguments.
+     * @param gcListener listener receiving notifications about the garbage 
collection process
+     * @param reader     segment reader used to read from the segments
+     * @param writer     segment writer used to serialise to segments
+     * @param blobStore  the blob store or {@code null} if none
+     * @param compactionMonitor   notification callback for each compacted node, property and binary
+     * @param nThreads   number of threads to use for parallel compaction,
+     *                   negative numbers are interpreted relative to the number of available processors
+     */
+    public ParallelCompactor(
+            @NotNull GCMonitor gcListener,
+            @NotNull SegmentReader reader,
+            @NotNull SegmentWriter writer,
+            @Nullable BlobStore blobStore,
+            @NotNull GCNodeWriteMonitor compactionMonitor,
+            int nThreads) {
+        super(gcListener, reader, writer, blobStore, compactionMonitor);
+
+        int availableProcessors = Runtime.getRuntime().availableProcessors();
+        if (nThreads < 0) {
+            nThreads += availableProcessors + 1;
+        }
+        numWorkers = Math.max(0, nThreads - 1);
+        totalSizeEstimate = compactionMonitor.getEstimatedTotal();
+    }
+
+    /**
+     * Calculates the minimum number of entry points for asynchronous 
compaction.
+     */
+    private int getMinNodeCount() {
+        return numWorkers * MIN_NODES_PER_WORKER;
+    }
+
+    private int getMaxNodeCount() {
+        return numWorkers * MAX_NODES_PER_WORKER;
+    }
+
+    /**
+     * Represents structure of repository changes. Tree is built by 
exploration process and subsequently
+     * used to collect and merge asynchronous compaction results.
+     */
+    private class CompactionTree implements NodeStateDiff {
+        @NotNull
+        private final NodeState before;
+
+        @NotNull
+        private final NodeState after;
+
+        @NotNull
+        private final NodeState onto;
+
+        @NotNull
+        private final HashMap<String, CompactionTree> modifiedChildren = new 
HashMap<>();
+
+        @NotNull
+        private final List<Property> modifiedProperties = new ArrayList<>();
+
+        @NotNull
+        private final List<String> removedChildNames = new ArrayList<>();
+
+        @NotNull
+        private final List<String> removedPropertyNames = new ArrayList<>();
+
+        /**
+         * Stores result of asynchronous compaction.
+         */
+        @Nullable
+        private Future<SegmentNodeState> compactionFuture;
+
+        CompactionTree(@NotNull NodeState before, @NotNull NodeState after, 
@NotNull NodeState onto) {
+            this.before = checkNotNull(before);
+            this.after = checkNotNull(after);
+            this.onto = checkNotNull(onto);
+        }
+
+        private class Property {
+            @NotNull
+            private final PropertyState state;
+
+            Property(@NotNull PropertyState state) {
+                this.state = state;
+            }
+
+            @NotNull
+            PropertyState compact() {
+                return compactor.compact(state);
+            }
+        }
+
+        boolean compareStates(Canceller canceller) {
+            return after.compareAgainstBaseState(before,
+                    new CancelableDiff(this, () -> 
canceller.check().isCancelled()));
+        }
+
+        long getEstimatedSize() {
+            return ApproximateCounter.getCountSync(after);
+        }
+
+        @Override
+        public boolean propertyAdded(PropertyState after) {
+            modifiedProperties.add(new Property(after));
+            return true;
+        }
+
+        @Override
+        public boolean propertyChanged(PropertyState before, PropertyState 
after) {
+            modifiedProperties.add(new Property(after));
+            return true;
+        }
+
+        @Override
+        public boolean propertyDeleted(PropertyState before) {
+            removedPropertyNames.add(before.getName());
+            return true;
+        }
+
+        @Override
+        public boolean childNodeAdded(String name, NodeState after) {
+            CompactionTree child = new CompactionTree(EMPTY_NODE, after, 
EMPTY_NODE);
+            modifiedChildren.put(name, child);
+            return true;
+        }
+
+        @Override
+        public boolean childNodeChanged(String name, NodeState before, 
NodeState after) {
+            CompactionTree child = new CompactionTree(before, after, 
onto.getChildNode(name));
+            modifiedChildren.put(name, child);
+            return true;
+        }
+
+        @Override
+        public boolean childNodeDeleted(String name, NodeState before) {
+            removedChildNames.add(name);
+            return true;
+        }
+
+        /**
+         * Start asynchronous compaction.
+         */
+        boolean compactAsync(Canceller canceller) {
+            if (compactionFuture != null) {
+                return false;
+            }
+            checkNotNull(executorService);
+            compactionFuture = executorService.submit(() -> 
compactor.compact(before, after, onto, canceller));
+            return true;
+        }
+
+        /**
+         * Start synchronous compaction on tree or collect result of 
asynchronous compaction if it has been started.
+         */
+        @Nullable
+        SegmentNodeState compact() throws IOException {
+            if (compactionFuture != null) {
+                try {
+                    return compactionFuture.get();
+                } catch (InterruptedException e) {
+                    return null;
+                } catch (ExecutionException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            MemoryNodeBuilder builder = new MemoryNodeBuilder(onto);
+
+            for (Map.Entry<String, CompactionTree> entry : 
modifiedChildren.entrySet()) {
+                SegmentNodeState compactedState = entry.getValue().compact();
+                if (compactedState == null) {
+                    return null;
+                }
+                builder.setChildNode(entry.getKey(), compactedState);
+            }
+            for (String childName : removedChildNames) {
+                builder.getChildNode(childName).remove();
+            }
+            for (Property property : modifiedProperties) {
+                builder.setProperty(property.compact());
+            }
+            for (String propertyName : removedPropertyNames) {
+                builder.removeProperty(propertyName);
+            }
+            return compactor.writeNodeState(builder.getNodeState(), 
getStableIdBytes(after));
+        }
+    }
+
+    /**
+     * Builds a {@link CompactionTree} via {@link NodeStateDiff} comparisons and starts asynchronous compaction
+     * on suitable entry points. Performs what is referred to as the exploration phase in other comments.
+     */
+    private class CompactionHandler {
+        @NotNull
+        private final NodeState base;
+
+        @NotNull
+        private final Canceller canceller;
+
+        CompactionHandler(@NotNull NodeState base, @NotNull Canceller 
canceller) {
+            this.base = base;
+            this.canceller = canceller;
+        }
+
+        @Nullable
+        SegmentNodeState diff(@NotNull NodeState before, @NotNull NodeState 
after) throws IOException {
+            checkNotNull(executorService);
+            checkState(!executorService.isShutdown());
+
+            gcListener.info("compacting with {} threads.", numWorkers + 1);
+            gcListener.info("exploring content tree to find subtrees for 
parallel compaction.");
+            gcListener.info("target node count for expansion is {}, based on 
{} available workers.",
+                    getMinNodeCount(), numWorkers);
+
+            CompactionTree compactionTree = new CompactionTree(before, after, 
base);
+            if (!compactionTree.compareStates(canceller)) {
+                return null;
+            }
+
+            List<CompactionTree> topLevel = new ArrayList<>();
+            for (Map.Entry<String, CompactionTree> childEntry : 
compactionTree.modifiedChildren.entrySet()) {
+                switch (childEntry.getKey()) {
+                    // these tend to be the largest directories, others will 
not be split up
+                    case "content":
+                    case "oak:index":
+                    case "jcr:system":
+                        topLevel.add(childEntry.getValue());
+                        break;
+                    default:
+                        
checkState(childEntry.getValue().compactAsync(canceller));
+                        break;
+                }
+            }
+
+            if (diff(1, topLevel)) {
+                SegmentNodeState compacted = compactionTree.compact();
+                if (compacted != null) {
+                    return compacted;
+                }
+            }
+
+            try {
+                // compaction failed, terminate remaining tasks
+                executorService.shutdown();
+                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    executorService.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executorService.shutdownNow();
+            }
+
+            return null;
+        }
+
+        private boolean diff(int depth, List<CompactionTree> nodes) {
+            int targetCount = getMinNodeCount();
+            gcListener.info("Found {} nodes at depth {}, target is {}.", 
nodes.size(), depth, targetCount);
+
+            if (nodes.size() >= targetCount) {
+                nodes.forEach(node -> node.compactAsync(canceller));
+                return true;
+            } else if (nodes.isEmpty()) {
+                gcListener.info("Amount of changes too small, tree will not be 
split.");
+                return true;
+            }
+
+            List<CompactionTree> nextDepth = new ArrayList<>();
+            for (CompactionTree node : nodes) {
+                long estimatedSize = node.getEstimatedSize();
+                if (estimatedSize != -1 && estimatedSize <= (totalSizeEstimate 
/ numWorkers)) {
+                    checkState(node.compactAsync(canceller));
+                } else if (nextDepth.size() < getMaxNodeCount()) {
+                    if (!node.compareStates(canceller)) {
+                        return false;
+                    }
+                    nextDepth.addAll(node.modifiedChildren.values());
+                } else {
+                    nextDepth.add(node);
+                }
+            }
+
+            return diff(depth + 1, nextDepth);
+        }
+    }
+
+    @Nullable
+    @Override
+    protected SegmentNodeState compactWithDelegate(
+            @NotNull NodeState before,
+            @NotNull NodeState after,
+            @NotNull NodeState onto,
+            Canceller canceller
+    ) throws IOException {
+        if (numWorkers <= 0) {
+            gcListener.info("using sequential compaction.");
+            return compactor.compact(before, after, onto, canceller);
+        } else if (executorService == null || executorService.isShutdown()) {
+            executorService = Executors.newFixedThreadPool(numWorkers);
+        }
+        return new CompactionHandler(onto, canceller).diff(before, after);
+    }
+}
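
To make the control flow concrete, a self-contained toy sketch of the submit/join pattern `ParallelCompactor` uses: subtrees found during exploration are compacted on a fixed thread pool while the main thread joins the futures. `compactSubtree` is a stand-in, not an Oak API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model of ParallelCompactor's control flow: subtrees discovered during the
// exploration phase are compacted asynchronously on a fixed thread pool, then the
// main thread joins the futures and assembles the parent result.
public class FanOutJoinSketch {

    // Stand-in for compactor.compact(before, after, onto, canceller); not an Oak API.
    static String compactSubtree(String subtree) {
        return subtree.toUpperCase();
    }

    public static void main(String[] args) throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        // Same convention as ParallelCompactor: one main thread plus (nThreads - 1) workers.
        int numWorkers = Math.max(1, nThreads - 1);

        List<String> subtrees = Arrays.asList("content", "oak:index", "jcr:system");
        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
        try {
            List<Future<String>> results = new ArrayList<>();
            for (String subtree : subtrees) {
                results.add(executor.submit(() -> compactSubtree(subtree)));
            }
            for (Future<String> result : results) {
                // Mirrors CompactionTree.compact(): block until the worker is done.
                System.out.println(result.get());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```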
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
index 3b8817ad89..f6effba94f 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
@@ -21,14 +21,17 @@ package org.apache.jackrabbit.oak.segment;
 
 import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.jackrabbit.guava.common.base.Supplier;
+import org.apache.jackrabbit.guava.common.cache.CacheBuilder;
 import org.apache.jackrabbit.guava.common.cache.CacheStats;
+import org.apache.jackrabbit.guava.common.cache.RemovalListener;
 import org.apache.jackrabbit.guava.common.cache.Weigher;
-import org.jetbrains.annotations.NotNull;
 
-import org.apache.jackrabbit.guava.common.base.Supplier;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Partial mapping of keys of type {@code K} to values of type {@link 
RecordId}. This is
@@ -37,11 +40,6 @@ import org.apache.jackrabbit.guava.common.base.Supplier;
  * @param <K>
  */
 public abstract class RecordCache<K> implements Cache<K, RecordId> {
-    private long hitCount;
-    private long missCount;
-    private long loadCount;
-    private long evictionCount;
-
     /**
      * @return number of mappings
      */
@@ -58,9 +56,7 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
      * @return  access statistics for this cache
      */
     @NotNull
-    public CacheStats getStats() {
-        return new CacheStats(hitCount, missCount, loadCount, 0, 0, 
evictionCount);
-    }
+    public abstract CacheStats getStats();
 
     /**
      * Factory method for creating {@code RecordCache} instances. The returned
@@ -75,7 +71,7 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
         if (size <= 0) {
             return new Empty<>();
         } else {
-            return new Default<>(size, CacheWeights.<T, RecordId> 
noopWeigher());
+            return new Default<>(size, CacheWeights.noopWeigher());
         }
     }
 
@@ -106,26 +102,28 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
         if (size <= 0) {
             return Empty.emptyFactory();
         } else {
-            return Default.defaultFactory(size, CacheWeights.<T, RecordId> 
noopWeigher());
+            return Default.defaultFactory(size, CacheWeights.noopWeigher());
         }
     }
 
     private static class Empty<T> extends RecordCache<T> {
+        private final AtomicLong missCount = new AtomicLong();
+
         static final <T> Supplier<RecordCache<T>> emptyFactory() {
-            return  new Supplier<RecordCache<T>>() {
-                @Override
-                public RecordCache<T> get() {
-                    return new Empty<>();
-                }
-            };
+            return Empty::new;
+        }
+
+        @Override
+        public @NotNull CacheStats getStats() {
+            return new CacheStats(0, missCount.get(), 0, 0, 0, 0);
         }
 
         @Override
-        public synchronized void put(@NotNull T key, @NotNull RecordId value) 
{ }
+        public void put(@NotNull T key, @NotNull RecordId value) { }
 
         @Override
-        public synchronized RecordId get(@NotNull T key) {
-            super.missCount++;
+        public RecordId get(@NotNull T key) {
+            missCount.incrementAndGet();
             return null;
         }
 
@@ -141,66 +139,64 @@ public abstract class RecordCache<K> implements Cache<K, 
RecordId> {
     }
 
     private static class Default<K> extends RecordCache<K> {
-
         @NotNull
-        private final Map<K, RecordId> records;
+        private final org.apache.jackrabbit.guava.common.cache.Cache<K, 
RecordId> cache;
 
         @NotNull
         private final Weigher<K, RecordId> weigher;
 
-        private long weight = 0;
+        @NotNull
+        private final AtomicLong weight = new AtomicLong();
+
+        @NotNull
+        private final AtomicLong loadCount = new AtomicLong();
 
-        static final <K> Supplier<RecordCache<K>> defaultFactory(final int 
size, @NotNull final Weigher<K, RecordId> weigher) {
-            return new Supplier<RecordCache<K>>() {
-                @Override
-                public RecordCache<K> get() {
-                    return new Default<>(size, checkNotNull(weigher));
-                }
-            };
+        @Override
+        public @NotNull CacheStats getStats() {
+            CacheStats internalStats = cache.stats();
+            // any addition to the cache counts as load by our definition
+            return new CacheStats(internalStats.hitCount(), 
internalStats.missCount(),
+                    loadCount.get(), 0, 0,  internalStats.evictionCount());
+        }
+
+        static <K> Supplier<RecordCache<K>> defaultFactory(final int size, 
@NotNull final Weigher<K, RecordId> weigher) {
+            return () -> new Default<>(size, checkNotNull(weigher));
         }
 
         Default(final int size, @NotNull final Weigher<K, RecordId> weigher) {
-            this.weigher = checkNotNull(weigher);
-            records = new LinkedHashMap<K, RecordId>(size * 4 / 3, 0.75f, 
true) {
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<K, RecordId> 
eldest) {
-                    boolean remove = super.size() > size;
-                    if (remove) {
-                        Default.super.evictionCount++;
-                        weight -= weigher.weigh(eldest.getKey(),
-                                eldest.getValue());
-                    }
-                    return remove;
-                }
-            };
+            this.cache = CacheBuilder.newBuilder()
+                    .maximumSize(size)
+                    .initialCapacity(size)
+                    .concurrencyLevel(4)
+                    .recordStats()
+                    .removalListener((RemovalListener<K, RecordId>) removal -> 
{
+                        int removedWeight = weigher.weigh(removal.getKey(), 
removal.getValue());
+                        weight.addAndGet(-removedWeight);
+                    })
+                    .build();
+            this.weigher = weigher;
         }
 
         @Override
-        public synchronized void put(@NotNull K key, @NotNull RecordId value) {
-            super.loadCount++;
-            records.put(key, value);
-            weight += weigher.weigh(key, value);
+        public void put(@NotNull K key, @NotNull RecordId value) {
+            cache.put(key, value);
+            loadCount.incrementAndGet();
+            weight.addAndGet(weigher.weigh(key, value));
         }
 
         @Override
-        public synchronized RecordId get(@NotNull K key) {
-            RecordId value = records.get(key);
-            if (value == null) {
-                super.missCount++;
-            } else {
-                super.hitCount++;
-            }
-            return value;
+        public RecordId get(@NotNull K key) {
+            return cache.getIfPresent(key);
         }
 
         @Override
-        public synchronized long size() {
-            return records.size();
+        public long size() {
+            return cache.size();
         }
 
         @Override
         public long estimateCurrentWeight() {
-            return weight;
+            return weight.get();
         }
     }
 }
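
A minimal stand-alone sketch of the caching pattern the rewritten `RecordCache.Default` relies on: a bounded, thread-safe Guava cache with stats recording, plus an `AtomicLong` that tracks approximate entry weight via a removal listener. It uses plain `com.google.common.cache` and a toy string-length weight; Oak itself uses the repackaged `org.apache.jackrabbit.guava` classes and a `Weigher`:

```java
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;

public class WeighedCacheSketch {

    private final AtomicLong weight = new AtomicLong();

    private final Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .concurrencyLevel(4)
            .recordStats()
            .removalListener((RemovalListener<String, String>) removal ->
                    // Keep the weight estimate in sync when entries are evicted or replaced.
                    weight.addAndGet(-(removal.getKey().length() + removal.getValue().length())))
            .build();

    void put(String key, String value) {
        cache.put(key, value);
        weight.addAndGet(key.length() + value.length());
    }

    String get(String key) {
        return cache.getIfPresent(key); // hits and misses are counted by recordStats()
    }

    long estimateCurrentWeight() {
        return weight.get();
    }
}
```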
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
index 59c4fadaaf..b37ec2ad01 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
@@ -19,18 +19,19 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
 import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
+import static org.apache.jackrabbit.guava.common.collect.Maps.newConcurrentMap;
 import static org.apache.jackrabbit.guava.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.guava.common.collect.Sets.newHashSet;
 import static java.lang.Thread.currentThread;
+import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
@@ -45,28 +46,18 @@ import org.jetbrains.annotations.NotNull;
  * Instances of this class are thread safe.
  */
 public class SegmentBufferWriterPool implements WriteOperationHandler {
-
-    /**
-     * Monitor protecting the state of this pool. Neither of {@link #writers},
-     * {@link #borrowed} and {@link #disposed} must be modified without owning
-     * this monitor.
-     */
-    private final Monitor poolMonitor = new Monitor(true);
-
-    /**
-     * Pool of current writers that are not in use
-     */
-    private final Map<Object, SegmentBufferWriter> writers = newHashMap();
-
     /**
-     * Writers that are currently in use
+     * Read-write lock protecting the state of this pool. Multiple threads can access
+     * their writers in parallel by acquiring the read lock. The write lock is needed
+     * for the flush operation, since it requires that none of the writers be in use.
      */
-    private final Set<SegmentBufferWriter> borrowed = newHashSet();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
     /**
-     * Retired writers that have not yet been flushed
+     * Pool of writers. Every thread is assigned a unique writer per GC generation,
+     * so a concurrent map is sufficient to synchronize access to them.
      */
-    private final Set<SegmentBufferWriter> disposed = newHashSet();
+    private final ConcurrentMap<Object, SegmentBufferWriter> writers = newConcurrentMap();
 
     @NotNull
     private final SegmentIdProvider idProvider;
@@ -87,143 +78,58 @@ public class SegmentBufferWriterPool implements 
WriteOperationHandler {
             @NotNull SegmentReader reader,
             @NotNull String wid,
             @NotNull Supplier<GCGeneration> gcGeneration) {
-        this.idProvider = checkNotNull(idProvider);
-        this.reader = checkNotNull(reader);
-        this.wid = checkNotNull(wid);
-        this.gcGeneration = checkNotNull(gcGeneration);
+        this.idProvider = requireNonNull(idProvider);
+        this.reader = requireNonNull(reader);
+        this.wid = requireNonNull(wid);
+        this.gcGeneration = requireNonNull(gcGeneration);
     }
 
-    @Override
     @NotNull
+    @Override
     public GCGeneration getGCGeneration() {
         return gcGeneration.get();
     }
 
-    @NotNull
-    @Override
-    public RecordId execute(@NotNull GCGeneration gcGeneration,
-                            @NotNull WriteOperation writeOperation)
-    throws IOException {
-        SimpleImmutableEntry<?,?> key = new 
SimpleImmutableEntry<>(currentThread(), gcGeneration);
-        SegmentBufferWriter writer = borrowWriter(key, gcGeneration);
-        try {
-            return writeOperation.execute(writer);
-        } finally {
-            returnWriter(key, writer);
-        }
-    }
-
     @Override
     public void flush(@NotNull SegmentStore store) throws IOException {
-        List<SegmentBufferWriter> toFlush = newArrayList();
-        List<SegmentBufferWriter> toReturn = newArrayList();
-
-        poolMonitor.enter();
+        lock.writeLock().lock();
         try {
-            // Collect all writers that are not currently in use and clear
-            // the list so they won't get re-used anymore.
-            toFlush.addAll(writers.values());
+            for (SegmentBufferWriter writer : writers.values()) {
+                writer.flush(store);
+            }
             writers.clear();
-
-            // Collect all borrowed writers, which we need to wait for.
-            // Clear the list so they will get disposed once returned.
-            toReturn.addAll(borrowed);
-            borrowed.clear();
         } finally {
-            poolMonitor.leave();
-        }
-
-        // Wait for the return of the borrowed writers. This is the
-        // case once all of them appear in the disposed set.
-        if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
-            try {
-                // Collect all disposed writers and clear the list to mark them
-                // as flushed.
-                toFlush.addAll(toReturn);
-                disposed.removeAll(toReturn);
-            } finally {
-                poolMonitor.leave();
-            }
-        }
-
-        // Call flush from outside the pool monitor to avoid potential
-        // deadlocks of that method calling SegmentStore.writeSegment
-        for (SegmentBufferWriter writer : toFlush) {
-            writer.flush(store);
+            lock.writeLock().unlock();
         }
     }
 
-    /**
-     * Create a {@code Guard} that is satisfied if and only if {@link 
#disposed}
-     * contains all items in {@code toReturn}
-     */
     @NotNull
-    private Guard allReturned(final List<SegmentBufferWriter> toReturn) {
-        return new Guard(poolMonitor) {
-
-            @Override
-            public boolean isSatisfied() {
-                return disposed.containsAll(toReturn);
-            }
-
-        };
-    }
-
-    /**
-     * Same as {@code monitor.enterWhen(guard)} but copes with that pesky 
{@code
-     * InterruptedException} by catching it and setting this thread's
-     * interrupted flag.
-     */
-    private static boolean safeEnterWhen(Monitor monitor, Guard guard) {
-        try {
-            monitor.enterWhen(guard);
-            return true;
-        } catch (InterruptedException ignore) {
-            currentThread().interrupt();
-            return false;
-        }
-    }
-
-    /**
-     * Return a writer from the pool by its {@code key}. This method may return
-     * a fresh writer at any time. Callers need to return a writer before
-     * borrowing it again. Failing to do so leads to undefined behaviour.
-     */
-    private SegmentBufferWriter borrowWriter(@NotNull Object key, @NotNull 
GCGeneration gcGeneration) {
-        poolMonitor.enter();
+    @Override
+    public RecordId execute(@NotNull GCGeneration gcGeneration,
+                            @NotNull WriteOperation writeOperation)
+    throws IOException {
+        lock.readLock().lock();
+        SegmentBufferWriter writer = getWriter(currentThread(), gcGeneration);
         try {
-            SegmentBufferWriter writer = writers.remove(key);
-            if (writer == null) {
-                writer = new SegmentBufferWriter(
-                        idProvider,
-                        reader,
-                        getWriterId(wid),
-                        gcGeneration
-                );
-            }
-            borrowed.add(writer);
-            return writer;
+            return writeOperation.execute(writer);
         } finally {
-            poolMonitor.leave();
+            lock.readLock().unlock();
         }
     }
 
-    /**
-     * Return a writer to the pool using the {@code key} that was used to 
borrow
-     * it.
-     */
-    private void returnWriter(Object key, SegmentBufferWriter writer) {
-        poolMonitor.enter();
-        try {
-            if (borrowed.remove(writer)) {
-                checkState(writers.put(key, writer) == null);
-            } else {
-                // Defer flush this writer as it was borrowed while flush() 
was called.
-                disposed.add(writer);
-            }
-        } finally {
-            poolMonitor.leave();
+    private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull GCGeneration gcGeneration) {
+        SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, gcGeneration);
+        SegmentBufferWriter writer = writers.get(key);
+        if (writer == null) {
+            writer = new SegmentBufferWriter(
+                    idProvider,
+                    reader,
+                    getWriterId(wid),
+                    gcGeneration
+            );
+            writers.put(key, writer);
         }
+        return writer;
     }
 
     private String getWriterId(String wid) {
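
The rewritten pool above replaces the monitor-based borrow/return protocol with a fair `ReentrantReadWriteLock`: writer threads only take the read lock, so they never block each other, while `flush()` takes the write lock and therefore only runs when no writer is in use. A self-contained sketch of the same pattern, with illustrative names rather than the Oak API:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class PerThreadWriterPool<W> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final ConcurrentMap<Object, W> writers = new ConcurrentHashMap<>();

    // Writer threads only take the read lock, so they proceed in parallel.
    <T> T execute(int generation, Supplier<W> factory, Function<W, T> op) {
        lock.readLock().lock();
        try {
            Object key = new SimpleImmutableEntry<>(Thread.currentThread(), generation);
            W writer = writers.computeIfAbsent(key, k -> factory.get());
            return op.apply(writer);
        } finally {
            lock.readLock().unlock();
        }
    }

    // flush() takes the write lock, which guarantees no writer is currently in use.
    void flush(Consumer<W> flusher) {
        lock.writeLock().lock();
        try {
            writers.values().forEach(flusher);
            writers.clear();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```
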
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
index f0180aa930..d9250c491a 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
@@ -53,9 +53,14 @@ public class SegmentGCOptions {
         CLASSIC_COMPACTOR("classic"),
 
         /**
-         * Checkpoints aware compaction implementation
+         * Checkpoint-aware compaction implementation
          */
-        CHECKPOINT_COMPACTOR("diff");
+        CHECKPOINT_COMPACTOR("diff"),
+
+        /**
+         * Multithreaded compaction implementation
+         */
+        PARALLEL_COMPACTOR("parallel");
 
         private final String description;
 
@@ -69,8 +74,10 @@ public class SegmentGCOptions {
                 return CLASSIC_COMPACTOR;
             case "diff":
                 return CHECKPOINT_COMPACTOR;
+            case "parallel":
+                return PARALLEL_COMPACTOR;
             default:
-                throw new IllegalArgumentException("Unrecongnized compactor type " + description);
+                throw new IllegalArgumentException("Unrecognized compactor type " + description);
             }
         }
 
@@ -119,6 +126,11 @@ public class SegmentGCOptions {
      */
     public static final int MEMORY_THRESHOLD_DEFAULT = 15;
 
+    /**
+     * Default value for {@link #getConcurrency()}
+     */
+    public static final int DEFAULT_CONCURRENCY = 1;
+
     private boolean paused = PAUSE_DEFAULT;
 
     /**
@@ -149,7 +161,13 @@ public class SegmentGCOptions {
      */
     private long gcLogInterval = -1;
 
-    private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR;
+    /**
+     * Number of threads to use for compaction. Negative numbers are interpreted
+     * relative to the number of available processors.
+     */
+    private int concurrency = DEFAULT_CONCURRENCY;
+
+    private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
 
     public SegmentGCOptions(boolean paused, int retryCount, int forceTimeout) {
         this.paused = paused;
@@ -275,6 +293,7 @@ public class SegmentGCOptions {
                     "offline=" + offline +
                     ", retainedGenerations=" + retainedGenerations +
                     ", compactorType=" + compactorType +
+                    ", concurrency=" + concurrency +
                     "}";
         } else {
             return getClass().getSimpleName() + "{" +
@@ -384,7 +403,7 @@ public class SegmentGCOptions {
     }
 
     /**
-     * @return the current compactor type (i.e. classic or checkpoint-aware)
+     * @return the current compactor type (i.e. classic, checkpoint-aware, or parallel)
      */
     public CompactorType getCompactorType() {
         return compactorType;
@@ -393,9 +412,27 @@ public class SegmentGCOptions {
     /**
      * Sets the compactor type to be used for compaction
      * @param compactorType
+     * @return this instance
      */
     public SegmentGCOptions setCompactorType(CompactorType compactorType) {
         this.compactorType = compactorType;
         return this;
     }
+
+    /**
+     * @return the current level of concurrency
+     */
+    public int getConcurrency() {
+        return concurrency;
+    }
+
+    /**
+     * Sets the concurrency level for compaction
+     * @param concurrency number of threads to use
+     * @return this instance
+     */
+    public SegmentGCOptions setConcurrency(int concurrency) {
+        this.concurrency = concurrency;
+        return this;
+    }
 }
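
Selecting the parallel compactor and its thread count is now a matter of configuring these options. A short usage sketch based on the setters added above; `defaultGCOptions()` is the factory already used by the Compact tool further down, and the value 4 is purely illustrative:

```java
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;

SegmentGCOptions gcOptions = SegmentGCOptions.defaultGCOptions()
        .setCompactorType(CompactorType.PARALLEL_COMPACTOR)
        // positive values are used as-is; negative values are interpreted
        // relative to the number of available processors
        .setConcurrency(4);
```
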
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
index fc13fa9819..f5f6199b85 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
@@ -29,12 +29,7 @@ import static 
org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
 
 import org.apache.jackrabbit.guava.common.base.Function;
 
-import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
-import org.apache.jackrabbit.oak.segment.ClassicCompactor;
-import org.apache.jackrabbit.oak.segment.Compactor;
-import org.apache.jackrabbit.oak.segment.RecordId;
-import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.*;
 import 
org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
 import org.apache.jackrabbit.oak.segment.file.cancel.Cancellation;
@@ -221,6 +216,7 @@ abstract class AbstractCompactionStrategy implements 
CompactionStrategy {
                 writer.flush();
                 context.getFlusher().flush();
                 context.getGCListener().info("compaction succeeded in {}, 
after {} cycles", watch, cycles);
+                context.getCompactionMonitor().finished();
                 return compactionSucceeded(context, nextGeneration, 
compacted.getRecordId());
             } else {
                 context.getGCListener().info("compaction failed after {}, and 
{} cycles", watch, cycles);
@@ -239,15 +235,18 @@ abstract class AbstractCompactionStrategy implements 
CompactionStrategy {
     private Compactor newCompactor(Context context, SegmentWriter writer) {
         CompactorType compactorType = 
context.getGCOptions().getCompactorType();
         switch (compactorType) {
-        case CHECKPOINT_COMPACTOR:
-            return new CheckpointCompactor(context.getGCListener(), context.getSegmentReader(), writer,
-                    context.getBlobStore(), context.getCompactionMonitor());
-        case CLASSIC_COMPACTOR:
-            return new ClassicCompactor(context.getSegmentReader(), writer, context.getBlobStore(),
-                    context.getCompactionMonitor());
-        default:
-            throw new IllegalArgumentException("Unknown compactor type: " + compactorType);
+            case PARALLEL_COMPACTOR:
+                return new ParallelCompactor(context.getGCListener(), context.getSegmentReader(), writer,
+                        context.getBlobStore(), context.getCompactionMonitor(),
+                        context.getGCOptions().getConcurrency());
+            case CHECKPOINT_COMPACTOR:
+                return new CheckpointCompactor(context.getGCListener(), context.getSegmentReader(), writer,
+                        context.getBlobStore(), context.getCompactionMonitor());
+            case CLASSIC_COMPACTOR:
+                return new ClassicCompactor(context.getSegmentReader(), writer, context.getBlobStore(),
+                        context.getCompactionMonitor());
+            default:
+                throw new IllegalArgumentException("Unknown compactor type: " + compactorType);
+            }
         }
     }
-
-}
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
index fa9fd05b63..099b3e53e0 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
@@ -197,7 +197,7 @@ public class FileStore extends AbstractFileStore {
                 defaultSegmentWriterBuilder("c")
                     
.with(builder.getCacheManager().withAccessTracking("COMPACT", statsProvider))
                     .withGeneration(generation)
-                    .withoutWriterPool()
+                    .withWriterPool()
                     .build(this)
         );
 
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
index c41f8a6890..565ff3a49e 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
@@ -18,12 +18,16 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
+import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
+
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.jetbrains.annotations.NotNull;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Monitors the compaction cycle and keeps a compacted nodes counter, in order
- * to provide a best effort progress log based on extrapolating the previous
+ * to provide a best-effort progress log based on extrapolating the previous
  * size and node count and current size to deduce current node count.
  */
 public class GCNodeWriteMonitor {
@@ -50,19 +54,19 @@ public class GCNodeWriteMonitor {
     /**
      * Number of compacted nodes
      */
-    private long nodes;
+    private final AtomicLong nodes = new AtomicLong();
 
     /**
      * Number of compacted properties
      */
-    private long properties;
+    private final AtomicLong properties = new AtomicLong();
 
     /**
      * Number of compacted binaries
      */
-    private long binaries;
+    private final AtomicLong binaries = new AtomicLong();
 
-    private boolean running = false;
+    private volatile boolean running = false;
 
     public GCNodeWriteMonitor(long gcProgressLog, @NotNull GCMonitor 
gcMonitor) {
         this.gcProgressLog = gcProgressLog;
@@ -77,7 +81,8 @@ public class GCNodeWriteMonitor {
      * @param currentSize
      *            current repository size
      */
-    public synchronized void init(long prevSize, long prevCompactedNodes, long currentSize) {
+    public void init(long prevSize, long prevCompactedNodes, long currentSize) {
+        checkState(!running);
         if (prevCompactedNodes > 0) {
             estimated = (long) (((double) currentSize / prevSize) * 
prevCompactedNodes);
             gcMonitor.info(
@@ -87,69 +92,67 @@ public class GCNodeWriteMonitor {
         } else {
             gcMonitor.info("unable to estimate number of nodes for compaction, 
missing gc history.");
         }
-        nodes = 0;
+        nodes.set(0);
+        properties.set(0);
+        binaries.set(0);
         start = System.currentTimeMillis();
         running = true;
     }
 
-    public synchronized void onNode() {
-        nodes++;
-        if (gcProgressLog > 0 && nodes % gcProgressLog == 0) {
+    public void onNode() {
+        long writtenNodes = nodes.incrementAndGet();
+        if (gcProgressLog > 0 && writtenNodes % gcProgressLog == 0) {
             gcMonitor.info("compacted {} nodes, {} properties, {} binaries in {} ms. {}",
-                nodes, properties, binaries, System.currentTimeMillis() - start, getPercentageDone());
+                    writtenNodes, properties, binaries, System.currentTimeMillis() - start, getPercentageDone());
         }
     }
 
-    public synchronized void onProperty() {
-        properties++;
+    public void onProperty() {
+        properties.incrementAndGet();
     }
 
-    public synchronized void onBinary() {
-        binaries++;
+    public void onBinary() {
+        binaries.incrementAndGet();
     }
 
-    public synchronized void finished() {
+    public void finished() {
         running = false;
     }
 
     /**
      * Compacted nodes in current cycle
      */
-    public synchronized long getCompactedNodes() {
-        return nodes;
+    public long getCompactedNodes() {
+        return nodes.get();
     }
 
     /**
      * Estimated nodes to compact in current cycle. Can be {@code -1} if the
      * estimation could not be performed.
      */
-    public synchronized long getEstimatedTotal() {
+    public long getEstimatedTotal() {
         return estimated;
     }
 
     @NotNull
     private String getPercentageDone() {
-        return estimated > 0
-            ? getEstimatedPercentage() + "% complete."
-            : "";
+        int percentage = getEstimatedPercentage();
+        return (percentage >= 0) ? percentage + "% complete." : "";
     }
 
     /**
      * Estimated completion percentage. Can be {@code -1} if the estimation
      * could not be performed.
      */
-    public synchronized int getEstimatedPercentage() {
-        if (estimated > 0) {
-            if (!running) {
-                return 100;
-            } else {
-                return Math.min((int) (100 * ((double) nodes / estimated)), 
99);
-            }
+    public int getEstimatedPercentage() {
+        if (!running) {
+            return 100;
         }
-        return -1;
+        long numNodes = estimated;
+        return (numNodes <= 0) ? -1 : Math.min((int) (100 * ((double) nodes.get() / numNodes)), 99);
     }
 
-    public synchronized boolean isCompactionRunning() {
+    public boolean isCompactionRunning() {
         return running;
     }
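
With the counters above switched to `AtomicLong` and the running flag made `volatile`, compaction workers can report progress concurrently without serializing on a single lock. A small sketch of that counter-plus-threshold pattern in isolation; the class and field names are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

class ProgressCounter {
    private final AtomicLong nodes = new AtomicLong();
    private final long logEvery;

    ProgressCounter(long logEvery) {
        this.logEvery = logEvery;
    }

    void onNode() {
        // incrementAndGet() returns the updated value, so each thread logs against
        // the count it actually produced, without any additional locking
        long written = nodes.incrementAndGet();
        if (logEvery > 0 && written % logEvery == 0) {
            System.out.println("compacted " + written + " nodes");
        }
    }
}
```
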
 
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
index 00038e446b..35b2de936b 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
@@ -20,21 +20,25 @@
 package org.apache.jackrabbit.oak.segment.file;
 
 import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkArgument;
-import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 import static java.lang.Integer.bitCount;
 import static java.lang.Integer.numberOfTrailingZeros;
 import static java.lang.Long.numberOfLeadingZeros;
 import static java.lang.Math.max;
 import static java.util.Arrays.fill;
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.jackrabbit.guava.common.cache.CacheStats;
-import org.apache.jackrabbit.guava.common.cache.Weigher;
 import org.apache.jackrabbit.oak.segment.CacheWeights;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import org.apache.jackrabbit.guava.common.base.Predicate;
 import org.apache.jackrabbit.guava.common.base.Supplier;
+import org.apache.jackrabbit.guava.common.cache.CacheStats;
+import org.apache.jackrabbit.guava.common.cache.Weigher;
 
 /**
  * {@code PriorityCache} implements a partial mapping from keys of type {@code 
K} to values
@@ -59,19 +63,24 @@ import org.apache.jackrabbit.guava.common.base.Supplier;
 public class PriorityCache<K, V> {
     private final int rehash;
     private final Entry<?,?>[] entries;
-    private final int[] costs = new int[256];
-    private final int[] evictions = new int[256];
+    private final AtomicInteger[] costs;
+    private final AtomicInteger[] evictions;
 
-    private long hitCount;
-    private long missCount;
-    private long loadCount;
-    private long loadExceptionCount;
-    private long evictionCount;
-    private long size;
+    private final AtomicLong hitCount = new AtomicLong();
+    private final AtomicLong missCount = new AtomicLong();
+    private final AtomicLong loadCount = new AtomicLong();
+    private final AtomicLong loadExceptionCount = new AtomicLong();
+    private final AtomicLong evictionCount = new AtomicLong();
+    private final AtomicLong size = new AtomicLong();
+
+    private static class Segment extends ReentrantLock {}
+
+    @NotNull
+    private final Segment[] segments;
 
     @NotNull
     private final Weigher<K, V> weigher;
-    private long weight = 0;
+    private final AtomicLong weight = new AtomicLong();
 
     /**
      * Static factory for creating new {@code PriorityCache} instances.
@@ -80,13 +89,8 @@ public class PriorityCache<K, V> {
      */
     public static <K, V> Supplier<PriorityCache<K, V>> factory(final int size, 
@NotNull final Weigher<K, V> weigher) {
         checkArgument(bitCount(size) == 1);
-        checkNotNull(weigher);
-        return new Supplier<PriorityCache<K, V>>() {
-            @Override
-            public PriorityCache<K, V> get() {
-                return new PriorityCache<>(size, weigher);
-            }
-        };
+        requireNonNull(weigher);
+        return () -> new PriorityCache<>(size, weigher);
     }
 
     /**
@@ -96,12 +100,7 @@ public class PriorityCache<K, V> {
      */
     public static <K, V> Supplier<PriorityCache<K, V>> factory(final int size) 
{
         checkArgument(bitCount(size) == 1);
-        return new Supplier<PriorityCache<K, V>>() {
-            @Override
-            public PriorityCache<K, V> get() {
-                return new PriorityCache<>(size);
-            }
-        };
+        return () -> new PriorityCache<>(size);
     }
 
     private static class Entry<K, V> {
@@ -133,7 +132,7 @@ public class PriorityCache<K, V> {
      * @return the next power of two starting from {@code size}.
      */
     public static long nextPowerOfTwo(int size) {
-        return 1L << (64L - numberOfLeadingZeros((long)max(1, size) - 1L));
+        return 1L << (64L - numberOfLeadingZeros(max(1, size) - 1L));
     }
 
     /**
@@ -156,13 +155,40 @@ public class PriorityCache<K, V> {
      * @param weigher   Needed to provide an estimation of the cache weight in 
memory
      */
     public PriorityCache(int size, int rehash, @NotNull Weigher<K, V> weigher) {
+        this(size, rehash, weigher, 1024);
+    }
+
+    /**
+     * Create a new instance of the given {@code size}. {@code rehash} specifies the number
+     * of rehashes to resolve a clash.
+     * @param size        Size of the cache. Must be a power of {@code 2}.
+     * @param rehash      Number of rehashes. Must be greater or equal to {@code 0} and
+     *                    smaller than {@code 32 - numberOfTrailingZeros(size)}.
+     * @param weigher     Needed to provide an estimation of the cache weight in memory
+     * @param numSegments Number of separately locked segments
+     */
+    public PriorityCache(int size, int rehash, @NotNull Weigher<K, V> weigher, int numSegments) {
         checkArgument(bitCount(size) == 1);
         checkArgument(rehash >= 0);
         checkArgument(rehash < 32 - numberOfTrailingZeros(size));
         this.rehash = rehash;
         entries = new Entry<?,?>[size];
         fill(entries, Entry.NULL);
-        this.weigher = checkNotNull(weigher);
+        this.weigher = requireNonNull(weigher);
+
+        numSegments = Math.min(numSegments, size);
+        checkArgument((size % numSegments) == 0);
+        segments = new Segment[numSegments];
+        for (int s = 0; s < numSegments; s++) {
+            segments[s] = new Segment();
+        }
+
+        costs = new AtomicInteger[256];
+        evictions = new AtomicInteger[256];
+        for (int i = 0; i < 256; i++) {
+            costs[i] = new AtomicInteger();
+            evictions[i] = new AtomicInteger();
+        }
     }
 
     /**
@@ -182,11 +208,16 @@ public class PriorityCache<K, V> {
         return (hashCode >> iteration) & (entries.length - 1);
     }
 
+    private Segment getSegment(int index) {
+        int entriesPerSegment = entries.length / segments.length;
+        return segments[index / entriesPerSegment];
+    }
+
     /**
      * @return  the number of mappings in this cache.
      */
     public long size() {
-        return size;
+        return size.get();
     }
 
     /**
@@ -197,19 +228,30 @@ public class PriorityCache<K, V> {
      * @param initialCost    the initial cost associated with this mapping
      * @return  {@code true} if the mapping has been added, {@code false} 
otherwise.
      */
-    public synchronized boolean put(@NotNull K key, @NotNull V value, int generation, byte initialCost) {
+    public boolean put(@NotNull K key, @NotNull V value, int generation, byte initialCost) {
         int hashCode = key.hashCode();
         byte cheapest = initialCost;
-        int index = -1;
-        boolean eviction = false;
+        int index;
+        boolean eviction;
+
+        Segment lockedSegment = null;
+
         for (int k = 0; k <= rehash; k++) {
             int i = project(hashCode, k);
+            Segment segment = getSegment(i);
+            if (segment != lockedSegment) {
+                if (lockedSegment != null) {
+                    lockedSegment.unlock();
+                }
+                lockedSegment = segment;
+                lockedSegment.lock();
+            }
+
             Entry<?, ?> entry = entries[i];
             if (entry == Entry.NULL) {
                 // Empty slot -> use this index
                 index = i;
                 eviction = false;
-                break;
             } else if (entry.generation <= generation && 
key.equals(entry.key)) {
                 // Key exists and generation is greater or equal -> use this 
index and boost the cost
                 index = i;
@@ -218,42 +260,45 @@ public class PriorityCache<K, V> {
                     initialCost++;
                 }
                 eviction = false;
-                break;
             } else if (entry.generation < generation) {
                 // Old generation -> use this index
                 index = i;
                 eviction = false;
-                break;
             } else if (entry.cost < cheapest) {
                 // Candidate slot, keep on searching for even cheaper slots
                 cheapest = entry.cost;
                 index = i;
                 eviction = true;
+                if (k < rehash) {
+                    continue;
+                }
+            } else {
+                continue;
             }
-        }
 
-        if (index >= 0) {
             Entry<?, ?> old = entries[index];
             Entry<?, ?> newE = new Entry<>(key, value, generation, 
initialCost);
             entries[index] = newE;
-            loadCount++;
-            costs[initialCost - Byte.MIN_VALUE]++;
+            loadCount.incrementAndGet();
+            costs[initialCost - Byte.MIN_VALUE].incrementAndGet();
             if (old != Entry.NULL) {
-                costs[old.cost - Byte.MIN_VALUE]--;
+                costs[old.cost - Byte.MIN_VALUE].decrementAndGet();
                 if (eviction) {
-                    evictions[old.cost - Byte.MIN_VALUE]++;
-                    evictionCount++;
+                    evictions[old.cost - Byte.MIN_VALUE].incrementAndGet();
+                    evictionCount.incrementAndGet();
                 }
-                weight -= weighEntry(old);
+                weight.addAndGet(-weighEntry(old));
             } else {
-                size++;
+                size.incrementAndGet();
             }
-            weight += weighEntry(newE);
+            weight.addAndGet(weighEntry(newE));
+            lockedSegment.unlock();
             return true;
-        } else {
-            loadExceptionCount++;
-            return false;
         }
+
+        requireNonNull(lockedSegment).unlock();
+        loadExceptionCount.incrementAndGet();
+        return false;
     }
 
     /**
@@ -265,22 +310,27 @@ public class PriorityCache<K, V> {
      */
     @SuppressWarnings("unchecked")
     @Nullable
-    public synchronized V get(@NotNull K key, int generation) {
+    public V get(@NotNull K key, int generation) {
         int hashCode = key.hashCode();
         for (int k = 0; k <= rehash; k++) {
             int i = project(hashCode, k);
+            Segment segment = getSegment(i);
+            segment.lock();
             Entry<?, ?> entry = entries[i];
             if (generation == entry.generation && key.equals(entry.key)) {
                 if (entry.cost < Byte.MAX_VALUE) {
-                    costs[entry.cost - Byte.MIN_VALUE]--;
+                    costs[entry.cost - Byte.MIN_VALUE].decrementAndGet();
                     entry.cost++;
-                    costs[entry.cost - Byte.MIN_VALUE]++;
+                    costs[entry.cost - Byte.MIN_VALUE].incrementAndGet();
                 }
-                hitCount++;
-                return (V) entry.value;
+                hitCount.incrementAndGet();
+                V value = (V) entry.value;
+                segment.unlock();
+                return value;
             }
+            segment.unlock();
         }
-        missCount++;
+        missCount.incrementAndGet();
         return null;
     }
 
@@ -289,35 +339,42 @@ public class PriorityCache<K, V> {
      * passed {@code purge} predicate.
      * @param purge
      */
-    public synchronized void purgeGenerations(@NotNull Predicate<Integer> 
purge) {
-        for (int i = 0; i < entries.length; i++) {
-            Entry<?, ?> entry = entries[i];
-            if (entry != Entry.NULL && purge.apply(entry.generation)) {
-                entries[i] = Entry.NULL;
-                size--;
-                weight -= weighEntry(entry);
+    public void purgeGenerations(@NotNull Predicate<Integer> purge) {
+        int numSegments = segments.length;
+        int entriesPerSegment = entries.length / numSegments;
+        for (int s = 0; s < numSegments; s++) {
+            segments[s].lock();
+            for (int i = 0; i < entriesPerSegment; i++) {
+                int j = i + s * entriesPerSegment;
+                Entry<?, ?> entry = entries[j];
+                if (entry != Entry.NULL && purge.apply(entry.generation)) {
+                    entries[j] = Entry.NULL;
+                    size.decrementAndGet();
+                    weight.addAndGet(-weighEntry(entry));
+                }
             }
+            segments[s].unlock();
         }
     }
 
-    @SuppressWarnings("unchecked")
     private int weighEntry(Entry<?, ?> entry) {
         return weigher.weigh((K) entry.key, (V) entry.value);
     }
 
     @Override
-    public synchronized String toString() {
+    public String toString() {
         return "PriorityCache" +
-            "{ costs=" + toString(costs) +
-            ", evictions=" + toString(evictions) + " }";
+                "{ costs=" + toString(costs) +
+                ", evictions=" + toString(evictions) + " }";
     }
 
-    private static String toString(int[] ints) {
+    private static String toString(AtomicInteger[] ints) {
         StringBuilder b = new StringBuilder("[");
         String sep = "";
         for (int i = 0; i < ints.length; i++) {
-            if (ints[i] > 0) {
-                b.append(sep).append(i).append("->").append(ints[i]);
+            int value = ints[i].get();
+            if (value > 0) {
+                b.append(sep).append(i).append("->").append(value);
                 sep = ",";
             }
         }
@@ -329,11 +386,12 @@ public class PriorityCache<K, V> {
      */
     @NotNull
     public CacheStats getStats() {
-        return new CacheStats(hitCount, missCount, loadCount, 
loadExceptionCount, 0, evictionCount);
+        return new CacheStats(hitCount.get(), missCount.get(), loadCount.get(),
+                loadExceptionCount.get(), 0, evictionCount.get());
     }
 
     public long estimateCurrentWeight() {
-        return weight;
+        return weight.get();
     }
 
 }
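
The cache is now guarded by lock striping: the entry table is divided into a fixed number of segments, each protected by its own `ReentrantLock`, so `put` and `get` only contend when they hash into the same segment. A minimal sketch of the index-to-segment mapping, assuming a table size that is a multiple of the segment count (as the `checkArgument` above enforces); the names are illustrative:

```java
import java.util.concurrent.locks.ReentrantLock;

class StripedTable {
    private final Object[] entries;
    private final ReentrantLock[] segments;

    StripedTable(int size, int numSegments) {
        // size must be a multiple of numSegments for the mapping below to be even
        entries = new Object[size];
        segments = new ReentrantLock[numSegments];
        for (int s = 0; s < numSegments; s++) {
            segments[s] = new ReentrantLock();
        }
    }

    // Indices are mapped to segments in contiguous blocks, as in getSegment() above.
    private ReentrantLock segmentFor(int index) {
        int entriesPerSegment = entries.length / segments.length;
        return segments[index / entriesPerSegment];
    }

    Object get(int index) {
        ReentrantLock lock = segmentFor(index);
        lock.lock();
        try {
            return entries[index];
        } finally {
            lock.unlock();
        }
    }

    void put(int index, Object value) {
        ReentrantLock lock = segmentFor(index);
        lock.lock();
        try {
            entries[index] = value;
        } finally {
            lock.unlock();
        }
    }
}
```
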
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
index 8c42af71a0..bda82108ed 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
@@ -78,7 +78,9 @@ public class Compact {
 
         private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
 
-        private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR;
+        private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
+
+        private int concurrency = -1;
 
         private Builder() {
             // Prevent external instantiation.
@@ -164,7 +166,7 @@ public class Compact {
 
         /**
          * The compactor type to be used by compaction. If not specified it defaults to
-         * "diff" compactor
+         * the "parallel" compactor
          * @param compactorType the compactor type
          * @return this builder
          */
@@ -173,6 +175,16 @@ public class Compact {
             return this;
         }
 
+        /**
+         * The number of threads to be used for compaction. This only applies to the "parallel" compactor.
+         * @param concurrency the number of threads
+         * @return this builder
+         */
+        public Builder withConcurrency(int concurrency) {
+            this.concurrency = concurrency;
+            return this;
+        }
+
         /**
          * Create an executable version of the {@link Compact} command.
          *
@@ -267,6 +279,8 @@ public class Compact {
 
     private final CompactorType compactorType;
 
+    private final int concurrency;
+
     private Compact(Builder builder) {
         this.path = builder.path;
         this.journal = new File(builder.path, "journal.log");
@@ -275,6 +289,7 @@ public class Compact {
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
         this.compactorType = builder.compactorType;
+        this.concurrency = builder.concurrency;
     }
 
     public int run() {
@@ -330,7 +345,8 @@ public class Compact {
             .withGCOptions(defaultGCOptions()
                 .setOffline()
                 .setGCLogInterval(gcLogInterval)
-                .setCompactorType(compactorType));
+                .setCompactorType(compactorType)
+                .setConcurrency(concurrency));
         if (fileAccessMode.memoryMapped != null) {
             builder.withMemoryMapping(fileAccessMode.memoryMapped);
         }
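
From the offline tool, the new option flows straight into the GC options shown above. A hedged usage sketch of the builder: `withCompactorType` and `withConcurrency` are the methods added by this change, while `builder()` and `withPath()` are assumed here from the usual Compact builder pattern and are not shown in this diff:

```java
import java.io.File;

import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
import org.apache.jackrabbit.oak.segment.tool.Compact;

int exitCode = Compact.builder()                       // assumed static factory
        .withPath(new File("/path/to/segmentstore"))   // assumed builder method
        .withCompactorType(CompactorType.PARALLEL_COMPACTOR)
        .withConcurrency(-1)                           // negative values are relative to available processors
        .build()
        .run();
```
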
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
similarity index 83%
copy from 
oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
copy to 
oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
index cfe547a9a2..9897b947ff 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
@@ -20,17 +20,15 @@ package org.apache.jackrabbit.oak.segment;
 
 import static java.util.concurrent.TimeUnit.DAYS;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.addTestContent;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameRecord;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameStableId;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.checkGeneration;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createBlob;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createCompactor;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.getCheckpoint;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.addTestContent;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameRecord;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameStableId;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.checkGeneration;
+import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.createBlob;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.getCheckpoint;
 import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static 
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -54,26 +52,26 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
-public class CheckpointCompactorExternalBlobTest {
+public abstract class AbstractCompactorExternalBlobTest {
 
     private TemporaryFolder folder = new TemporaryFolder(new File("target"));
 
-    private TemporaryBlobStore tempoararyBlobStore = new TemporaryBlobStore(folder);
+    private TemporaryBlobStore temporaryBlobStore = new TemporaryBlobStore(folder);
 
     private FileStore fileStore;
 
     private SegmentNodeStore nodeStore;
 
-    private CheckpointCompactor compactor;
+    private Compactor compactor;
 
     private GCGeneration compactedGeneration;
 
     @Rule
     public RuleChain rules = RuleChain.outerRule(folder)
-        .around(tempoararyBlobStore);
+        .around(temporaryBlobStore);
 
     public void setup(boolean withBlobStore) throws IOException, 
InvalidFileStoreVersionException {
-        BlobStore blobStore = tempoararyBlobStore.blobStore();
+        BlobStore blobStore = temporaryBlobStore.blobStore();
         FileStoreBuilder fileStoreBuilder = fileStoreBuilder(folder.getRoot());
 
         if (withBlobStore) {
@@ -86,6 +84,8 @@ public class CheckpointCompactorExternalBlobTest {
         compactor = createCompactor(fileStore, compactedGeneration);
     }
 
+    protected abstract Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation);
+
     @After
     public void tearDown() {
         fileStore.close();
@@ -119,7 +119,7 @@ public class CheckpointCompactorExternalBlobTest {
         SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, 
uncompacted1, EMPTY_NODE, Canceller.newCanceller());
 
         assertNotNull(compacted1);
-        assertFalse(uncompacted1 == compacted1);
+        assertNotSame(uncompacted1, compacted1);
         checkGeneration(compacted1, compactedGeneration);
 
         assertSameStableId(uncompacted1, compacted1);
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
similarity index 88%
copy from 
oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
copy to 
oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
index e6b64c6cd4..8ce843c0da 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
@@ -20,12 +20,11 @@ package org.apache.jackrabbit.oak.segment;
 
 import static java.util.concurrent.TimeUnit.DAYS;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.addTestContent;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameRecord;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameStableId;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.checkGeneration;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createCompactor;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.getCheckpoint;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.addTestContent;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameRecord;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameStableId;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.checkGeneration;
+import static 
org.apache.jackrabbit.oak.segment.CompactorTestUtils.getCheckpoint;
 import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static 
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
 import static org.junit.Assert.assertEquals;
@@ -40,13 +39,14 @@ import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class CheckpointCompactorTest {
+public abstract class AbstractCompactorTest {
     @Rule
     public TemporaryFolder folder = new TemporaryFolder(new File("target"));
 
@@ -54,7 +54,7 @@ public class CheckpointCompactorTest {
 
     private SegmentNodeStore nodeStore;
 
-    private CheckpointCompactor compactor;
+    private Compactor compactor;
 
     private GCGeneration compactedGeneration;
 
@@ -66,6 +66,8 @@ public class CheckpointCompactorTest {
         compactor = createCompactor(fileStore, compactedGeneration);
     }
 
+    protected abstract Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation);
+
     @After
     public void tearDown() {
         fileStore.close();
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
index cfe547a9a2..baca4d5f04 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
@@ -18,125 +18,26 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static java.util.concurrent.TimeUnit.DAYS;
-import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.addTestContent;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameRecord;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameStableId;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.checkGeneration;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createBlob;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createCompactor;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.getCheckpoint;
-import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
-import static 
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
-import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
-import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.apache.jackrabbit.oak.segment.test.TemporaryBlobStore;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.jetbrains.annotations.NotNull;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.RuleChain;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-
-public class CheckpointCompactorExternalBlobTest {
-
-    private TemporaryFolder folder = new TemporaryFolder(new File("target"));
-
-    private TemporaryBlobStore tempoararyBlobStore = new 
TemporaryBlobStore(folder);
-
-    private FileStore fileStore;
-
-    private SegmentNodeStore nodeStore;
-
-    private CheckpointCompactor compactor;
-
-    private GCGeneration compactedGeneration;
-
-    @Rule
-    public RuleChain rules = RuleChain.outerRule(folder)
-        .around(tempoararyBlobStore);
-
-    public void setup(boolean withBlobStore) throws IOException, 
InvalidFileStoreVersionException {
-        BlobStore blobStore = tempoararyBlobStore.blobStore();
-        FileStoreBuilder fileStoreBuilder = fileStoreBuilder(folder.getRoot());
-
-        if (withBlobStore) {
-            fileStoreBuilder = fileStoreBuilder.withBlobStore(blobStore);
-        }
-
-        fileStore = fileStoreBuilder.build();
-        nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
-        compactedGeneration = newGCGeneration(1,1, true);
-        compactor = createCompactor(fileStore, compactedGeneration);
-    }
 
-    @After
-    public void tearDown() {
-        fileStore.close();
+import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+
+public class CheckpointCompactorExternalBlobTest extends AbstractCompactorExternalBlobTest {
+    @Override
+    protected CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+        SegmentWriter writer = defaultSegmentWriterBuilder("c")
+                .withGeneration(generation)
+                .build(fileStore);
+
+        return new CheckpointCompactor(
+                GCMonitor.EMPTY,
+                fileStore.getReader(),
+                writer,
+                fileStore.getBlobStore(),
+                GCNodeWriteMonitor.EMPTY);
     }
-
-    @Test
-    public void testCompact() throws Exception {
-        setup(true);
-
-        // add two blobs which will be persisted in the blob store
-        addTestContent("cp1", nodeStore, SegmentTestConstants.MEDIUM_LIMIT);
-        String cp1 = nodeStore.checkpoint(DAYS.toMillis(1));
-        addTestContent("cp2", nodeStore, SegmentTestConstants.MEDIUM_LIMIT);
-        String cp2 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        // update the two blobs from the blob store
-        updateTestContent("cp1", nodeStore);
-        String cp3 = nodeStore.checkpoint(DAYS.toMillis(1));
-        updateTestContent("cp2", nodeStore);
-        String cp4 = nodeStore.checkpoint(DAYS.toMillis(1));
-        fileStore.close();
-
-        // no blob store configured
-        setup(false);
-
-        // this time the updated blob will be stored in the file store
-        updateTestContent("cp2", nodeStore);
-        String cp5 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        SegmentNodeState uncompacted1 = fileStore.getHead();
-        SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, 
uncompacted1, EMPTY_NODE, Canceller.newCanceller());
-
-        assertNotNull(compacted1);
-        assertFalse(uncompacted1 == compacted1);
-        checkGeneration(compacted1, compactedGeneration);
-
-        assertSameStableId(uncompacted1, compacted1);
-        assertSameStableId(getCheckpoint(uncompacted1, cp1), 
getCheckpoint(compacted1, cp1));
-        assertSameStableId(getCheckpoint(uncompacted1, cp2), 
getCheckpoint(compacted1, cp2));
-        assertSameStableId(getCheckpoint(uncompacted1, cp3), 
getCheckpoint(compacted1, cp3));
-        assertSameStableId(getCheckpoint(uncompacted1, cp4), 
getCheckpoint(compacted1, cp4));
-        assertSameStableId(getCheckpoint(uncompacted1, cp5), 
getCheckpoint(compacted1, cp5));
-        assertSameRecord(getCheckpoint(compacted1, cp5), 
compacted1.getChildNode("root"));
-    }
-
-    private static void updateTestContent(@NotNull String parent, @NotNull 
NodeStore nodeStore)
-            throws CommitFailedException, IOException {
-        NodeBuilder rootBuilder = nodeStore.getRoot().builder();
-        NodeBuilder parentBuilder = rootBuilder.child(parent);
-        parentBuilder.child("b").setProperty("bin", createBlob(nodeStore, 
SegmentTestConstants.MEDIUM_LIMIT));
-        nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-    }
-
-}
\ No newline at end of file
+}
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
index e6b64c6cd4..95ea97fd72 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
@@ -18,99 +18,26 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static java.util.concurrent.TimeUnit.DAYS;
-import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.addTestContent;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameRecord;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameStableId;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.checkGeneration;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createCompactor;
-import static 
org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.getCheckpoint;
-import static 
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
-import static 
org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
-import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class CheckpointCompactorTest {
-    @Rule
-    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
-
-    private FileStore fileStore;
-
-    private SegmentNodeStore nodeStore;
-
-    private CheckpointCompactor compactor;
-
-    private GCGeneration compactedGeneration;
-
-    @Before
-    public void setup() throws IOException, InvalidFileStoreVersionException {
-        fileStore = fileStoreBuilder(folder.getRoot()).build();
-        nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
-        compactedGeneration = newGCGeneration(1,1, true);
-        compactor = createCompactor(fileStore, compactedGeneration);
-    }
-
-    @After
-    public void tearDown() {
-        fileStore.close();
-    }
-
-    @Test
-    public void testCompact() throws Exception {
-        addTestContent("cp1", nodeStore, 42);
-        String cp1 = nodeStore.checkpoint(DAYS.toMillis(1));
-        addTestContent("cp2", nodeStore, 42);
-        String cp2 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        SegmentNodeState uncompacted1 = fileStore.getHead();
-        SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, 
uncompacted1, EMPTY_NODE, Canceller.newCanceller());
-        assertNotNull(compacted1);
-        assertFalse(uncompacted1 == compacted1);
-        checkGeneration(compacted1, compactedGeneration);
-
-        assertSameStableId(uncompacted1, compacted1);
-        assertSameStableId(getCheckpoint(uncompacted1, cp1), 
getCheckpoint(compacted1, cp1));
-        assertSameStableId(getCheckpoint(uncompacted1, cp2), 
getCheckpoint(compacted1, cp2));
-        assertSameRecord(getCheckpoint(compacted1, cp2), 
compacted1.getChildNode("root"));
-
-        // Simulate a 2nd compaction cycle
-        addTestContent("cp3", nodeStore, 42);
-        String cp3 = nodeStore.checkpoint(DAYS.toMillis(1));
-        addTestContent("cp4", nodeStore, 42);
-        String cp4 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        SegmentNodeState uncompacted2 = fileStore.getHead();
-        SegmentNodeState compacted2 = compactor.compact(uncompacted1, 
uncompacted2, compacted1, Canceller.newCanceller());
-        assertNotNull(compacted2);
-        assertFalse(uncompacted2 == compacted2);
-        checkGeneration(compacted2, compactedGeneration);
-
-        assertTrue(fileStore.getRevisions().setHead(uncompacted2.getRecordId(), compacted2.getRecordId()));
-
-        assertEquals(uncompacted2, compacted2);
-        assertSameStableId(uncompacted2, compacted2);
-        assertSameStableId(getCheckpoint(uncompacted2, cp1), getCheckpoint(compacted2, cp1));
-        assertSameStableId(getCheckpoint(uncompacted2, cp2), getCheckpoint(compacted2, cp2));
-        assertSameStableId(getCheckpoint(uncompacted2, cp3), getCheckpoint(compacted2, cp3));
-        assertSameStableId(getCheckpoint(uncompacted2, cp4), getCheckpoint(compacted2, cp4));
-        assertSameRecord(getCheckpoint(compacted1, cp1), getCheckpoint(compacted2, cp1));
-        assertSameRecord(getCheckpoint(compacted1, cp2), getCheckpoint(compacted2, cp2));
-        assertSameRecord(getCheckpoint(compacted2, cp4), compacted2.getChildNode("root"));
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+
+public class CheckpointCompactorTest extends AbstractCompactorTest {
+    @Override
+    protected CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+        SegmentWriter writer = defaultSegmentWriterBuilder("c")
+                .withGeneration(generation)
+                .build(fileStore);
+
+        return new CheckpointCompactor(
+                GCMonitor.EMPTY,
+                fileStore.getReader(),
+                writer,
+                fileStore.getBlobStore(),
+                GCNodeWriteMonitor.EMPTY);
     }
 }
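
Note: CheckpointCompactorTest now only selects the compactor implementation; the shared fixture moves to AbstractCompactorTest, which is not part of this hunk. A minimal sketch of what that base class is assumed to look like, reconstructed from the setup and teardown removed above (the abstract return type Compactor and the exact test methods are assumptions, not taken from this commit):

    package org.apache.jackrabbit.oak.segment;

    import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
    import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;

    import java.io.File;
    import java.io.IOException;

    import org.apache.jackrabbit.oak.segment.file.FileStore;
    import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
    import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
    import org.jetbrains.annotations.NotNull;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Rule;
    import org.junit.rules.TemporaryFolder;

    public abstract class AbstractCompactorTest {

        @Rule
        public TemporaryFolder folder = new TemporaryFolder(new File("target"));

        private FileStore fileStore;
        private SegmentNodeStore nodeStore;
        private Compactor compactor;
        private GCGeneration compactedGeneration;

        @Before
        public void setup() throws IOException, InvalidFileStoreVersionException {
            fileStore = fileStoreBuilder(folder.getRoot()).build();
            nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
            compactedGeneration = newGCGeneration(1, 1, true);
            // Each subclass decides which compactor implementation the shared tests run against.
            compactor = createCompactor(fileStore, compactedGeneration);
        }

        @After
        public void tearDown() {
            fileStore.close();
        }

        // Implemented by CheckpointCompactorTest above and ParallelCompactorTest below.
        protected abstract Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation);
    }

Under this assumption, the testCompact() scenario removed above (two checkpoints, compact, two more checkpoints, second compaction cycle) would live unchanged in the base class and run once per compactor implementation.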
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
index 4e433664d5..bb4ba63e58 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
@@ -598,14 +598,11 @@ public class CompactionAndCleanupIT {
         try {
             SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
 
-            final Callable<Void> cancel = new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    // Give the compaction thread a head start
-                    sleepUninterruptibly(1000, MILLISECONDS);
-                    fileStore.cancelGC();
-                    return null;
-                }
+            final Callable<Void> cancel = () -> {
+                // Give the compaction thread a head start
+                sleepUninterruptibly(1000, MILLISECONDS);
+                fileStore.cancelGC();
+                return null;
             };
 
             for (int k = 0; k < 100; k++) {
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTestUtils.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTestUtils.java
similarity index 88%
rename from oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTestUtils.java
rename to oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTestUtils.java
index 75abf39165..70d06135d4 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTestUtils.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTestUtils.java
@@ -43,11 +43,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Random;
 
-public class CheckpointCompactorTestUtils {
+public class CompactorTestUtils {
 
-    private CheckpointCompactorTestUtils() {
-
-    }
+    private CompactorTestUtils() {}
 
     public static void checkGeneration(NodeState node, GCGeneration gcGeneration) {
         assertTrue(node instanceof SegmentNodeState);
@@ -85,20 +83,6 @@ public class CheckpointCompactorTestUtils {
                 ((SegmentNodeState) node2).getRecordId());
     }
 
-    @NotNull
-    public static CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
-        SegmentWriter writer = defaultSegmentWriterBuilder("c")
-                .withGeneration(generation)
-                .build(fileStore);
-
-        return new CheckpointCompactor(
-                GCMonitor.EMPTY,
-                fileStore.getReader(),
-                writer,
-                fileStore.getBlobStore(),
-                GCNodeWriteMonitor.EMPTY);
-    }
-
     public static void addTestContent(@NotNull String parent, @NotNull NodeStore nodeStore, int binPropertySize)
             throws CommitFailedException, IOException {
         NodeBuilder rootBuilder = nodeStore.getRoot().builder();
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
new file mode 100644
index 0000000000..8aea965f66
--- /dev/null
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+
+public class ParallelCompactorExternalBlobTest extends AbstractCompactorExternalBlobTest {
+    @Override
+    protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+        SegmentWriter writer = defaultSegmentWriterBuilder("c")
+                .withGeneration(generation)
+                .withWriterPool()
+                .build(fileStore);
+
+        return new ParallelCompactor(
+                GCMonitor.EMPTY,
+                fileStore.getReader(),
+                writer,
+                fileStore.getBlobStore(),
+                GCNodeWriteMonitor.EMPTY,
+                4);
+    }
+}
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
new file mode 100644
index 0000000000..596022bba3
--- /dev/null
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+
+public class ParallelCompactorTest extends AbstractCompactorTest {
+    @Override
+    protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+        SegmentWriter writer = defaultSegmentWriterBuilder("c")
+                .withGeneration(generation)
+                .withWriterPool()
+                .build(fileStore);
+
+        return new ParallelCompactor(
+                GCMonitor.EMPTY,
+                fileStore.getReader(),
+                writer,
+                fileStore.getBlobStore(),
+                GCNodeWriteMonitor.EMPTY,
+                4);
+    }
+}
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
index 98db981b14..e84dd1582f 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
@@ -38,18 +38,9 @@ public class RecordCacheStatsTest {
     private final Random rnd = new Random();
     private final MemoryStore store = new MemoryStore();
 
-    private final RecordCache<Integer> cache = newRecordCache(KEYS);
+    private final RecordCache<Integer> cache = newRecordCache(KEYS * 4 / 3);
     private final RecordCacheStats cacheStats =
-            new RecordCacheStats(NAME,
-                new Supplier<CacheStats>() {
-                    @Override public CacheStats get() { return cache.getStats(); }
-                },
-                new Supplier<Long>() {
-                    @Override public Long get() { return cache.size(); }
-                },
-                new Supplier<Long>() {
-                    @Override public Long get() { return cache.estimateCurrentWeight(); }
-                });
+            new RecordCacheStats(NAME, cache::getStats, cache::size, cache::estimateCurrentWeight);
 
     private int hits;
 
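
The hunk above swaps three anonymous Supplier implementations for method references; both forms hand RecordCacheStats the same suppliers. A standalone sketch of the equivalence (it uses java.util.function.Supplier and a hypothetical Counter class purely for illustration and does not reproduce the Oak types):

    import java.util.function.Supplier;

    public class SupplierSketch {

        // Hypothetical stand-in for the cache whose metrics are being exposed.
        static class Counter {
            long size() {
                return 42L;
            }
        }

        public static void main(String[] args) {
            Counter counter = new Counter();

            // Anonymous inner class, as in the removed lines.
            Supplier<Long> verbose = new Supplier<Long>() {
                @Override
                public Long get() {
                    return counter.size();
                }
            };

            // Method reference, as in the added line; same behaviour, less noise.
            Supplier<Long> concise = counter::size;

            System.out.println(verbose.get() + " == " + concise.get());
        }
    }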
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
index fe45586423..6431ca7902 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
@@ -71,12 +71,7 @@ public class SegmentBufferWriterPoolTest {
     }
 
     private Future<RecordId> execute(GCGeneration gcGeneration, final WriteOperation op, int executor) {
-        return executors[executor].submit(new Callable<RecordId>() {
-            @Override
-            public RecordId call() throws Exception {
-                return pool.execute(gcGeneration, op);
-            }
-        });
+        return executors[executor].submit(() -> pool.execute(gcGeneration, op));
     }
 
     private WriteOperation createOp(final String key, final ConcurrentMap<String, SegmentBufferWriter> map) {
@@ -206,27 +201,20 @@ public class SegmentBufferWriterPoolTest {
     @Test
     public void testFlushBlocks() throws ExecutionException, InterruptedException {
         GCGeneration gcGeneration = pool.getGCGeneration();
-        Future<RecordId> res = execute(gcGeneration, new WriteOperation() {
-            @Nullable
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) {
-                try {
-                    // This should deadlock as flush waits for this write
-                    // operation to finish, which in this case contains the
-                    // call to flush itself.
-                    executors[1].submit(new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
-                            pool.flush(store);
-                            return null;
-                        }
-                    }).get(100, MILLISECONDS);
-                    return null;    // No deadlock -> null indicates test failure
-                } catch (InterruptedException | ExecutionException ignore) {
-                    return null;    // No deadlock -> null indicates test failure
-                } catch (TimeoutException ignore) {
-                    return rootId;  // Deadlock -> rootId indicates test pass
-                }
+        Future<RecordId> res = execute(gcGeneration, writer -> {
+            try {
+                // This should deadlock as flush waits for this write
+                // operation to finish, which in this case contains the
+                // call to flush itself.
+                executors[1].submit(() -> {
+                    pool.flush(store);
+                    return null;
+                }).get(100, MILLISECONDS);
+                return null;    // No deadlock -> null indicates test failure
+            } catch (InterruptedException | ExecutionException ignore) {
+                return null;    // No deadlock -> null indicates test failure
+            } catch (TimeoutException ignore) {
+                return rootId;  // Deadlock -> rootId indicates test pass
             }
         }, 0);
 
