// http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
// ----------------------------------------------------------------------
// diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
// new file mode 100644
// index 0000000..acc9747
// --- /dev/null
// +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
// @@ -0,0 +1,511 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.lifecycle;

import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
import org.apache.cassandra.utils.concurrent.Transactional;

import static com.google.common.base.Functions.compose;
import static com.google.common.base.Predicates.*;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.*;
import static java.util.Collections.singleton;
import static org.apache.cassandra.db.lifecycle.Helpers.*;
import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.concurrent.Refs.release;
import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;

/**
 * A transaction over a set of "original" sstable readers held by a {@link Tracker}.
 * Changes are first staged, made visible in the Tracker by {@link #checkpoint()},
 * and either committed or rolled back (aborted) as a whole.
 */
public class LifecycleTransaction extends Transactional.AbstractTransactional
{
    private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);

    /**
     * a class that represents accumulated modifications to the Tracker.
     * has two instances, one containing modifications that are "staged" (i.e. invisible)
     * and one containing those "logged" that have been made visible through a call to checkpoint()
     */
    private static class State
    {
        // readers that are either brand new, update a previous new reader, or update one of the original readers
        final Set<SSTableReader> update = new HashSet<>();
        // disjoint from update, represents a subset of originals that is no longer needed
        final Set<SSTableReader> obsolete = new HashSet<>();

        void log(State staged)
        {
            update.removeAll(staged.obsolete);
            // HashSet.add does not replace an element that is already present and equal, so the previous
            // versions are removed first so that the staged instances are the ones retained
            // (presumably successive versions of a reader compare equal; confirm against SSTableReader.equals)
            update.removeAll(staged.update);
            update.addAll(staged.update);
            obsolete.addAll(staged.obsolete);
        }

        boolean contains(SSTableReader reader)
        {
            return update.contains(reader) || obsolete.contains(reader);
        }

        boolean isEmpty()
        {
            return update.isEmpty() && obsolete.isEmpty();
        }

        void clear()
        {
            update.clear();
            obsolete.clear();
        }
    }

    public final Tracker tracker;
    private final OperationType operationType;
    // the original readers this transaction was opened over, and that it guards
    // (no other transactions may operate over these readers concurrently)
    private final Set<SSTableReader> originals = new HashSet<>();
    // the set of readers we've marked as compacting (only updated on creation and in checkpoint())
    private final Set<SSTableReader> marked = new HashSet<>();
    // the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
    // same version of a reader. potentially a dangerous property if there are reference counting bugs
    // as they won't be caught until the transaction's lifespan is over.
    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());

    // changes that have been made visible
    private final State logged = new State();
    // changes that are pending
    private final State staged = new State();

    /**
     * construct a Transaction for use in an offline operation
     */
    public static LifecycleTransaction offline(OperationType operationType, SSTableReader reader)
    {
        return offline(operationType, singleton(reader));
    }

    /**
     * construct a Transaction for use in an offline operation
     */
    public static LifecycleTransaction offline(OperationType operationType, Iterable<SSTableReader> readers)
    {
        // if offline, for simplicity we just use a dummy tracker
        Tracker dummy = new Tracker(null, false);
        dummy.addInitialSSTables(readers);
        dummy.apply(updateCompacting(emptySet(), readers));
        return new LifecycleTransaction(dummy, operationType, readers);
    }

    LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
    {
        this.tracker = tracker;
        this.operationType = operationType;
        for (SSTableReader reader : readers)
        {
            originals.add(reader);
            marked.add(reader);
            identities.add(reader.instanceId);
        }
    }

    public void doPrepare()
    {
        // note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
        // separately: the second prepareToCommit is ignored as a "redundant" transition. since it is only a checkpoint
        // (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
        // it may break this use case, and care is needed
        checkpoint();
    }

    /**
     * point of no return: commit all changes, but leave all readers marked as compacting
     */
    public Throwable doCommit(Throwable accumulate)
    {
        assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";

        logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);

        // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
        // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
        // and notification status for the obsolete and new files
        accumulate = setupDeleteNotification(logged.update, tracker, accumulate);
        accumulate = markObsolete(logged.obsolete, accumulate);
        accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
        accumulate = release(selfRefs(logged.obsolete), accumulate);
        accumulate = tracker.notifySSTablesChanged(originals, logged.update, operationType, accumulate);
        return accumulate;
    }

    /**
     * undo all of the changes made by this transaction, resetting the state to its original form
     */
    public Throwable doAbort(Throwable accumulate)
    {
        if (logger.isDebugEnabled())
            logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);

        if (logged.isEmpty() && staged.isEmpty())
            return accumulate;

        // mark obsolete all readers that are not versions of those present in the original set
        Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
        logger.debug("Obsoleting {}", obsolete);
        accumulate = markObsolete(obsolete, accumulate);

        // replace all updated readers with a version restored to its original state
        accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
        // setReplaced immediately preceding versions that have not been obsoleted
        accumulate = setReplaced(logged.update, accumulate);
        // we have replaced all of logged.update and never made visible staged.update,
        // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
        // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
        // or is still in its original form (so left as is); in either case no extra action is needed
        accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
        logged.clear();
        staged.clear();
        return accumulate;
    }

    @Override
    protected Throwable doPostCleanup(Throwable accumulate)
    {
        return unmarkCompacting(marked, accumulate);
    }

    public void permitRedundantTransitions()
    {
        super.permitRedundantTransitions();
    }

    /**
     * call when a consistent batch of changes is ready to be made atomically visible
     * these will be exposed in the Tracker atomically, or an exception will be thrown; in this case
     * the transaction should be rolled back
     */
    public void checkpoint()
    {
        maybeFail(checkpoint(null));
    }

    private Throwable checkpoint(Throwable accumulate)
    {
        if (logger.isDebugEnabled())
            logger.debug("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete);

        if (staged.isEmpty())
            return accumulate;

        Set<SSTableReader> toUpdate = toUpdate();
        Set<SSTableReader> fresh = copyOf(fresh());

        // check the current versions of the readers we're replacing haven't somehow been replaced by someone else
        checkNotReplaced(filterIn(toUpdate, staged.update));

        // ensure any new readers are in the compacting set, since we aren't done with them yet
        // and don't want anyone else messing with them
        // apply atomically along with updating the live set of readers
        tracker.apply(compose(updateCompacting(emptySet(), fresh),
                              updateLiveSet(toUpdate, staged.update)));

        // log the staged changes and our newly marked readers
        marked.addAll(fresh);
        logged.log(staged);

        // setup our tracker, and mark our prior versions replaced, also releasing our references to them
        // we do not replace/release obsoleted readers, since we may need to restore them on rollback
        accumulate = setReplaced(filterOut(toUpdate, staged.obsolete), accumulate);
        accumulate = release(selfRefs(filterOut(toUpdate, staged.obsolete)), accumulate);

        staged.clear();
        return accumulate;
    }

    /**
     * update a reader: if !original, this is a reader that is being introduced by this transaction;
     * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
     */
    public void update(SSTableReader reader, boolean original)
    {
        assert !staged.update.contains(reader) : "each reader may only be updated once per checkpoint: " + reader;
        assert !identities.contains(reader.instanceId) : "each reader instance may only be provided as an update once: " + reader;
        // check it isn't obsolete, and that it matches the original flag
        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not update a reader that has been obsoleted";
        assert original == originals.contains(reader) : String.format("the 'original' indicator was incorrect (%s provided): %s", original, reader);
        staged.update.add(reader);
        identities.add(reader.instanceId);
        reader.setupKeyCache();
    }

    /**
     * mark this reader as for obsoletion. this does not actually obsolete the reader until commit() is called,
     * but on checkpoint() the reader will be removed from the live set
     */
    public void obsolete(SSTableReader reader)
    {
        logger.debug("Staging for obsolescence {}", reader);
        // check this is: a reader guarded by the transaction, an instance we have already worked with
        // and that we haven't already obsoleted it, nor do we have other changes staged for it
        assert identities.contains(reader.instanceId) : "only reader instances that have previously been provided may be obsoleted: " + reader;
        assert originals.contains(reader) : "only readers in the 'original' set may be obsoleted: " + reader + " vs " + originals;
        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not obsolete a reader that has already been obsoleted: " + reader;
        assert !staged.update.contains(reader) : "may not obsolete a reader that has a staged update (must checkpoint first): " + reader;
        assert current(reader) == reader : "may only obsolete the latest version of the reader: " + reader;
        staged.obsolete.add(reader);
    }

    /**
     * obsolete every file in the original transaction
     */
    public void obsoleteOriginals()
    {
        logger.debug("Staging for obsolescence {}", originals);
        // if we're obsoleting, we should have no staged updates for the original files
        assert Iterables.isEmpty(filterIn(staged.update, originals)) : staged.update;

        // stage obsoletes for any currently visible versions of any original readers
        Iterables.addAll(staged.obsolete, filterIn(current(), originals));
    }

    /**
     * return the readers we're replacing in checkpoint(), i.e. the currently visible version of those in staged
     */
    private Set<SSTableReader> toUpdate()
    {
        return copyOf(filterIn(current(), staged.obsolete, staged.update));
    }

    /**
     * new readers that haven't appeared previously (either in the original set or the logged updates)
     */
    private Iterable<SSTableReader> fresh()
    {
        return filterOut(staged.update,
                         originals, logged.update);
    }

    /**
     * returns the currently visible readers managed by this transaction
     */
    public Iterable<SSTableReader> current()
    {
        // i.e., those that are updates that have been logged (made visible),
        // and any original readers that have neither been obsoleted nor updated
        return concat(logged.update, filterOut(originals, logged.update, logged.obsolete));
    }

    /**
     * update the current replacement of any original reader back to its original start
     */
    private List<SSTableReader> restoreUpdatedOriginals()
    {
        Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
        return ImmutableList.copyOf(transform(torestore,
                                              new Function<SSTableReader, SSTableReader>()
                                              {
                                                  public SSTableReader apply(SSTableReader reader)
                                                  {
                                                      return current(reader).cloneWithNewStart(reader.first, null);
                                                  }
                                              }));
    }

    /**
     * the set of readers guarded by this transaction _in their original instance/state_
     * call current(SSTableReader) on any reader in this set to get the latest instance
     */
    public Set<SSTableReader> originals()
    {
        return Collections.unmodifiableSet(originals);
    }

    /**
     * indicates if the reader has been marked for obsoletion
     */
    public boolean isObsolete(SSTableReader reader)
    {
        return logged.obsolete.contains(reader) || staged.obsolete.contains(reader);
    }

    /**
     * return the current version of the provided reader, whether or not it is visible or staged;
     * i.e. returns the first version present by testing staged, logged and originals in order.
     */
    public SSTableReader current(SSTableReader reader)
    {
        Set<SSTableReader> container;
        if (staged.contains(reader))
            container = staged.update.contains(reader) ? staged.update : staged.obsolete;
        else if (logged.contains(reader))
            container = logged.update.contains(reader) ? logged.update : logged.obsolete;
        else if (originals.contains(reader))
            container = originals;
        else throw new AssertionError();
        return select(reader, container);
    }

    /**
     * remove the reader from the set we're modifying
     */
    public void cancel(SSTableReader cancel)
    {
        logger.debug("Cancelling {} from transaction", cancel);
        assert originals.contains(cancel) : "may only cancel a reader in the 'original' set: " + cancel + " vs " + originals;
        assert !(staged.contains(cancel) || logged.contains(cancel)) : "may only cancel a reader that has not been updated or obsoleted in this transaction: " + cancel;
        originals.remove(cancel);
        marked.remove(cancel);
        // forget the instance too: the transaction no longer guards this reader, and checkUnused()
        // (used by split()) relies on identities and originals having the same size
        identities.remove(cancel.instanceId);
        maybeFail(unmarkCompacting(singleton(cancel), null));
    }

    /**
     * remove the readers from the set we're modifying
     */
    public void cancel(Iterable<SSTableReader> cancels)
    {
        for (SSTableReader cancel : cancels)
            cancel(cancel);
    }

    /**
     * remove the provided readers from this Transaction, and return a new Transaction to manage them
     * only permitted to be called if the current Transaction has never been used
     */
    public LifecycleTransaction split(Collection<SSTableReader> readers)
    {
        logger.debug("Splitting {} into new transaction", readers);
        checkUnused();
        for (SSTableReader reader : readers)
            assert identities.contains(reader.instanceId) : "may only split the same reader instance the transaction was opened with: " + reader;

        for (SSTableReader reader : readers)
        {
            identities.remove(reader.instanceId);
            originals.remove(reader);
            marked.remove(reader);
        }
        return new LifecycleTransaction(tracker, operationType, readers);
    }

    /**
     * check this transaction has never been used
     */
    private void checkUnused()
    {
        assert logged.isEmpty();
        assert staged.isEmpty();
        assert identities.size() == originals.size();
        assert originals.size() == marked.size();
    }

    private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
    {
        accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
        // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
        // with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
        // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
        accumulate = tracker.dropSSTablesIfInvalid(accumulate);
        return accumulate;
    }

    // convenience method for callers that know only one sstable is involved in the transaction
    public SSTableReader onlyOne()
    {
        assert originals.size() == 1;
        return getFirst(originals, null);
    }

    // a class representing the current state of the reader within this transaction, encoding the actions both logged
    // and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
    // indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
    // by the corresponding Action)
    @VisibleForTesting
    public static class ReaderState
    {
        public static enum Action
        {
            UPDATED, OBSOLETED, NONE;
            public static Action get(boolean updated, boolean obsoleted)
            {
                assert !(updated && obsoleted);
                return updated ? UPDATED : obsoleted ? OBSOLETED : NONE;
            }
        }

        final Action staged;
        final Action logged;
        final SSTableReader nextVisible;
        final SSTableReader currentlyVisible;
        final boolean original;

        public ReaderState(Action logged, Action staged, SSTableReader currentlyVisible, SSTableReader nextVisible, boolean original)
        {
            this.staged = staged;
            this.logged = logged;
            this.currentlyVisible = currentlyVisible;
            this.nextVisible = nextVisible;
            this.original = original;
        }

        @Override
        public boolean equals(Object that)
        {
            return that instanceof ReaderState && equals((ReaderState) that);
        }

        public boolean equals(ReaderState that)
        {
            return this.staged == that.staged && this.logged == that.logged && this.original == that.original
                && this.currentlyVisible == that.currentlyVisible && this.nextVisible == that.nextVisible;
        }

        /**
         * Consistent with {@link #equals(ReaderState)}: the reader instances are compared by identity there,
         * so they are hashed by identity here (System.identityHashCode(null) is 0).
         */
        @Override
        public int hashCode()
        {
            return Objects.hash(staged, logged, original,
                                System.identityHashCode(currentlyVisible),
                                System.identityHashCode(nextVisible));
        }

        @Override
        public String toString()
        {
            return String.format("[logged=%s staged=%s original=%s]", logged, staged, original);
        }

        public static SSTableReader visible(SSTableReader reader, Predicate<SSTableReader> obsolete, Collection<SSTableReader> ... selectFrom)
        {
            return obsolete.apply(reader) ? null : selectFirst(reader, selectFrom);
        }
    }

    @VisibleForTesting
    public ReaderState state(SSTableReader reader)
    {
        SSTableReader currentlyVisible = ReaderState.visible(reader, in(logged.obsolete), logged.update, originals);
        SSTableReader nextVisible = ReaderState.visible(reader, orIn(staged.obsolete, logged.obsolete), staged.update, logged.update, originals);
        return new ReaderState(ReaderState.Action.get(logged.update.contains(reader), logged.obsolete.contains(reader)),
                               ReaderState.Action.get(staged.update.contains(reader), staged.obsolete.contains(reader)),
                               currentlyVisible, nextVisible, originals.contains(reader)
        );
    }

    @Override
    public String toString()
    {
        return originals.toString();
    }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
// ----------------------------------------------------------------------
// diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
// new file mode 100644
// index 0000000..ff2abcb
// --- /dev/null
// +++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
// @@ -0,0 +1,40 @@
package org.apache.cassandra.db.lifecycle;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import com.google.common.collect.Iterables;

import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;

/**
 * An IntervalTree over sstable readers. Each interval runs from a reader's {@code first}
 * to its {@code last} RowPosition and carries the reader itself as its payload.
 */
public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
{
    // shared instance built from a null interval collection
    // NOTE(review): presumably IntervalTree treats null as "no intervals"; confirm in IntervalTree's constructor
    private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);

    SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
    {
        super(intervals);
    }

    /**
     * @return the shared empty tree instance
     */
    public static SSTableIntervalTree empty()
    {
        return EMPTY;
    }

    /**
     * Builds a new tree containing one interval per provided reader.
     *
     * @param sstables the readers to index
     * @return a freshly constructed tree
     */
    public static SSTableIntervalTree build(Iterable<SSTableReader> sstables)
    {
        final List<Interval<RowPosition, SSTableReader>> intervals = buildIntervals(sstables);
        return new SSTableIntervalTree(intervals);
    }

    /**
     * Converts each reader into the interval [reader.first, reader.last], with the reader as payload,
     * keeping the iteration order of {@code sstables}.
     *
     * @param sstables the readers to convert; iterated twice (once to size the result, once to fill it)
     * @return a new mutable list of intervals
     */
    public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
    {
        final int expected = Iterables.size(sstables);
        final List<Interval<RowPosition, SSTableReader>> result = new ArrayList<>(expected);
        for (final SSTableReader reader : sstables)
        {
            final Interval<RowPosition, SSTableReader> span = Interval.<RowPosition, SSTableReader>create(reader.first, reader.last, reader);
            result.add(span);
        }
        return result;
    }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java new file mode 100644 index 0000000..50f567f --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.*; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.notifications.*; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; + +import static com.google.common.base.Predicates.and; +import static com.google.common.collect.ImmutableSet.copyOf; +import static com.google.common.collect.Iterables.filter; +import static java.util.Collections.singleton; +import static org.apache.cassandra.db.lifecycle.Helpers.*; +import static org.apache.cassandra.db.lifecycle.View.permitCompacting; +import static org.apache.cassandra.db.lifecycle.View.updateCompacting; +import static org.apache.cassandra.db.lifecycle.View.updateLiveSet; +import static org.apache.cassandra.utils.Throwables.maybeFail; +import static org.apache.cassandra.utils.Throwables.merge; +import static org.apache.cassandra.utils.concurrent.Refs.release; +import static org.apache.cassandra.utils.concurrent.Refs.selfRefs; + +public class Tracker +{ + private static final Logger logger = LoggerFactory.getLogger(Tracker.class); + + public final 
Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>(); + public final ColumnFamilyStore cfstore; + final AtomicReference<View> view; + public final boolean loadsstables; + + public Tracker(ColumnFamilyStore cfstore, boolean loadsstables) + { + this.cfstore = cfstore; + this.view = new AtomicReference<>(); + this.loadsstables = loadsstables; + this.reset(); + } + + public LifecycleTransaction tryModify(SSTableReader sstable, OperationType operationType) + { + return tryModify(singleton(sstable), operationType); + } + + /** + * @return a Transaction over the provided sstables if we are able to mark the given @param sstables as compacted, before anyone else + */ + public LifecycleTransaction tryModify(Iterable<SSTableReader> sstables, OperationType operationType) + { + if (Iterables.isEmpty(sstables)) + return new LifecycleTransaction(this, operationType, sstables); + if (null == apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables))) + return null; + return new LifecycleTransaction(this, operationType, sstables); + } + + + // METHODS FOR ATOMICALLY MODIFYING THE VIEW + + Pair<View, View> apply(Function<View, View> function) + { + return apply(Predicates.<View>alwaysTrue(), function); + } + + Throwable apply(Function<View, View> function, Throwable accumulate) + { + try + { + apply(function); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + return accumulate; + } + + /** + * atomically tests permit against the view and applies function to it, if permit yields true, returning the original; + * otherwise the method aborts, returning null + */ + Pair<View, View> apply(Predicate<View> permit, Function<View, View> function) + { + while (true) + { + View cur = view.get(); + if (!permit.apply(cur)) + return null; + View updated = function.apply(cur); + if (view.compareAndSet(cur, updated)) + return Pair.create(cur, updated); + } + } + + Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, 
Iterable<SSTableReader> newSSTables, Throwable accumulate) + { + if (cfstore == null) + return accumulate; + + long add = 0; + for (SSTableReader sstable : newSSTables) + { + if (logger.isDebugEnabled()) + logger.debug("adding {} to list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name); + try + { + add += sstable.bytesOnDisk(); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + long subtract = 0; + for (SSTableReader sstable : oldSSTables) + { + if (logger.isDebugEnabled()) + logger.debug("removing {} from list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name); + try + { + subtract += sstable.bytesOnDisk(); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + StorageMetrics.load.inc(add - subtract); + cfstore.metric.liveDiskSpaceUsed.inc(add - subtract); + // we don't subtract from total until the sstable is deleted + cfstore.metric.totalDiskSpaceUsed.inc(add); + return accumulate; + } + + // SETUP / CLEANUP + + public void addInitialSSTables(Iterable<SSTableReader> sstables) + { + maybeFail(setupDeleteNotification(sstables, this, null)); + apply(updateLiveSet(emptySet(), sstables)); + maybeFail(updateSizeTracking(emptySet(), sstables, null)); + // no notifications or backup necessary + } + + public void addSSTables(Iterable<SSTableReader> sstables) + { + addInitialSSTables(sstables); + for (SSTableReader sstable : sstables) + { + maybeIncrementallyBackup(sstable); + notifyAdded(sstable); + } + } + + /** (Re)initializes the tracker, purging all references. */ + @VisibleForTesting + public void reset() + { + view.set(new View( + cfstore != null ? 
ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(), + ImmutableList.<Memtable>of(), + Collections.<SSTableReader, SSTableReader>emptyMap(), + Collections.<SSTableReader>emptySet(), + SSTableIntervalTree.empty())); + } + + public Throwable dropSSTablesIfInvalid(Throwable accumulate) + { + if (cfstore != null && !cfstore.isValid()) + accumulate = dropSSTables(accumulate); + return accumulate; + } + + public void dropSSTables() + { + maybeFail(dropSSTables(null)); + } + + public Throwable dropSSTables(Throwable accumulate) + { + return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate); + } + + /** + * removes all sstables that are not busy compacting. + */ + public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate) + { + Pair<View, View> result = apply(new Function<View, View>() + { + public View apply(View view) + { + Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting)))); + return updateLiveSet(toremove, emptySet()).apply(view); + } + }); + + Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables); + assert Iterables.all(removed, remove); + + if (!removed.isEmpty()) + { + // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion" + accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), operationType, accumulate); + accumulate = updateSizeTracking(removed, emptySet(), accumulate); + accumulate = markObsolete(removed, accumulate); + accumulate = release(selfRefs(removed), accumulate); + } + return accumulate; + } + + /** + * Removes every SSTable in the directory from the Tracker's view. + * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. 
+ */ + public void removeUnreadableSSTables(final File directory) + { + maybeFail(dropSSTables(new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader reader) + { + return reader.descriptor.directory.equals(directory); + } + }, OperationType.UNKNOWN, null)); + } + + + + // FLUSHING + + /** + * get the Memtable that the ordered writeOp should be directed to + */ + public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition) + { + // since any new memtables appended to the list after we fetch it will be for operations started + // after us, we can safely assume that we will always find the memtable that 'accepts' us; + // if the barrier for any memtable is set whilst we are reading the list, it must accept us. + + // there may be multiple memtables in the list that would 'accept' us, however we only ever choose + // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't + // assign operations to a memtable that was retired/queued before we started) + for (Memtable memtable : view.get().liveMemtables) + { + if (memtable.accepts(opGroup, replayPosition)) + return memtable; + } + throw new AssertionError(view.get().liveMemtables.toString()); + } + + /** + * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables, + * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until + * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable + * must be followed by discarding(m), they cannot be interleaved. 
+ * + * @return the previously active memtable + */ + public Memtable switchMemtable(boolean truncating) + { + Memtable newMemtable = new Memtable(cfstore); + Pair<View, View> result = apply(View.switchMemtable(newMemtable)); + if (truncating) + notifyRenewed(newMemtable); + + return result.left.getCurrentMemtable(); + } + + public void markFlushing(Memtable memtable) + { + apply(View.markFlushing(memtable)); + } + + public void replaceFlushed(Memtable memtable, SSTableReader sstable) + { + if (sstable == null) + { + // sstable may be null if we flushed batchlog and nothing needed to be retained + // if it's null, we don't care what state the cfstore is in, we just replace it and continue + apply(View.replaceFlushed(memtable, null)); + return; + } + + sstable.setupDeleteNotification(this); + sstable.setupKeyCache(); + // back up before creating a new Snapshot (which makes the new one eligible for compaction) + maybeIncrementallyBackup(sstable); + + apply(View.replaceFlushed(memtable, sstable)); + + Throwable fail; + fail = updateSizeTracking(emptySet(), singleton(sstable), null); + // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? 
+ fail = notifyAdded(sstable, fail); + + if (cfstore != null && !cfstore.isValid()) + dropSSTables(); + + maybeFail(fail); + } + + + + // MISCELLANEOUS public utility calls + + public Set<SSTableReader> getSSTables() + { + return view.get().sstables; + } + + public Set<SSTableReader> getCompacting() + { + return view.get().compacting; + } + + public Set<SSTableReader> getUncompacting() + { + return view.get().nonCompactingSStables(); + } + + public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates) + { + return view.get().getUncompacting(candidates); + } + + public void maybeIncrementallyBackup(final SSTableReader sstable) + { + if (!DatabaseDescriptor.isIncrementalBackupsEnabled()) + return; + + File backupsDir = Directories.getBackupsDirectory(sstable.descriptor); + sstable.createLinks(FileUtils.getCanonicalPath(backupsDir)); + } + + public void spaceReclaimed(long size) + { + if (cfstore != null) + cfstore.metric.totalDiskSpaceUsed.dec(size); + } + + + + // NOTIFICATION + + Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate) + { + INotification notification = new SSTableListChangedNotification(added, removed, compactionType); + for (INotificationConsumer subscriber : subscribers) + { + try + { + subscriber.handleNotification(notification, this); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + return accumulate; + } + + Throwable notifyAdded(SSTableReader added, Throwable accumulate) + { + INotification notification = new SSTableAddedNotification(added); + for (INotificationConsumer subscriber : subscribers) + { + try + { + subscriber.handleNotification(notification, this); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + return accumulate; + } + + public void notifyAdded(SSTableReader added) + { + maybeFail(notifyAdded(added, null)); + } + + public void 
notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged) + { + INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + } + + public void notifyDeleting(SSTableReader deleting) + { + INotification notification = new SSTableDeletingNotification(deleting); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + } + + public void notifyRenewed(Memtable renewed) + { + INotification notification = new MemtableRenewedNotification(renewed); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + } + + public void notifyTruncated(long truncatedAt) + { + INotification notification = new TruncationNotification(truncatedAt); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + } + + public void subscribe(INotificationConsumer consumer) + { + subscribers.add(consumer); + } + + public void unsubscribe(INotificationConsumer consumer) + { + subscribers.remove(consumer); + } + + private static Set<SSTableReader> emptySet() + { + return Collections.emptySet(); + } + + public View getView() + { + return view.get(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java new file mode 100644 index 0000000..0d1100b --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.util.*; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Predicate; +import com.google.common.collect.*; + +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.Interval; + +import static com.google.common.base.Predicates.equalTo; +import static com.google.common.base.Predicates.not; +import static com.google.common.collect.ImmutableList.copyOf; +import static com.google.common.collect.ImmutableList.of; +import static com.google.common.collect.Iterables.all; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.filter; +import static java.util.Collections.singleton; +import static org.apache.cassandra.db.lifecycle.Helpers.emptySet; +import static org.apache.cassandra.db.lifecycle.Helpers.replace; + +/** + * An immutable structure holding the current memtable, the memtables pending + * flush, the sstables for a column family, and the sstables that are active + * in compaction (a subset of the sstables). 
+ * + * Modifications to instances are all performed via a Function produced by the static methods in this class. + * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or + * accept and apply the changes to the View. + * + */ +public class View +{ + /** + * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush + * and the new replacement memtable, until all outstanding write operations on the old table complete. + * The last item in the list is always the "current" memtable. + */ + public final List<Memtable> liveMemtables; + /** + * contains all memtables that are no longer referenced for writing and are queued for / in the process of being + * flushed. In chronologically ascending order. + */ + public final List<Memtable> flushingMemtables; + public final Set<SSTableReader> compacting; + public final Set<SSTableReader> sstables; + // we use a Map here so that we can easily perform identity checks as well as equality checks. + // When marking compacting, we now indicate if we expect the sstables to be present (by default we do), + // and we then check that not only are they all present in the live set, but that the exact instance present is + // the one we made our decision to compact against. 
+ public final Map<SSTableReader, SSTableReader> sstablesMap; + + public final SSTableIntervalTree intervalTree; + + View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree) + { + assert liveMemtables != null; + assert flushingMemtables != null; + assert sstables != null; + assert compacting != null; + assert intervalTree != null; + + this.liveMemtables = liveMemtables; + this.flushingMemtables = flushingMemtables; + + this.sstablesMap = sstables; + this.sstables = sstablesMap.keySet(); + this.compacting = compacting; + this.intervalTree = intervalTree; + } + + public Memtable getCurrentMemtable() + { + return liveMemtables.get(liveMemtables.size() - 1); + } + + /** + * @return the active memtable and all the memtables that are pending flush. + */ + public Iterable<Memtable> getAllMemtables() + { + return concat(flushingMemtables, liveMemtables); + } + + public Sets.SetView<SSTableReader> nonCompactingSStables() + { + return Sets.difference(sstables, compacting); + } + + public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates) + { + return filter(candidates, new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return !compacting.contains(sstable); + } + }); + } + + @Override + public String toString() + { + return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting); + } + + public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds) + { + if (intervalTree.isEmpty()) + return Collections.emptyList(); + RowPosition stopInTree = rowBounds.right.isMinimum() ? 
intervalTree.max() : rowBounds.right; + return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree)); + } + + // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW: + + // return a function to un/mark the provided readers compacting in a view + static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark) + { + if (unmark.isEmpty() && Iterables.isEmpty(mark)) + return Functions.identity(); + return new Function<View, View>() + { + public View apply(View view) + { + assert all(mark, Helpers.idIn(view.sstablesMap)); + return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, + replace(view.compacting, unmark, mark), + view.intervalTree); + } + }; + } + + // construct a predicate to reject views that do not permit us to mark these readers compacting; + // i.e. one of them is either already compacting, has been compacted, or has been replaced + static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers) + { + return new Predicate<View>() + { + public boolean apply(View view) + { + for (SSTableReader reader : readers) + if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted()) + return false; + return true; + } + }; + } + + // construct a function to change the liveset in a Snapshot + static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add) + { + if (remove.isEmpty() && Iterables.isEmpty(add)) + return Functions.identity(); + return new Function<View, View>() + { + public View apply(View view) + { + Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add); + return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, + SSTableIntervalTree.build(sstableMap.keySet())); + } + }; + } + + // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest 
memtable + static Function<View, View> switchMemtable(final Memtable newMemtable) + { + return new Function<View, View>() + { + public View apply(View view) + { + List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build(); + assert newLive.size() == view.liveMemtables.size() + 1; + return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.intervalTree); + } + }; + } + + // called before flush: move toFlush from liveMemtables to flushingMemtables + static Function<View, View> markFlushing(final Memtable toFlush) + { + return new Function<View, View>() + { + public View apply(View view) + { + List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables; + List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush)))); + List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)), + of(toFlush), + filter(flushing, not(lessThan(toFlush))))); + assert newLive.size() == live.size() - 1; + assert newFlushing.size() == flushing.size() + 1; + return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.intervalTree); + } + }; + } + + // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set + static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed) + { + return new Function<View, View>() + { + public View apply(View view) + { + List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable)))); + assert flushingMemtables.size() == view.flushingMemtables.size() - 1; + + if (flushed == null) + return new View(view.liveMemtables, flushingMemtables, view.sstablesMap, + view.compacting, view.intervalTree); + + Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed)); + return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compacting, + 
SSTableIntervalTree.build(sstableMap.keySet())); + } + }; + } + + private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan) + { + return new Predicate<T>() + { + public boolean apply(T t) + { + return t.compareTo(lessThan) < 0; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index a6c7a8b..c994a3d 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -403,7 +403,7 @@ public class CompressionMetadata count = chunkIndex; } - protected Throwable doCleanup(Throwable failed) + protected Throwable doPreCleanup(Throwable failed) { return offsets.close(failed); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 81e487c..9bfbc99 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -19,24 +19,15 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import 
javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; +import com.google.common.collect.*; + import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +35,10 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; @@ -209,22 +202,25 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean * Returns a Pair of all compacting and non-compacting sstables. Non-compacting sstables will be marked as * compacting. 
*/ - private Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> getCompactingAndNonCompactingSSTables() + private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> getCompactingAndNonCompactingSSTables() { List<SSTableReader> allCompacting = new ArrayList<>(); - Multimap<DataTracker, SSTableReader> allNonCompacting = HashMultimap.create(); + Map<UUID, LifecycleTransaction> allNonCompacting = new HashMap<>(); for (Keyspace ks : Keyspace.all()) { for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores()) { Set<SSTableReader> nonCompacting, allSSTables; + LifecycleTransaction txn = null; do { - allSSTables = cfStore.getDataTracker().getSSTables(); - nonCompacting = Sets.newHashSet(cfStore.getDataTracker().getUncompactingSSTables(allSSTables)); + View view = cfStore.getTracker().getView(); + allSSTables = view.sstables; + nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables)); } - while (!(nonCompacting.isEmpty() || cfStore.getDataTracker().markCompacting(nonCompacting))); - allNonCompacting.putAll(cfStore.getDataTracker(), nonCompacting); + while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN))); + + allNonCompacting.put(cfStore.metadata.cfId, txn); allCompacting.addAll(Sets.difference(allSSTables, nonCompacting)); } } @@ -233,50 +229,57 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean public void redistributeSummaries() throws IOException { - Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); + Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables(); try { - redistributeSummaries(compactingAndNonCompacting.left, Lists.newArrayList(compactingAndNonCompacting.right.values()), this.memoryPoolBytes); + redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes); } finally 
{ - for(DataTracker tracker : compactingAndNonCompacting.right.keySet()) - tracker.unmarkCompacting(compactingAndNonCompacting.right.get(tracker)); + for (LifecycleTransaction modifier : compactingAndNonCompacting.right.values()) + modifier.close(); } } /** * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on * their recent read rates. - * @param nonCompacting a list of sstables to share the memory pool across + * @param transactions containing the sstables we are to redistribute the memory pool across * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or * under, if possible * @return a list of new SSTableReader instances */ @VisibleForTesting - public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException + public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException { - long total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting)) - total += sstable.getIndexSummaryOffHeapSize(); - List<SSTableReader> oldFormatSSTables = new ArrayList<>(); - for (SSTableReader sstable : nonCompacting) + List<SSTableReader> redistribute = new ArrayList<>(); + for (LifecycleTransaction txn : transactions.values()) { - // We can't change the sampling level of sstables with the old format, because the serialization format - // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) 
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); - if (!sstable.descriptor.version.hasSamplingLevel()) - oldFormatSSTables.add(sstable); + for (SSTableReader sstable : ImmutableList.copyOf(txn.originals())) + { + // We can't change the sampling level of sstables with the old format, because the serialization format + // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) + logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); + if (!sstable.descriptor.version.hasSamplingLevel()) + { + oldFormatSSTables.add(sstable); + txn.cancel(sstable); + } + } + redistribute.addAll(txn.originals()); } - nonCompacting.removeAll(oldFormatSSTables); + + long total = 0; + for (SSTableReader sstable : Iterables.concat(compacting, redistribute)) + total += sstable.getIndexSummaryOffHeapSize(); logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", - nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); + redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); - final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size()); + final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size()); double totalReadsPerSec = 0.0; - for (SSTableReader sstable : nonCompacting) + for (SSTableReader sstable : redistribute) { if (sstable.getReadMeter() != null) { @@ -288,7 +291,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec); // copy and sort by read rates (ascending) - List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting); + List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute); Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); long 
remainingBytes = memoryPoolBytes; @@ -297,7 +300,10 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean logger.trace("Index summaries for compacting SSTables are using {} MB of space", (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); - List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes); + List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes); + + for (LifecycleTransaction txn : transactions.values()) + txn.finish(); total = 0; for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) @@ -308,7 +314,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean return newSSTables; } - private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, + private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, LifecycleTransaction> transactions, double totalReadsPerSec, long memoryPoolCapacity) throws IOException { @@ -410,26 +416,16 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean toDownsample.addAll(forceResample); toDownsample.addAll(toUpsample); toDownsample.addAll(forceUpsample); - Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create(); - Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create(); for (ResampleEntry entry : toDownsample) { SSTableReader sstable = entry.sstable; logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); - ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()); + ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId); 
SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); - DataTracker tracker = cfs.getDataTracker(); - - replacedByTracker.put(tracker, sstable); - replacementsByTracker.put(tracker, replacement); - } - - for (DataTracker tracker : replacedByTracker.keySet()) - { - tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker)); - newSSTables.addAll(replacementsByTracker.get(tracker)); + newSSTables.add(replacement); + transactions.get(sstable.metadata.cfId).update(replacement, true); } return newSSTables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java index db54557..cc837ba 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java @@ -23,15 +23,16 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.db.DataTracker; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Blocker; public class SSTableDeletingTask implements Runnable { @@ -42,11 +43,12 @@ public class SSTableDeletingTask implements Runnable // Additionally, we need to make sure to delete the data file first, so on restart the others // will be 
recognized as GCable. private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<>(); + private static final Blocker blocker = new Blocker(); private final SSTableReader referent; private final Descriptor desc; private final Set<Component> components; - private DataTracker tracker; + private Tracker tracker; /** * realDescriptor is the actual descriptor for the sstable, the descriptor inside @@ -70,13 +72,18 @@ public class SSTableDeletingTask implements Runnable } } - public void setTracker(DataTracker tracker) + public void setTracker(Tracker tracker) { // the tracker is used only to notify listeners of deletion of the sstable; // since deletion of a non-final file is not really deletion of the sstable, // we don't want to notify the listeners in this event - if (desc.type == Descriptor.Type.FINAL) - this.tracker = tracker; + assert desc.type == Descriptor.Type.FINAL; + this.tracker = tracker; + } + + public Tracker getTracker() + { + return tracker; } public void schedule() @@ -86,6 +93,7 @@ public class SSTableDeletingTask implements Runnable public void run() { + blocker.ask(); long size = referent.bytesOnDisk(); if (tracker != null) @@ -119,6 +127,7 @@ public class SSTableDeletingTask implements Runnable } /** for tests */ + @VisibleForTesting public static void waitForDeletions() { Runnable runnable = new Runnable() @@ -130,5 +139,11 @@ public class SSTableDeletingTask implements Runnable FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS)); } + + @VisibleForTesting + public static void pauseDeletions(boolean stop) + { + blocker.block(stop); + } }