Author: mduerig
Date: Mon Oct 10 15:10:06 2016
New Revision: 1764116

URL: http://svn.apache.org/viewvc?rev=1764116&view=rev
Log:
OAK-4617: Align SegmentRevisionGC MBean with new generation based GC

Encapsulate the gc functionality in a GarbageCollector inner class and expose a method for cancelling gc
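The MBean now exposes an explicit startRevisionGC()/cancelRevisionGC() pair in place of the old stopCompaction() flag. A minimal client-side sketch of the new contract, assuming a reachable JMX connector endpoint and a hypothetical object name (the actual name is derived from SegmentRevisionGC.TYPE when SegmentNodeStoreService registers the bean with the whiteboard):

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC;

    public class RevisionGCJmxClient {
        public static void main(String[] args) throws Exception {
            // Assumed JMX endpoint; adjust to the actual connector configuration.
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try {
                MBeanServerConnection server = connector.getMBeanServerConnection();
                // Hypothetical object name; the whiteboard derives the actual one
                // from SegmentRevisionGC.TYPE at registration time.
                ObjectName name = new ObjectName(
                        "org.apache.jackrabbit.oak:type=SegmentRevisionGarbageCollection,name=Segment node store gc options");
                SegmentRevisionGC gc = JMX.newMBeanProxy(server, name, SegmentRevisionGC.class);
                gc.startRevisionGC();   // kicks off estimation, compaction and cleanup
                // ... later, if the operation must be stopped early:
                gc.cancelRevisionGC();  // a no-op when no revision gc is running
            } finally {
                connector.close();
            }
        }
    }
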
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java Mon Oct 10 15:10:06 2016 @@ -397,7 +397,7 @@ public class SegmentNodeStoreService ext registrations.add(registerMBean( whiteboard, SegmentRevisionGC.class, - new SegmentRevisionGCMBean(gcOptions), + new SegmentRevisionGCMBean(store, gcOptions), SegmentRevisionGC.TYPE, "Segment node store gc options" )); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java Mon Oct 10 15:10:06 2016 @@ -79,8 +79,6 @@ public class SegmentGCOptions { "oak.segment.compaction.gcSizeDeltaEstimation", SIZE_DELTA_ESTIMATION_DEFAULT); - private volatile boolean stopCompaction; - public SegmentGCOptions(boolean paused, int gainThreshold, int retryCount, int forceTimeout) { this.paused = paused; this.gainThreshold = gainThreshold; @@ -296,13 +294,4 @@ public class SegmentGCOptions { return this; } - public boolean isStopCompaction() { - return stopCompaction; - } - - public boolean setStopCompaction(boolean stop) { - this.stopCompaction = stop; - return stop; - } - } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java Mon Oct 10 15:10:06 2016 @@ -102,7 +102,13 @@ public interface SegmentRevisionGC { void setGcSizeDeltaEstimation(long gcSizeDeltaEstimation); /** - * Raise the flag to signal compaction to stop as soon as possible. + * Initiate a revision garbage collection operation */ - void stopCompaction(); + void startRevisionGC(); + + /** + * Cancel a running revision garbage collection operation. Does nothing + * if revision garbage collection is not running. + */ + void cancelRevisionGC(); } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java Mon Oct 10 15:10:06 2016 @@ -19,18 +19,30 @@ package org.apache.jackrabbit.oak.segment.compaction; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; + +import javax.annotation.Nonnull; + import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean; +import org.apache.jackrabbit.oak.segment.file.FileStore; // FIXME OAK-4617: Align SegmentRevisionGC MBean with new generation based GC public class SegmentRevisionGCMBean extends AnnotatedStandardMBean implements SegmentRevisionGC { + @Nonnull + private final FileStore fileStore; + + @Nonnull private final SegmentGCOptions gcOptions; - public SegmentRevisionGCMBean(SegmentGCOptions gcOptions) { + public SegmentRevisionGCMBean(@Nonnull FileStore fileStore, @Nonnull SegmentGCOptions gcOptions) { super(SegmentRevisionGC.class); - this.gcOptions = gcOptions; + this.fileStore = checkNotNull(fileStore); + this.gcOptions = checkNotNull(gcOptions); } @Override @@ -94,8 +106,13 @@ public class SegmentRevisionGCMBean } @Override - public void stopCompaction() { - gcOptions.setStopCompaction(true); + public void startRevisionGC() { + fileStore.getGCRunner().run(); + } + + @Override + public void cancelRevisionGC() { + fileStore.cancelGC(); } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Mon Oct 10 15:10:06 2016 @@ -63,7 +63,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -169,6 +168,9 @@ public class FileStore implements Segmen private final boolean memoryMapping; + @Nonnull + private final GarbageCollector garbageCollector; + private volatile List<TarReader> readers; private volatile TarWriter tarWriter; @@ -185,15 +187,6 @@ public class FileStore implements Segmen */ private final Scheduler fileStoreScheduler = new Scheduler("FileStore background tasks"); - private final SegmentGCOptions gcOptions; - - private final GCJournal gcJournal; - - /** - * Semaphore guarding overlapping calls to {@link #compact()} and {@link #cleanup()} - */ - private final Semaphore gcSemaphore = new Semaphore(1); - /** * List of old tar file generations that are waiting to be removed. They can * not be removed immediately, because they first need to be closed, and the @@ -202,11 +195,6 @@ public class FileStore implements Segmen private final FileReaper fileReaper = new FileReaper(); /** - * {@code GcListener} listening to this instance's gc progress - */ - private final GCListener gcListener; - - /** * This flag is periodically updated by calling the {@code SegmentGCOptions} * at regular intervals. */ @@ -235,7 +223,7 @@ public class FileStore implements Segmen }; // FIXME OAK-4450: Properly split the FileStore into read-only and r/w variants - FileStore(FileStoreBuilder builder, final boolean readOnly) throws InvalidFileStoreVersionException, IOException { + FileStore(final FileStoreBuilder builder, boolean readOnly) throws InvalidFileStoreVersionException, IOException { this.directory = builder.getDirectory(); if (!readOnly) { lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw"); @@ -261,7 +249,6 @@ public class FileStore implements Segmen }, blobStore, builder.getStringCacheSize(), builder.getTemplateCacheSize()); this.binaryReferenceConsumer = new BinaryReferenceConsumer() { - @Override public void consume(int generation, UUID segmentId, String binaryReference) { fileStoreLock.writeLock().lock(); @@ -271,7 +258,6 @@ public class FileStore implements Segmen fileStoreLock.writeLock().unlock(); } } - }; this.segmentWriter = segmentWriterBuilder("sys") @@ -286,9 +272,7 @@ public class FileStore implements Segmen .build(this); this.maxFileSize = builder.getMaxFileSize() * MB; this.memoryMapping = builder.getMemoryMapping(); - this.gcListener = builder.getGcListener(); - this.gcOptions = builder.getGcOptions(); - this.gcJournal = new GCJournal(directory); + this.garbageCollector = new GarbageCollector(builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory)); Map<Integer, Map<Character, File>> map = collectFiles(directory); @@ -350,9 +334,10 @@ public class FileStore implements Segmen }); fileStoreScheduler.scheduleAtFixedRate( format("TarMK disk space check [%s]", directory), 1, MINUTES, new Runnable() { + SegmentGCOptions gcOptions = builder.getGcOptions(); @Override public void run() { - checkDiskSpace(); + checkDiskSpace(gcOptions); } }); } @@ -481,95 +466,12 @@ public class FileStore implements Segmen try { gc(); } catch (IOException e) { - log.error("Error running compaction", e); + log.error("Error running revision garbage collection", e); } } }); } - /** - * Run garbage collection: estimation, compaction, cleanup - * @throws IOException - */ - public void gc() throws IOException { - gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet()); - Stopwatch watch = Stopwatch.createStarted(); - - int gainThreshold = 
gcOptions.getGainThreshold(); - boolean sufficientEstimatedGain = true; - if (gainThreshold <= 0) { - gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)", - GC_COUNT, gainThreshold); - } else if (gcOptions.isPaused()) { - gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT); - } else { - gcListener.info("TarMK GC #{}: estimation started", GC_COUNT); - Supplier<Boolean> cancel = newCancelCompactionCondition(); - GCEstimation estimate = estimateCompactionGain(cancel); - if (cancel.get()) { - gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel); - } - - sufficientEstimatedGain = estimate.gcNeeded(); - String gcLog = estimate.gcLog(); - if (sufficientEstimatedGain) { - gcListener.info( - "TarMK GC #{}: estimation completed in {} ({} ms). {}", - GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog); - } else { - gcListener.skipped( - "TarMK GC #{}: estimation completed in {} ({} ms). {}", - GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog); - } - } - - if (sufficientEstimatedGain) { - if (!gcOptions.isPaused()) { - logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats()); - log(segmentWriter.getNodeCacheOccupancyInfo()); - Runnable cleanupTask = compact(); - if (cleanupTask != null) { - cleanupTask.run(); - } - logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats()); - log(segmentWriter.getNodeCacheOccupancyInfo()); - } else { - gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT); - } - } - } - - private static void logAndClear( - @Nonnull DescriptiveStatistics nodeWriteTimeStats, - @Nonnull DescriptiveStatistics nodeCompactTimeStats) { - log.info("Node write time statistics (ns) {}", toString(nodeWriteTimeStats)); - log.info("Node compact time statistics (ns) {}", toString(nodeCompactTimeStats)); - nodeWriteTimeStats.clear(); - nodeCompactTimeStats.clear(); - } - - private static void log(@CheckForNull String nodeCacheOccupancyInfo) { - if (nodeCacheOccupancyInfo != null) { - log.info("NodeCache occupancy: {}", nodeCacheOccupancyInfo); - } - } - - private static String toString(DescriptiveStatistics statistics) { - DecimalFormat sci = new DecimalFormat("##0.0E0"); - DecimalFormatSymbols symbols = sci.getDecimalFormatSymbols(); - symbols.setNaN("NaN"); - symbols.setInfinity("Inf"); - sci.setDecimalFormatSymbols(symbols); - return "min=" + sci.format(statistics.getMin()) + - ", 10%=" + sci.format(statistics.getPercentile(10.0)) + - ", 50%=" + sci.format(statistics.getPercentile(50.0)) + - ", 90%=" + sci.format(statistics.getPercentile(90.0)) + - ", max=" + sci.format(statistics.getMax()) + - ", mean=" + sci.format(statistics.getMean()) + - ", stdev=" + sci.format(statistics.getStandardDeviation()) + - ", N=" + sci.format(statistics.getN()); - } - static Map<Integer, Map<Character, File>> collectFiles(File directory) { Map<Integer, Map<Character, File>> dataFiles = newHashMap(); Map<Integer, File> bulkFiles = newHashMap(); @@ -695,35 +597,6 @@ public class FileStore implements Segmen } } - /** - * Estimated compaction gain. The result will be undefined if stopped through - * the passed {@code stop} signal. - * @param stop signal for stopping the estimation process. 
- * @return compaction gain estimate - */ - GCEstimation estimateCompactionGain(Supplier<Boolean> stop) { - if (gcOptions.isGcSizeDeltaEstimation()) { - SizeDeltaGcEstimation e = new SizeDeltaGcEstimation(gcOptions, - gcJournal, stats.getApproximateSize()); - return e; - } - - CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), - count(), stop, gcOptions.getGainThreshold()); - fileStoreLock.readLock().lock(); - try { - for (TarReader reader : readers) { - reader.accept(estimate); - if (stop.get()) { - break; - } - } - } finally { - fileStoreLock.readLock().unlock(); - } - return estimate; - } - public FileStoreStats getStats() { return stats; } @@ -735,209 +608,47 @@ public class FileStore implements Segmen segmentWriter.flush(); tarWriter.flush(); stats.flushed(); - + return null; } }); } /** - * Try to acquire the passed {@code semaphore} - * @param semaphore - * @return a closable for releasing the {@code semaphore} - * @throws IllegalStateException if acquiring the {@code semaphore} failed. - */ - private static Closeable withSemaphore(@Nonnull final Semaphore semaphore) { - if (!semaphore.tryAcquire()) { - throw new IllegalStateException("Compaction or cleanup already in progress"); - } - return new Closeable() { - @Override - public void close() { - semaphore.release(); - } - }; - } - - /** - * Run garbage collection on the segment level: reclaim those data segments - * that are from an old segment generation and those bulk segments that are not - * reachable anymore. - * Those tar files that shrink by at least 25% are rewritten to a new tar generation - * skipping the reclaimed segments. + * Run garbage collection: estimation, compaction, cleanup + * @throws IOException */ - public void cleanup() throws IOException { - fileReaper.add(cleanupOldGenerations(getGcGeneration())); + public void gc() throws IOException { + garbageCollector.run(); } /** - * Cleanup segments that are from an old generation. That segments whose generation - * is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older. - * @param gcGeneration - * @return list of files to be removed - * @throws IOException + * Run the compaction gain estimation process. + * @return */ - private List<File> cleanupOldGenerations(int gcGeneration) throws IOException { - final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations(); - - Predicate<Integer> reclaimPredicate = new Predicate<Integer>() { - @Override - public boolean apply(Integer generation) { - return generation <= reclaimGeneration; - } - }; - return cleanup(reclaimPredicate, - "gc-count=" + GC_COUNT + - ",gc-status=success" + - ",store-generation=" + gcGeneration + - ",reclaim-predicate=(generation<=" + reclaimGeneration + ")"); + public GCEstimation estimateCompactionGain() { + return garbageCollector.estimateCompactionGain(Suppliers.ofInstance(false)); } /** - * Cleanup segments of the given generation {@code gcGeneration}. - * @param gcGeneration - * @return list of files to be removed - * @throws IOException + * Copy every referenced record in data (non-bulk) segments. Bulk segments + * are fully kept (they are only removed in cleanup, if there is no + * reference to them). + * @return {@code true} on success, {@code false} otherwise. 
*/ - private List<File> cleanupGeneration(final int gcGeneration) throws IOException { - Predicate<Integer> cleanupPredicate = new Predicate<Integer>() { - @Override - public boolean apply(Integer generation) { - return generation == gcGeneration; - } - }; - return cleanup(cleanupPredicate, - "gc-count=" + GC_COUNT + - ",gc-status=failed" + - ",store-generation=" + (gcGeneration - 1) + - ",reclaim-predicate=(generation==" + gcGeneration + ")"); + public boolean compact() throws IOException { + return garbageCollector.compact() > 0; } /** - * Cleanup segments whose generation matches the {@code reclaimGeneration} predicate. - * @param reclaimGeneration - * @param gcInfo gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)} - * @return list of files to be removed - * @throws IOException + * Run garbage collection on the segment level: reclaim those data segments + * that are from an old segment generation and those bulk segments that are not + * reachable anymore. + * Those tar files that shrink by at least 25% are rewritten to a new tar generation + * skipping the reclaimed segments. */ - private List<File> cleanup( - @Nonnull Predicate<Integer> reclaimGeneration, - @Nonnull String gcInfo) - throws IOException { - try (Closeable s = withSemaphore(gcSemaphore)) { - Stopwatch watch = Stopwatch.createStarted(); - Set<UUID> bulkRefs = newHashSet(); - Map<TarReader, TarReader> cleaned = newLinkedHashMap(); - - long initialSize = 0; - fileStoreLock.writeLock().lock(); - try { - gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT); - - newWriter(); - segmentCache.clear(); - - // Suggest to the JVM that now would be a good time - // to clear stale weak references in the SegmentTracker - System.gc(); - - collectBulkReferences(bulkRefs); - - for (TarReader reader : readers) { - cleaned.put(reader, reader); - initialSize += reader.size(); - } - } finally { - fileStoreLock.writeLock().unlock(); - } - - gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)", - GC_COUNT, humanReadableByteCount(initialSize), initialSize); - - Set<UUID> reclaim = newHashSet(); - for (TarReader reader : cleaned.keySet()) { - reader.mark(bulkRefs, reclaim, reclaimGeneration); - log.info("{}: size of bulk references/reclaim set {}/{}", - reader, bulkRefs.size(), reclaim.size()); - if (shutdown) { - gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT); - break; - } - } - Set<UUID> reclaimed = newHashSet(); - for (TarReader reader : cleaned.keySet()) { - cleaned.put(reader, reader.sweep(reclaim, reclaimed)); - if (shutdown) { - gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT); - break; - } - } - - // it doesn't account for concurrent commits that might have happened - long afterCleanupSize = 0; - - List<TarReader> oldReaders = newArrayList(); - fileStoreLock.writeLock().lock(); - try { - // Replace current list of reader with the cleaned readers taking care not to lose - // any new reader that might have come in through concurrent calls to newWriter() - List<TarReader> sweptReaders = newArrayList(); - for (TarReader reader : readers) { - if (cleaned.containsKey(reader)) { - TarReader newReader = cleaned.get(reader); - if (newReader != null) { - sweptReaders.add(newReader); - afterCleanupSize += newReader.size(); - } - // if these two differ, the former represents the swept version of the latter - if (newReader != reader) { - oldReaders.add(reader); - } - } else { - sweptReaders.add(reader); - } - } - readers = sweptReaders; - } finally { - 
fileStoreLock.writeLock().unlock(); - } - tracker.clearSegmentIdTables(reclaimed, gcInfo); - - // Close old readers *after* setting readers to the new readers to avoid accessing - // a closed reader from readSegment() - LinkedList<File> toRemove = newLinkedList(); - for (TarReader oldReader : oldReaders) { - closeAndLogOnFail(oldReader); - File file = oldReader.getFile(); - gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName()); - toRemove.addLast(file); - } - - long finalSize = size(); - long reclaimedSize = initialSize - afterCleanupSize; - stats.reclaimed(reclaimedSize); - gcJournal.persist(reclaimedSize, finalSize); - gcListener.cleaned(reclaimedSize, finalSize); - gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" + - " and space reclaimed {} ({} bytes).", - GC_COUNT, watch, watch.elapsed(MILLISECONDS), - humanReadableByteCount(finalSize), finalSize, - humanReadableByteCount(reclaimedSize), reclaimedSize); - return toRemove; - } - } - - private void collectBulkReferences(Set<UUID> bulkRefs) { - Set<UUID> refs = newHashSet(); - for (SegmentId id : tracker.getReferencedSegmentIds()) { - refs.add(id.asUUID()); - } - tarWriter.collectReferences(refs); - for (UUID ref : refs) { - if (!isDataSegmentId(ref.getLeastSignificantBits())) { - bulkRefs.add(ref); - } - } + public void cleanup() throws IOException { + garbageCollector.cleanup(); } /** @@ -953,209 +664,15 @@ public class FileStore implements Segmen * @param collector reference collector called back for each blob reference found */ public void collectBlobReferences(ReferenceCollector collector) throws IOException { - segmentWriter.flush(); - List<TarReader> tarReaders = newArrayList(); - fileStoreLock.writeLock().lock(); - try { - newWriter(); - tarReaders.addAll(this.readers); - } finally { - fileStoreLock.writeLock().unlock(); - } - - int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1; - for (TarReader tarReader : tarReaders) { - tarReader.collectBlobReferences(collector, newReferenceReader(this), minGeneration); - } - } - - /** - * Returns the cancellation policy for the compaction phase. - * @return a supplier indicating if compaction should be canceled. - */ - private Supplier<Boolean> newCancelCompactionCondition() { - return new CancelCompactionSupplier(this); + garbageCollector.collectBlobReferences(collector); } /** - * @param duration - * @param unit - * @return {@code Supplier} instance which returns true once the time specified in - * {@code duration} and {@code unit} has passed. + * Cancel a running revision garbage collection compaction process as soon as possible. + * Does nothing if gc is not running. */ - private static Supplier<Boolean> timeOut(final long duration, @Nonnull final TimeUnit unit) { - return new Supplier<Boolean>() { - long deadline = currentTimeMillis() + MILLISECONDS.convert(duration, unit); - @Override - public Boolean get() { - return currentTimeMillis() > deadline; - } - }; - } - - /** - * @param supplier1 - * @param supplier2 - * @return {@code Supplier} instance that returns {@code true} iff {@code supplier1} returns - * {@code true} or otherwise {@code supplier2} returns {@code true}. - */ - private static Supplier<Boolean> or( - @Nonnull Supplier<Boolean> supplier1, - @Nonnull Supplier<Boolean> supplier2) { - if (supplier1.get()) { - return Suppliers.ofInstance(true); - } else { - return supplier2; - } - } - - /** - * Copy every referenced record in data (non-bulk) segments. 
Bulk segments - * are fully kept (they are only removed in cleanup, if there is no - * reference to them). - * @return A {@code Runnable} for cleaning up or {@code null} when compact failed. - */ - @CheckForNull - public Runnable compact() throws IOException { - try (Closeable s = withSemaphore(gcSemaphore)) { - Stopwatch watch = Stopwatch.createStarted(); - gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions); - - SegmentNodeState before = getHead(); - final int newGeneration = getGcGeneration() + 1; - SegmentBufferWriter bufferWriter = new SegmentBufferWriter(this, tracker, segmentReader, "c", newGeneration); - Supplier<Boolean> cancel = newCancelCompactionCondition(); - SegmentNodeState after = compact(bufferWriter, before, cancel); - if (after == null) { - gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel); - return null; - } - - gcListener.info("TarMK GC #{}: compacted {} to {}", - GC_COUNT, before.getRecordId(), after.getRecordId()); - - int cycles = 0; - boolean success = false; - while (cycles < gcOptions.getRetryCount() && - !(success = revisions.setHead(before.getRecordId(), after.getRecordId(), EXPEDITE_OPTION))) { - // Some other concurrent changes have been made. - // Rebase (and compact) those changes on top of the - // compacted state before retrying to set the head. - cycles++; - gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " + - "Compacting these commits. Cycle {} of {}", - GC_COUNT, cycles, gcOptions.getRetryCount()); - SegmentNodeState head = getHead(); - after = compact(bufferWriter, head, cancel); - if (after == null) { - gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel); - return null; - } - - gcListener.info("TarMK GC #{}: compacted {} against {} to {}", - GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId()); - before = head; - } - - if (!success) { - gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.", - GC_COUNT, cycles); - int forceTimeout = gcOptions.getForceTimeout(); - if (forceTimeout > 0) { - gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds", - GC_COUNT, forceTimeout); - cycles++; - success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS))); - if (!success) { - if (cancel.get()) { - gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " + - "Compaction was cancelled: {}.", GC_COUNT, cancel); - } else { - gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. 
" + - "Most likely compaction didn't get exclusive access to the store.", GC_COUNT); - } - } - } - } - - if (success) { - gcListener.compacted(SUCCESS, newGeneration); - gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles", - GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles); - return new Runnable() { - @Override - public void run() { - try { - fileReaper.add(cleanupOldGenerations(newGeneration)); - } catch (IOException e) { - gcListener.error("TarMK GC #" + GC_COUNT + ": cleanup failed", e); - } - } - }; - } else { - gcListener.compacted(FAILURE, newGeneration); - gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles", - GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles); - return new Runnable() { - @Override - public void run() { - try { - gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT); - fileReaper.add(cleanupGeneration(newGeneration)); - } catch (IOException e) { - gcListener.error("TarMK GC #" + GC_COUNT + ": cleanup failed", e); - } - } - }; - } - } catch (InterruptedException e) { - gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e); - currentThread().interrupt(); - return null; - } catch (Exception e) { - gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e); - return null; - } - } - - private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head, - Supplier<Boolean> cancel) - throws IOException { - if (gcOptions.isOffline()) { - SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer); - return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions) - .compact(EMPTY_NODE, head, EMPTY_NODE); - } else { - return segmentWriter.writeNode(head, bufferWriter, cancel); - } - } - - private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter, - @Nonnull final Supplier<Boolean> cancel) - throws InterruptedException { - return revisions. 
- setHead(new Function<RecordId, RecordId>() { - @Nullable - @Override - public RecordId apply(RecordId base) { - try { - long t0 = currentTimeMillis(); - SegmentNodeState after = compact(bufferWriter, - segmentReader.readNode(base), cancel); - if (after == null) { - gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds", - GC_COUNT, (currentTimeMillis() - t0) / 1000); - return null; - } else { - return after.getRecordId(); - } - } catch (IOException e) { - gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e); - return null; - } - } - }, - timeout(gcOptions.getForceTimeout(), SECONDS)); + public void cancelGC() { + garbageCollector.cancel(); } public Iterable<SegmentId> getSegmentIds() { @@ -1504,7 +1021,7 @@ public class FileStore implements Segmen return emptyMap(); } - private void checkDiskSpace() { + private void checkDiskSpace(SegmentGCOptions gcOptions) { long repositoryDiskSpace = size(); long availableDiskSpace = directory.getFreeSpace(); boolean updated = gcOptions.isDiskSpaceSufficient(repositoryDiskSpace, availableDiskSpace); @@ -1619,7 +1136,7 @@ public class FileStore implements Segmen } @Override - public Runnable compact() { + public boolean compact() { throw new UnsupportedOperationException("Read Only Store"); } @@ -1640,61 +1157,541 @@ public class FileStore implements Segmen } } + private class GarbageCollector { + @Nonnull + private final SegmentGCOptions gcOptions; - /** - * Represents the cancellation policy for the compaction phase. If the disk - * space was considered insufficient at least once during compaction (or if - * the space was never sufficient to begin with), compaction is considered - * canceled. Furthermore when the file store is shutting down, compaction is - * considered canceled. - */ - private static class CancelCompactionSupplier implements Supplier<Boolean> { + /** + * {@code GcListener} listening to this instance's gc progress + */ + @Nonnull + private final GCListener gcListener; - private static enum REASON { - UNKNOWN, DISK_SPACE, SHUTDOWN, MANUAL - }; + @Nonnull + private final GCJournal gcJournal; + + private volatile boolean cancelled; + + GarbageCollector(@Nonnull SegmentGCOptions gcOptions, + @Nonnull GCListener gcListener, + @Nonnull GCJournal gcJournal) { + this.gcOptions = gcOptions; + this.gcListener = gcListener; + this.gcJournal = gcJournal; + } + + synchronized void run() throws IOException { + gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet()); + Stopwatch watch = Stopwatch.createStarted(); - private REASON reason = REASON.UNKNOWN; + int gainThreshold = gcOptions.getGainThreshold(); + boolean sufficientEstimatedGain = true; + if (gainThreshold <= 0) { + gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)", + GC_COUNT, gainThreshold); + } else if (gcOptions.isPaused()) { + gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT); + } else { + gcListener.info("TarMK GC #{}: estimation started", GC_COUNT); + Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this); + GCEstimation estimate = estimateCompactionGain(cancel); + if (cancel.get()) { + gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel); + return; + } - private final FileStore store; + sufficientEstimatedGain = estimate.gcNeeded(); + String gcLog = estimate.gcLog(); + if (sufficientEstimatedGain) { + gcListener.info( + "TarMK GC #{}: estimation completed in {} ({} ms). 
{}", + GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog); + } else { + gcListener.skipped( + "TarMK GC #{}: estimation completed in {} ({} ms). {}", + GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog); + } + } - public CancelCompactionSupplier(FileStore store) { - this.store = store; - this.store.gcOptions.setStopCompaction(false); + if (sufficientEstimatedGain) { + if (!gcOptions.isPaused()) { + logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats()); + log(segmentWriter.getNodeCacheOccupancyInfo()); + int gen = compact(); + if (gen > 0) { + fileReaper.add(cleanupOldGenerations(gen)); + } else if (gen < 0) { + gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT); + fileReaper.add(cleanupGeneration(-gen)); + } + logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats()); + log(segmentWriter.getNodeCacheOccupancyInfo()); + } else { + gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT); + } + } } - @Override - public Boolean get() { - // The outOfDiskSpace and shutdown flags can only transition from - // false (their initial - // values), to true. Once true, there should be no way to go back. - if (!store.sufficientDiskSpace.get()) { - reason = REASON.DISK_SPACE; - return true; + /** + * Estimated compaction gain. The result will be undefined if stopped through + * the passed {@code stop} signal. + * @param stop signal for stopping the estimation process. + * @return compaction gain estimate + */ + synchronized GCEstimation estimateCompactionGain(Supplier<Boolean> stop) { + if (gcOptions.isGcSizeDeltaEstimation()) { + SizeDeltaGcEstimation e = new SizeDeltaGcEstimation(gcOptions, + gcJournal, stats.getApproximateSize()); + return e; } - if (store.shutdown) { - reason = REASON.SHUTDOWN; - return true; + + CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), + count(), stop, gcOptions.getGainThreshold()); + fileStoreLock.readLock().lock(); + try { + for (TarReader reader : readers) { + reader.accept(estimate); + if (stop.get()) { + break; + } + } + } finally { + fileStoreLock.readLock().unlock(); } - if (store.gcOptions.isStopCompaction()) { - reason = REASON.MANUAL; - return true; + return estimate; + } + + private void logAndClear( + @Nonnull DescriptiveStatistics nodeWriteTimeStats, + @Nonnull DescriptiveStatistics nodeCompactTimeStats) { + log.info("Node write time statistics (ns) {}", toString(nodeWriteTimeStats)); + log.info("Node compact time statistics (ns) {}", toString(nodeCompactTimeStats)); + nodeWriteTimeStats.clear(); + nodeCompactTimeStats.clear(); + } + + private void log(@CheckForNull String nodeCacheOccupancyInfo) { + if (nodeCacheOccupancyInfo != null) { + log.info("NodeCache occupancy: {}", nodeCacheOccupancyInfo); } - return false; } - @Override - public String toString() { - switch (reason) { - case DISK_SPACE: - return "Not enough disk space available"; - case SHUTDOWN: - return "FileStore shutdown request received"; - case MANUAL: - return "GC stop request received"; - default: - return ""; + private String toString(DescriptiveStatistics statistics) { + DecimalFormat sci = new DecimalFormat("##0.0E0"); + DecimalFormatSymbols symbols = sci.getDecimalFormatSymbols(); + symbols.setNaN("NaN"); + symbols.setInfinity("Inf"); + sci.setDecimalFormatSymbols(symbols); + return "min=" + sci.format(statistics.getMin()) + + ", 10%=" + sci.format(statistics.getPercentile(10.0)) + + ", 50%=" + sci.format(statistics.getPercentile(50.0)) + + ", 90%=" + 
sci.format(statistics.getPercentile(90.0)) + + ", max=" + sci.format(statistics.getMax()) + + ", mean=" + sci.format(statistics.getMean()) + + ", stdev=" + sci.format(statistics.getStandardDeviation()) + + ", N=" + sci.format(statistics.getN()); + } + + synchronized int compact() throws IOException { + try { + Stopwatch watch = Stopwatch.createStarted(); + gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions); + + SegmentNodeState before = getHead(); + final int newGeneration = getGcGeneration() + 1; + SegmentBufferWriter bufferWriter = new SegmentBufferWriter(FileStore.this, tracker, segmentReader, "c", newGeneration); + Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this); + SegmentNodeState after = compact(bufferWriter, before, cancel); + if (after == null) { + gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel); + return 0; + } + + gcListener.info("TarMK GC #{}: compacted {} to {}", + GC_COUNT, before.getRecordId(), after.getRecordId()); + + int cycles = 0; + boolean success = false; + while (cycles < gcOptions.getRetryCount() && + !(success = revisions.setHead(before.getRecordId(), after.getRecordId(), EXPEDITE_OPTION))) { + // Some other concurrent changes have been made. + // Rebase (and compact) those changes on top of the + // compacted state before retrying to set the head. + cycles++; + gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " + + "Compacting these commits. Cycle {} of {}", + GC_COUNT, cycles, gcOptions.getRetryCount()); + SegmentNodeState head = getHead(); + after = compact(bufferWriter, head, cancel); + if (after == null) { + gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel); + return 0; + } + + gcListener.info("TarMK GC #{}: compacted {} against {} to {}", + GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId()); + before = head; + } + + if (!success) { + gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.", + GC_COUNT, cycles); + int forceTimeout = gcOptions.getForceTimeout(); + if (forceTimeout > 0) { + gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds", + GC_COUNT, forceTimeout); + cycles++; + success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS))); + if (!success) { + if (cancel.get()) { + gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " + + "Compaction was cancelled: {}.", GC_COUNT, cancel); + } else { + gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. 
" + + "Most likely compaction didn't get exclusive access to the store.", GC_COUNT); + } + } + } + } + + if (success) { + gcListener.compacted(SUCCESS, newGeneration); + gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles", + GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles); + return newGeneration; + } else { + gcListener.compacted(FAILURE, newGeneration); + gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles", + GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles); + return -newGeneration; + } + } catch (InterruptedException e) { + gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e); + currentThread().interrupt(); + return 0; + } catch (Exception e) { + gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e); + return 0; + } + } + + /** + * @param duration + * @param unit + * @return {@code Supplier} instance which returns true once the time specified in + * {@code duration} and {@code unit} has passed. + */ + private Supplier<Boolean> timeOut(final long duration, @Nonnull final TimeUnit unit) { + return new Supplier<Boolean>() { + long deadline = currentTimeMillis() + MILLISECONDS.convert(duration, unit); + @Override + public Boolean get() { + return currentTimeMillis() > deadline; + } + }; + } + + /** + * @param supplier1 + * @param supplier2 + * @return {@code Supplier} instance that returns {@code true} iff {@code supplier1} returns + * {@code true} or otherwise {@code supplier2} returns {@code true}. + */ + private Supplier<Boolean> or( + @Nonnull Supplier<Boolean> supplier1, + @Nonnull Supplier<Boolean> supplier2) { + if (supplier1.get()) { + return Suppliers.ofInstance(true); + } else { + return supplier2; + } + } + + private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head, + Supplier<Boolean> cancel) + throws IOException { + if (gcOptions.isOffline()) { + SegmentWriter writer = new SegmentWriter(FileStore.this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer); + return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions) + .compact(EMPTY_NODE, head, EMPTY_NODE); + } else { + return segmentWriter.writeNode(head, bufferWriter, cancel); + } + } + + private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter, + @Nonnull final Supplier<Boolean> cancel) + throws InterruptedException { + return revisions. + setHead(new Function<RecordId, RecordId>() { + @Nullable + @Override + public RecordId apply(RecordId base) { + try { + long t0 = currentTimeMillis(); + SegmentNodeState after = compact(bufferWriter, + segmentReader.readNode(base), cancel); + if (after == null) { + gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds", + GC_COUNT, (currentTimeMillis() - t0) / 1000); + return null; + } else { + return after.getRecordId(); + } + } catch (IOException e) { + gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e); + return null; + } + } + }, + timeout(gcOptions.getForceTimeout(), SECONDS)); + } + + synchronized void cleanup() throws IOException { + fileReaper.add(cleanupOldGenerations(getGcGeneration())); + } + + /** + * Cleanup segments that are from an old generation. That segments whose generation + * is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older. 
+ * @param gcGeneration + * @return list of files to be removed + * @throws IOException + */ + private List<File> cleanupOldGenerations(int gcGeneration) throws IOException { + final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations(); + + Predicate<Integer> reclaimPredicate = new Predicate<Integer>() { + @Override + public boolean apply(Integer generation) { + return generation <= reclaimGeneration; + } + }; + return cleanup(reclaimPredicate, + "gc-count=" + GC_COUNT + + ",gc-status=success" + + ",store-generation=" + gcGeneration + + ",reclaim-predicate=(generation<=" + reclaimGeneration + ")"); + } + + /** + * Cleanup segments whose generation matches the {@code reclaimGeneration} predicate. + * @param reclaimGeneration + * @param gcInfo gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)} + * @return list of files to be removed + * @throws IOException + */ + private List<File> cleanup( + @Nonnull Predicate<Integer> reclaimGeneration, + @Nonnull String gcInfo) + throws IOException { + Stopwatch watch = Stopwatch.createStarted(); + Set<UUID> bulkRefs = newHashSet(); + Map<TarReader, TarReader> cleaned = newLinkedHashMap(); + + long initialSize = 0; + fileStoreLock.writeLock().lock(); + try { + gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT); + + newWriter(); + segmentCache.clear(); + + // Suggest to the JVM that now would be a good time + // to clear stale weak references in the SegmentTracker + System.gc(); + + collectBulkReferences(bulkRefs); + + for (TarReader reader : readers) { + cleaned.put(reader, reader); + initialSize += reader.size(); + } + } finally { + fileStoreLock.writeLock().unlock(); + } + + gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)", + GC_COUNT, humanReadableByteCount(initialSize), initialSize); + + Set<UUID> reclaim = newHashSet(); + for (TarReader reader : cleaned.keySet()) { + reader.mark(bulkRefs, reclaim, reclaimGeneration); + log.info("{}: size of bulk references/reclaim set {}/{}", + reader, bulkRefs.size(), reclaim.size()); + if (shutdown) { + gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT); + break; + } + } + Set<UUID> reclaimed = newHashSet(); + for (TarReader reader : cleaned.keySet()) { + cleaned.put(reader, reader.sweep(reclaim, reclaimed)); + if (shutdown) { + gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT); + break; + } + } + + // it doesn't account for concurrent commits that might have happened + long afterCleanupSize = 0; + + List<TarReader> oldReaders = newArrayList(); + fileStoreLock.writeLock().lock(); + try { + // Replace current list of reader with the cleaned readers taking care not to lose + // any new reader that might have come in through concurrent calls to newWriter() + List<TarReader> sweptReaders = newArrayList(); + for (TarReader reader : readers) { + if (cleaned.containsKey(reader)) { + TarReader newReader = cleaned.get(reader); + if (newReader != null) { + sweptReaders.add(newReader); + afterCleanupSize += newReader.size(); + } + // if these two differ, the former represents the swept version of the latter + if (newReader != reader) { + oldReaders.add(reader); + } + } else { + sweptReaders.add(reader); + } + } + readers = sweptReaders; + } finally { + fileStoreLock.writeLock().unlock(); + } + tracker.clearSegmentIdTables(reclaimed, gcInfo); + + // Close old readers *after* setting readers to the new readers to avoid accessing + // a closed reader from readSegment() + LinkedList<File> toRemove = newLinkedList(); 
+ for (TarReader oldReader : oldReaders) { + closeAndLogOnFail(oldReader); + File file = oldReader.getFile(); + gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName()); + toRemove.addLast(file); + } + + long finalSize = size(); + long reclaimedSize = initialSize - afterCleanupSize; + stats.reclaimed(reclaimedSize); + gcJournal.persist(reclaimedSize, finalSize); + gcListener.cleaned(reclaimedSize, finalSize); + gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" + + " and space reclaimed {} ({} bytes).", + GC_COUNT, watch, watch.elapsed(MILLISECONDS), + humanReadableByteCount(finalSize), finalSize, + humanReadableByteCount(reclaimedSize), reclaimedSize); + return toRemove; + } + + private void collectBulkReferences(Set<UUID> bulkRefs) { + Set<UUID> refs = newHashSet(); + for (SegmentId id : tracker.getReferencedSegmentIds()) { + refs.add(id.asUUID()); + } + tarWriter.collectReferences(refs); + for (UUID ref : refs) { + if (!isDataSegmentId(ref.getLeastSignificantBits())) { + bulkRefs.add(ref); + } } } + + /** + * Cleanup segments of the given generation {@code gcGeneration}. + * @param gcGeneration + * @return list of files to be removed + * @throws IOException + */ + private List<File> cleanupGeneration(final int gcGeneration) throws IOException { + Predicate<Integer> cleanupPredicate = new Predicate<Integer>() { + @Override + public boolean apply(Integer generation) { + return generation == gcGeneration; + } + }; + return cleanup(cleanupPredicate, + "gc-count=" + GC_COUNT + + ",gc-status=failed" + + ",store-generation=" + (gcGeneration - 1) + + ",reclaim-predicate=(generation==" + gcGeneration + ")"); + } + + /** + * Finds all external blob references that are currently accessible + * in this repository and adds them to the given collector. Useful + * for collecting garbage in an external data store. + * <p> + * Note that this method only collects blob references that are already + * stored in the repository (at the time when this method is called), so + * the garbage collector will need some other mechanism for tracking + * in-memory references and references stored while this method is + * running. + * @param collector reference collector called back for each blob reference found + */ + synchronized void collectBlobReferences(ReferenceCollector collector) throws IOException { + segmentWriter.flush(); + List<TarReader> tarReaders = newArrayList(); + fileStoreLock.writeLock().lock(); + try { + newWriter(); + tarReaders.addAll(FileStore.this.readers); + } finally { + fileStoreLock.writeLock().unlock(); + } + + int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1; + for (TarReader tarReader : tarReaders) { + tarReader.collectBlobReferences(collector, newReferenceReader(FileStore.this), minGeneration); + } + } + + void cancel() { + cancelled = true; + } + + /** + * Represents the cancellation policy for the compaction phase. If the disk + * space was considered insufficient at least once during compaction (or if + * the space was never sufficient to begin with), compaction is considered + * canceled. Furthermore when the file store is shutting down, compaction is + * considered canceled. 
+ */ + private class CancelCompactionSupplier implements Supplier<Boolean> { + private final FileStore store; + + private String reason; + + public CancelCompactionSupplier(@Nonnull FileStore store) { + cancelled = false; + this.store = store; + } + + @Override + public Boolean get() { + // The outOfDiskSpace and shutdown flags can only transition from + // false (their initial values), to true. Once true, there should + // be no way to go back. + if (!store.sufficientDiskSpace.get()) { + reason = "Not enough disk space"; + return true; + } + if (store.shutdown) { + reason = "The FileStore is shutting down"; + return true; + } + if (cancelled) { + reason = "Cancelled by user"; + return true; + } + return false; + } + + @Override + public String toString() { return reason; } + } } + } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Mon Oct 10 15:10:06 2016 @@ -529,7 +529,7 @@ public class CompactionAndCleanupIT { public Boolean call() throws IOException { boolean cancelled = false; for (int k = 0; !cancelled && k < 1000; k++) { - cancelled = fileStore.compact() == null; + cancelled = !fileStore.compact(); } return cancelled; } @@ -1080,10 +1080,7 @@ public class CompactionAndCleanupIT { Callable<Void> concurrentCleanupTask = new Callable<Void>() { @Override public Void call() throws Exception { - // Concurrent cleanup calls are not supported by the file store - synchronized (fileStore) { - fileStore.cleanup(); - } + fileStore.cleanup(); return null; } }; @@ -1143,10 +1140,7 @@ public class CompactionAndCleanupIT { final Callable<Void> concurrentCleanTask = new Callable<Void>() { @Override public Void call() throws Exception { - // Concurrent cleanup calls are not supported by the file store - synchronized (fileStore) { - fileStore.cleanup(); - } + fileStore.cleanup(); return null; } }; Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java Mon Oct 10 15:10:06 2016 @@ -236,7 +236,7 @@ public class SegmentCompactionIT { List<Registration> registrations = newArrayList(); registrations.add(registerMBean(segmentCompactionMBean, new ObjectName("IT:TYPE=Segment Compaction"))); - registrations.add(registerMBean(new SegmentRevisionGCMBean(gcOptions), + registrations.add(registerMBean(new SegmentRevisionGCMBean(fileStore, gcOptions), new ObjectName("IT:TYPE=Segment Revision GC"))); registrations.add(registerMBean(fileStoreGCMonitor, new ObjectName("IT:TYPE=GC 
Monitor"))); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java?rev=1764116&r1=1764115&r2=1764116&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java Mon Oct 10 15:10:06 2016 @@ -27,7 +27,6 @@ import java.io.File; import java.io.IOException; import java.util.Random; -import com.google.common.base.Suppliers; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; @@ -80,13 +79,11 @@ public class CompactionEstimatorTest { fileStore.flush(); try { - GCEstimation est = fileStore.estimateCompactionGain(Suppliers - .ofInstance(false)); + GCEstimation est = fileStore.estimateCompactionGain(); assertTrue(est.gcNeeded()); if (est instanceof CompactionGainEstimate) { // should be at 66% - assertTrue(((CompactionGainEstimate) est) - .estimateCompactionGain() > 60); + assertTrue(((CompactionGainEstimate) est).estimateCompactionGain() > 60); } } finally { fileStore.close();