Author: frm Date: Mon Apr 23 15:15:46 2018 New Revision: 1829894 URL: http://svn.apache.org/viewvc?rev=1829894&view=rev Log: OAK-7434 - Extract compaction implementations in separate components
Introduce CompactionStrategy, which represents a possible way to carry out the compaction phase within the larger garbage collection process. The full and tail compaction phases have been extracted to FullCompactionStrategy and TailCompactionStrategy respectively. Moreover, in order to have full compaction act as a fallback for tail compaction when a base state can't be found, CompactionResult has been extended with a new result type and FallbackCompactionStrategy has been introduced. Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java (with props) jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionStrategy.java (with props) jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FallbackCompactionStrategy.java (with props) jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FullCompactionStrategy.java (with props) jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SegmentWriterFactory.java (with props) jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SuccessfulCompactionListener.java (with props) jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TailCompactionStrategy.java (with props) Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java Added: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.segment.file; + +import static java.lang.Thread.currentThread; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION; +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_FORCE_COMPACT; +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_RETRY; +import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION; +import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout; + +import java.io.IOException; + +import com.google.common.base.Function; +import org.apache.jackrabbit.oak.segment.CheckpointCompactor; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentWriter; +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +abstract class AbstractCompactionStrategy implements CompactionStrategy { + + abstract GCType getCompactionType(); + + abstract GCGeneration nextGeneration(GCGeneration current); + + private CompactionResult compactionSucceeded( + Context context, + GCGeneration generation, + RecordId compactedRootId + ) { + context.getGCListener().compactionSucceeded(generation); + return CompactionResult.succeeded(getCompactionType(), generation, context.getGCOptions(), compactedRootId, context.getGCCount()); + } + + private static GCGeneration getGcGeneration(Context context) { + return context.getRevisions().getHead().getSegmentId().getGcGeneration(); + } + + private static SegmentNodeState getHead(Context context) { + return context.getSegmentReader().readHeadState(context.getRevisions()); + } + + private static long size(Context context) { + return context.getTarFiles().size(); + } + + 
private static CompactionResult compactionAborted(Context context, GCGeneration generation) { + context.getGCListener().compactionFailed(generation); + return CompactionResult.aborted(getGcGeneration(context), generation, context.getGCCount()); + } + + private static SegmentNodeState forceCompact( + Context context, + NodeState base, + NodeState onto, + CheckpointCompactor compactor + ) throws InterruptedException { + RecordId compactedId = setHead(context, headId -> { + try { + PrintableStopwatch t = PrintableStopwatch.createStarted(); + SegmentNodeState after = compactor.compact(base, context.getSegmentReader().readNode(headId), onto); + if (after != null) { + return after.getRecordId(); + } + context.getGCListener().info("compaction cancelled after {}", t); + return null; + } catch (IOException e) { + context.getGCListener().error("error during forced compaction.", e); + return null; + } + }); + if (compactedId == null) { + return null; + } + return context.getSegmentReader().readNode(compactedId); + } + + private static RecordId setHead(Context context, Function<RecordId, RecordId> f) throws InterruptedException { + return context.getRevisions().setHead(f, timeout(context.getGCOptions().getForceTimeout(), SECONDS)); + } + + private static String formatCompactionType(GCType compactionType) { + switch (compactionType) { + case FULL: + return "full"; + case TAIL: + return "tail"; + default: + throw new IllegalStateException("unsupported compaction type: " + compactionType); + } + } + + final CompactionResult compact(Context context, NodeState base) { + context.getGCListener().info("running {} compaction", formatCompactionType(getCompactionType())); + + GCGeneration nextGeneration = nextGeneration(getGcGeneration(context)); + + try { + PrintableStopwatch watch = PrintableStopwatch.createStarted(); + context.getGCListener().info( + "compaction started, gc options={}, current generation={}, new generation={}", + context.getGCOptions(), + 
getHead(context).getRecordId().getSegment().getGcGeneration(), + nextGeneration + ); + context.getGCListener().updateStatus(COMPACTION.message()); + + GCJournal.GCJournalEntry gcEntry = context.getGCJournal().read(); + long initialSize = size(context); + + SegmentWriter writer = context.getSegmentWriterFactory().newSegmentWriter(nextGeneration); + + context.getCompactionMonitor().init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize); + + CheckpointCompactor compactor = new CheckpointCompactor( + context.getGCListener(), + context.getSegmentReader(), + writer, + context.getBlobStore(), + context.getCanceller(), + context.getCompactionMonitor() + ); + + SegmentNodeState head = getHead(context); + SegmentNodeState compacted = compactor.compact(base, head, base); + if (compacted == null) { + context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller()); + return compactionAborted(context, nextGeneration); + } + + context.getGCListener().info("compaction cycle 0 completed in {}. Compacted {} to {}", + watch, head.getRecordId(), compacted.getRecordId()); + + int cycles = 0; + boolean success = false; + SegmentNodeState previousHead = head; + while (cycles < context.getGCOptions().getRetryCount() && + !(success = context.getRevisions().setHead(previousHead.getRecordId(), compacted.getRecordId(), EXPEDITE_OPTION))) { + // Some other concurrent changes have been made. + // Rebase (and compact) those changes on top of the + // compacted state before retrying to set the head. + cycles++; + context.getGCListener().info("compaction detected concurrent commits while compacting. " + + "Compacting these commits. 
Cycle {} of {}", + cycles, context.getGCOptions().getRetryCount()); + context.getGCListener().updateStatus(COMPACTION_RETRY.message() + cycles); + PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted(); + + head = getHead(context); + compacted = compactor.compact(previousHead, head, compacted); + if (compacted == null) { + context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller()); + return compactionAborted(context, nextGeneration); + } + + context.getGCListener().info("compaction cycle {} completed in {}. Compacted {} against {} to {}", + cycles, cycleWatch, head.getRecordId(), previousHead.getRecordId(), compacted.getRecordId()); + previousHead = head; + } + + if (!success) { + context.getGCListener().info("compaction gave up compacting concurrent commits after {} cycles.", cycles); + int forceTimeout = context.getGCOptions().getForceTimeout(); + if (forceTimeout > 0) { + context.getGCListener().info("trying to force compact remaining commits for {} seconds. " + + "Concurrent commits to the store will be blocked.", + forceTimeout); + context.getGCListener().updateStatus(COMPACTION_FORCE_COMPACT.message()); + PrintableStopwatch forceWatch = PrintableStopwatch.createStarted(); + + cycles++; + context.getCanceller().timeOutAfter(forceTimeout, SECONDS); + compacted = forceCompact(context, previousHead, compacted, compactor); + success = compacted != null; + if (success) { + context.getGCListener().info("compaction succeeded to force compact remaining commits after {}.", forceWatch); + } else { + if (context.getCanceller().get()) { + context.getGCListener().warn("compaction failed to force compact remaining commits " + + "after {}. Compaction was cancelled: {}.", + forceWatch, context.getCanceller()); + } else { + context.getGCListener().warn("compaction failed to force compact remaining commits. " + + "after {}. 
Could not acquire exclusive access to the node store.", + forceWatch); + } + } + } + } + + if (success) { + // Update type of the last compaction before calling methods that could throw an exception. + context.getSuccessfulCompactionListener().onSuccessfulCompaction(getCompactionType()); + writer.flush(); + context.getFlusher().flush(); + context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles); + return compactionSucceeded(context, nextGeneration, compacted.getRecordId()); + } else { + context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles); + return compactionAborted(context, nextGeneration); + } + } catch (InterruptedException e) { + context.getGCListener().error("compaction interrupted", e); + currentThread().interrupt(); + return compactionAborted(context, nextGeneration); + } catch (IOException e) { + context.getGCListener().error("compaction encountered an error", e); + return compactionAborted(context, nextGeneration); + } + } + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java?rev=1829894&r1=1829893&r2=1829894&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java Mon Apr 23 15:15:46 2018 @@ -138,9 +138,29 @@ abstract class CompactionResult { }; } + static 
CompactionResult notApplicable(int count) { + return new CompactionResult(GCGeneration.NULL, count) { + + @Override + Predicate<GCGeneration> reclaimer() { + return generation -> false; + } + + @Override + boolean isSuccess() { + return false; + } + + @Override + boolean isNotApplicable() { + return true; + } + + }; + } + /** - * @return a predicate determining which segments to {@link - * FileStore.GarbageCollector#cleanup(CompactionResult) clean up} for the + * @return a predicate determining which segments to {clean up} for the * given compaction result. */ abstract Predicate<GCGeneration> reclaimer(); @@ -158,6 +178,10 @@ abstract class CompactionResult { return RecordId.NULL; } + boolean isNotApplicable() { + return false; + } + /** * @return a diagnostic message describing the outcome of this compaction. */ Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionStrategy.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionStrategy.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionStrategy.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.file; + +import org.apache.jackrabbit.oak.segment.Revisions; +import org.apache.jackrabbit.oak.segment.SegmentReader; +import org.apache.jackrabbit.oak.segment.SegmentTracker; +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; + +interface CompactionStrategy { + + interface Context { + + GCListener getGCListener(); + + GCJournal getGCJournal(); + + SegmentGCOptions getGCOptions(); + + GCNodeWriteMonitor getCompactionMonitor(); + + SegmentReader getSegmentReader(); + + SegmentWriterFactory getSegmentWriterFactory(); + + Revisions getRevisions(); + + TarFiles getTarFiles(); + + BlobStore getBlobStore(); + + CancelCompactionSupplier getCanceller(); + + int getGCCount(); + + SuccessfulCompactionListener getSuccessfulCompactionListener(); + + Flusher getFlusher(); + + SegmentTracker getSegmentTracker(); + + } + + CompactionResult compact(Context context); + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java?rev=1829894&r1=1829893&r2=1829894&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java Mon Apr 23 15:15:46 2018 @@ -20,21 +20,11 @@ package org.apache.jackrabbit.oak.segment.file; import static com.google.common.collect.Sets.newHashSet; -import static java.lang.Thread.currentThread; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId; -import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.FULL; -import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.TAIL; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.CLEANUP; -import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION; -import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_FORCE_COMPACT; -import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_RETRY; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.ESTIMATION; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.IDLE; import static org.apache.jackrabbit.oak.segment.file.PrintableBytes.newPrintableBytes; -import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION; -import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout; import java.io.IOException; import java.util.Collection; @@ -44,45 
+34,26 @@ import java.util.UUID; import javax.annotation.Nonnull; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; -import org.apache.jackrabbit.oak.segment.CheckpointCompactor; -import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Revisions; import org.apache.jackrabbit.oak.segment.SegmentId; -import org.apache.jackrabbit.oak.segment.SegmentNodeState; -import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; -import org.apache.jackrabbit.oak.segment.SegmentWriter; +import org.apache.jackrabbit.oak.segment.SegmentReader; +import org.apache.jackrabbit.oak.segment.SegmentTracker; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; -import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; class DefaultGarbageCollectionStrategy implements GarbageCollectionStrategy { - private GCGeneration getGcGeneration(Context context) { - return context.getRevisions().getHead().getSegmentId().getGcGeneration(); - } + private final CompactionStrategy fullCompactionStrategy = new FullCompactionStrategy(); - private SegmentNodeState getBase(Context context) { - String root = context.getGCJournal().read().getRoot(); - RecordId rootId = RecordId.fromString(context.getSegmentTracker(), root); - if (RecordId.NULL.equals(rootId)) { - return null; - } - try { - SegmentNodeState node = context.getSegmentReader().readNode(rootId); - node.getPropertyCount(); // Resilience: fail early with a SNFE if the segment is not there - return node; - } catch (SegmentNotFoundException snfe) { - context.getGCListener().error("base state " + rootId + " is not accessible", snfe); - return null; - } - } + private final CompactionStrategy 
tailCompactionStrategy = new FallbackCompactionStrategy(new TailCompactionStrategy(), fullCompactionStrategy); - private SegmentNodeState getHead(Context context) { - return context.getSegmentReader().readHeadState(context.getRevisions()); + private GCGeneration getGcGeneration(Context context) { + return context.getRevisions().getHead().getSegmentId().getGcGeneration(); } @Override @@ -169,181 +140,90 @@ class DefaultGarbageCollectionStrategy i } } - @Override - public synchronized CompactionResult compactFull(Context context) { - context.getGCListener().info("running full compaction"); - return compact(context, FULL, EMPTY_NODE, getGcGeneration(context).nextFull()); - } + private static CompactionStrategy.Context newCompactionStrategyContext(Context context) { + return new CompactionStrategy.Context() { - @Override - public synchronized CompactionResult compactTail(Context context) { - context.getGCListener().info("running tail compaction"); - SegmentNodeState base = getBase(context); - if (base != null) { - return compact(context, TAIL, base, getGcGeneration(context).nextTail()); - } - context.getGCListener().info("no base state available, running full compaction instead"); - return compact(context, FULL, EMPTY_NODE, getGcGeneration(context).nextFull()); - } + @Override + public SegmentTracker getSegmentTracker() { + return context.getSegmentTracker(); + } - private CompactionResult compact(Context context, SegmentGCOptions.GCType gcType, NodeState base, GCGeneration newGeneration) { - try { - PrintableStopwatch watch = PrintableStopwatch.createStarted(); - context.getGCListener().info( - "compaction started, gc options={}, current generation={}, new generation={}", - context.getGCOptions(), - getHead(context).getRecordId().getSegment().getGcGeneration(), - newGeneration - ); - context.getGCListener().updateStatus(COMPACTION.message()); - - GCJournal.GCJournalEntry gcEntry = context.getGCJournal().read(); - long initialSize = size(context); - - SegmentWriter 
writer = context.getSegmentWriterFactory().newSegmentWriter(newGeneration); - - context.getCompactionMonitor().init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize); - - CheckpointCompactor compactor = new CheckpointCompactor( - context.getGCListener(), - context.getSegmentReader(), - writer, - context.getBlobStore(), - context.getCanceller(), - context.getCompactionMonitor() - ); - - SegmentNodeState head = getHead(context); - SegmentNodeState compacted = compactor.compact(base, head, base); - if (compacted == null) { - context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller()); - return compactionAborted(context, newGeneration); - } - - context.getGCListener().info("compaction cycle 0 completed in {}. Compacted {} to {}", - watch, head.getRecordId(), compacted.getRecordId()); - - int cycles = 0; - boolean success = false; - SegmentNodeState previousHead = head; - while (cycles < context.getGCOptions().getRetryCount() && - !(success = context.getRevisions().setHead(previousHead.getRecordId(), compacted.getRecordId(), EXPEDITE_OPTION))) { - // Some other concurrent changes have been made. - // Rebase (and compact) those changes on top of the - // compacted state before retrying to set the head. - cycles++; - context.getGCListener().info("compaction detected concurrent commits while compacting. " + - "Compacting these commits. 
Cycle {} of {}", - cycles, context.getGCOptions().getRetryCount()); - context.getGCListener().updateStatus(COMPACTION_RETRY.message() + cycles); - PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted(); - - head = getHead(context); - compacted = compactor.compact(previousHead, head, compacted); - if (compacted == null) { - context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller()); - return compactionAborted(context, newGeneration); - } + @Override + public GCListener getGCListener() { + return context.getGCListener(); + } - context.getGCListener().info("compaction cycle {} completed in {}. Compacted {} against {} to {}", - cycles, cycleWatch, head.getRecordId(), previousHead.getRecordId(), compacted.getRecordId()); - previousHead = head; - } - - if (!success) { - context.getGCListener().info("compaction gave up compacting concurrent commits after {} cycles.", cycles); - int forceTimeout = context.getGCOptions().getForceTimeout(); - if (forceTimeout > 0) { - context.getGCListener().info("trying to force compact remaining commits for {} seconds. " + - "Concurrent commits to the store will be blocked.", - forceTimeout); - context.getGCListener().updateStatus(COMPACTION_FORCE_COMPACT.message()); - PrintableStopwatch forceWatch = PrintableStopwatch.createStarted(); - - cycles++; - context.getCanceller().timeOutAfter(forceTimeout, SECONDS); - compacted = forceCompact(context, previousHead, compacted, compactor); - success = compacted != null; - if (success) { - context.getGCListener().info("compaction succeeded to force compact remaining commits after {}.", forceWatch); - } else { - if (context.getCanceller().get()) { - context.getGCListener().warn("compaction failed to force compact remaining commits " + - "after {}. Compaction was cancelled: {}.", - forceWatch, context.getCanceller()); - } else { - context.getGCListener().warn("compaction failed to force compact remaining commits. " + - "after {}. 
Could not acquire exclusive access to the node store.", - forceWatch); - } - } - } + @Override + public GCJournal getGCJournal() { + return context.getGCJournal(); } - if (success) { - // Update type of the last compaction before calling methods that could throw an exception. - context.getSuccessfulCompactionListener().onSuccessfulCompaction(gcType); - writer.flush(); - context.getFlusher().flush(); - context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles); - return compactionSucceeded(context, gcType, newGeneration, compacted.getRecordId()); - } else { - context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles); - return compactionAborted(context, newGeneration); + @Override + public SegmentGCOptions getGCOptions() { + return context.getGCOptions(); } - } catch (InterruptedException e) { - context.getGCListener().error("compaction interrupted", e); - currentThread().interrupt(); - return compactionAborted(context, newGeneration); - } catch (IOException e) { - context.getGCListener().error("compaction encountered an error", e); - return compactionAborted(context, newGeneration); - } - } - private CompactionResult compactionAborted(Context context, GCGeneration generation) { - context.getGCListener().compactionFailed(generation); - return CompactionResult.aborted(getGcGeneration(context), generation, context.getGCCount()); - } + @Override + public GCNodeWriteMonitor getCompactionMonitor() { + return context.getCompactionMonitor(); + } + + @Override + public SegmentReader getSegmentReader() { + return context.getSegmentReader(); + } + + @Override + public SegmentWriterFactory getSegmentWriterFactory() { + return context.getSegmentWriterFactory(); + } - private CompactionResult compactionSucceeded( - Context context, - SegmentGCOptions.GCType gcType, - GCGeneration generation, - RecordId compactedRootId - ) { - context.getGCListener().compactionSucceeded(generation); - return 
CompactionResult.succeeded(gcType, generation, context.getGCOptions(), compactedRootId, context.getGCCount()); + @Override + public Revisions getRevisions() { + return context.getRevisions(); + } + + @Override + public TarFiles getTarFiles() { + return context.getTarFiles(); + } + + @Override + public BlobStore getBlobStore() { + return context.getBlobStore(); + } + + @Override + public CancelCompactionSupplier getCanceller() { + return context.getCanceller(); + } + + @Override + public int getGCCount() { + return context.getGCCount(); + } + + @Override + public SuccessfulCompactionListener getSuccessfulCompactionListener() { + return context.getSuccessfulCompactionListener(); + } + + @Override + public Flusher getFlusher() { + return context.getFlusher(); + } + + }; } - private SegmentNodeState forceCompact( - Context context, - final NodeState base, - final NodeState onto, - final CheckpointCompactor compactor - ) throws InterruptedException { - RecordId compactedId = setHead(context, headId -> { - try { - PrintableStopwatch t = PrintableStopwatch.createStarted(); - SegmentNodeState after = compactor.compact(base, context.getSegmentReader().readNode(headId), onto); - if (after != null) { - return after.getRecordId(); - } - context.getGCListener().info("compaction cancelled after {}", t); - return null; - } catch (IOException e) { - context.getGCListener().error("error during forced compaction.", e); - return null; - } - }); - if (compactedId == null) { - return null; - } - return context.getSegmentReader().readNode(compactedId); + @Override + public synchronized CompactionResult compactFull(Context context) { + return fullCompactionStrategy.compact(newCompactionStrategyContext(context)); } - private RecordId setHead(Context context, Function<RecordId, RecordId> f) throws InterruptedException { - return context.getRevisions().setHead(f, timeout(context.getGCOptions().getForceTimeout(), SECONDS)); + @Override + public synchronized CompactionResult 
compactTail(Context context) { + return tailCompactionStrategy.compact(newCompactionStrategyContext(context)); } private GCEstimationResult estimateCompactionGain(Context context, boolean full) { Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FallbackCompactionStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FallbackCompactionStrategy.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FallbackCompactionStrategy.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FallbackCompactionStrategy.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.segment.file; + +class FallbackCompactionStrategy implements CompactionStrategy { + + private final CompactionStrategy primary; + + private final CompactionStrategy fallback; + + FallbackCompactionStrategy(CompactionStrategy primary, CompactionStrategy fallback) { + this.primary = primary; + this.fallback = fallback; + } + + @Override + public CompactionResult compact(Context context) { + CompactionResult result = primary.compact(context); + + if (result.isNotApplicable()) { + return fallback.compact(context); + } + + return result; + } + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FallbackCompactionStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FullCompactionStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FullCompactionStrategy.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FullCompactionStrategy.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FullCompactionStrategy.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.file; + +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.FULL; + +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; + +class FullCompactionStrategy extends AbstractCompactionStrategy { + + @Override + GCType getCompactionType() { + return FULL; + } + + @Override + GCGeneration nextGeneration(GCGeneration current) { + return current.nextFull(); + } + + @Override + public CompactionResult compact(Context context) { + return compact(context, EMPTY_NODE); + } + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FullCompactionStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java?rev=1829894&r1=1829893&r2=1829894&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java (original) +++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java Mon Apr 23 15:15:46 2018 @@ -27,9 +27,7 @@ import org.apache.jackrabbit.oak.segment import org.apache.jackrabbit.oak.segment.SegmentCache; import org.apache.jackrabbit.oak.segment.SegmentReader; import org.apache.jackrabbit.oak.segment.SegmentTracker; -import org.apache.jackrabbit.oak.segment.SegmentWriter; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; -import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; import org.apache.jackrabbit.oak.spi.blob.BlobStore; @@ -41,18 +39,6 @@ interface GarbageCollectionStrategy { } - interface SuccessfulCompactionListener { - - void onSuccessfulCompaction(SegmentGCOptions.GCType type); - - } - - interface SegmentWriterFactory { - - SegmentWriter newSegmentWriter(GCGeneration generation); - - } - interface Context { SegmentGCOptions getGCOptions(); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java?rev=1829894&r1=1829893&r2=1829894&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java Mon Apr 23 15:15:46 2018 @@ -38,7 +38,6 @@ import org.apache.jackrabbit.oak.segment import org.apache.jackrabbit.oak.segment.SegmentTracker; import org.apache.jackrabbit.oak.segment.SegmentWriter; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; -import 
org.apache.jackrabbit.oak.segment.file.GarbageCollectionStrategy.SuccessfulCompactionListener; import org.apache.jackrabbit.oak.segment.file.GarbageCollectionStrategy.SuccessfulGarbageCollectionListener; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; @@ -95,7 +94,7 @@ class GarbageCollector { private final Flusher flusher; - private final GarbageCollectionStrategy.SegmentWriterFactory segmentWriterFactory; + private final SegmentWriterFactory segmentWriterFactory; private final GCNodeWriteMonitor compactionMonitor; @@ -129,7 +128,7 @@ class GarbageCollector { FileStoreStats stats, CancelCompactionSupplier cancel, Flusher flusher, - GarbageCollectionStrategy.SegmentWriterFactory segmentWriterFactory + SegmentWriterFactory segmentWriterFactory ) { this.gcOptions = gcOptions; this.gcListener = new PrefixedGCListener(gcListener, GC_COUNT); @@ -187,7 +186,7 @@ class GarbageCollector { } @Override - public GarbageCollectionStrategy.SegmentWriterFactory getSegmentWriterFactory() { + public SegmentWriterFactory getSegmentWriterFactory() { return segmentWriterFactory; } Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SegmentWriterFactory.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SegmentWriterFactory.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SegmentWriterFactory.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SegmentWriterFactory.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.file; + +import org.apache.jackrabbit.oak.segment.SegmentWriter; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; + +interface SegmentWriterFactory { + + SegmentWriter newSegmentWriter(GCGeneration generation); + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SegmentWriterFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SuccessfulCompactionListener.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SuccessfulCompactionListener.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SuccessfulCompactionListener.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SuccessfulCompactionListener.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.file; + +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; + +interface SuccessfulCompactionListener { + + void onSuccessfulCompaction(SegmentGCOptions.GCType type); + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SuccessfulCompactionListener.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TailCompactionStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TailCompactionStrategy.java?rev=1829894&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TailCompactionStrategy.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TailCompactionStrategy.java Mon Apr 23 15:15:46 2018 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.file; + +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.TAIL; + +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +class TailCompactionStrategy extends AbstractCompactionStrategy { + + @Override + GCType getCompactionType() { + return TAIL; + } + + @Override + GCGeneration nextGeneration(GCGeneration current) { + return current.nextTail(); + } + + @Override + public CompactionResult compact(Context context) { + NodeState base = getBase(context); + + if (base == null) { + context.getGCListener().info("no base state available, tail compaction is not applicable"); + return CompactionResult.notApplicable(context.getGCCount()); + } + + return compact(context, base); + } + + private static NodeState getBase(Context context) { + RecordId id = getLastCompactedRootId(context); + + if (RecordId.NULL.equals(id)) { + return null; + } + + // Nodes are read lazily. 
In order to force a read operation for the requested + // node, the property count is computed. Computing the property count requires + // access to the node template, whose ID is stored in the content of the node. + // Accessing the content of the node forces a read operation for the segment + // containing the node. If the following code completes without throwing a + // SNFE, we can be sure that *at least* the root node can be accessed. This + // doesn't say anything about the health of the full closure of the head + // state. + + try { + NodeState node = getLastCompactedRootNode(context); + node.getPropertyCount(); + return node; + } catch (SegmentNotFoundException e) { + context.getGCListener().error("base state " + id + " is not accessible", e); + return null; + } + } + + private static String getLastCompactedRoot(Context context) { + return context.getGCJournal().read().getRoot(); + } + + private static RecordId getLastCompactedRootId(Context context) { + return RecordId.fromString(context.getSegmentTracker(), getLastCompactedRoot(context)); + } + + private static NodeState getLastCompactedRootNode(Context context) { + return context.getSegmentReader().readNode(getLastCompactedRootId(context)); + } + +} Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TailCompactionStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native