Handle non-atomic directory streams safely (CASSANDRA-10109). This patch refactors the lifecycle transaction log and updates the logic to be robust to non-atomic listings of directories.
patch by stefania; reviewed by benedict for CASSANDRA-10109 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351c7cac Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351c7cac Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351c7cac Branch: refs/heads/cassandra-3.0 Commit: 351c7caca311834f6c5bff08b0204943850214a9 Parents: 3818d30 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Thu Aug 27 14:09:45 2015 +0800 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Tue Sep 8 11:53:22 2015 +0100 ---------------------------------------------------------------------- .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../org/apache/cassandra/db/Directories.java | 5 +- .../apache/cassandra/db/lifecycle/Helpers.java | 12 +- .../db/lifecycle/LifecycleTransaction.java | 64 +- .../db/lifecycle/LogAwareFileLister.java | 196 +++ .../apache/cassandra/db/lifecycle/LogFile.java | 364 ++++++ .../cassandra/db/lifecycle/LogRecord.java | 208 ++++ .../cassandra/db/lifecycle/LogTransaction.java | 418 +++++++ .../apache/cassandra/db/lifecycle/Tracker.java | 4 +- .../cassandra/db/lifecycle/TransactionLog.java | 1141 ------------------ .../apache/cassandra/io/sstable/SSTable.java | 1 - .../io/sstable/format/SSTableReader.java | 5 +- .../org/apache/cassandra/io/util/FileUtils.java | 10 +- .../apache/cassandra/service/GCInspector.java | 4 +- .../cassandra/service/StorageService.java | 4 +- .../cassandra/tools/StandaloneScrubber.java | 3 +- .../cassandra/tools/StandaloneSplitter.java | 3 +- .../cassandra/tools/StandaloneUpgrader.java | 3 +- .../org/apache/cassandra/utils/CLibrary.java | 2 +- .../org/apache/cassandra/db/KeyCacheTest.java | 5 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 3 +- .../cassandra/db/lifecycle/HelpersTest.java | 4 +- .../db/lifecycle/LifecycleTransactionTest.java | 2 +- .../db/lifecycle/LogTransactionTest.java | 823 
+++++++++++++ .../db/lifecycle/RealTransactionsTest.java | 7 +- .../cassandra/db/lifecycle/TrackerTest.java | 7 +- .../db/lifecycle/TransactionLogTest.java | 812 ------------- .../io/sstable/SSTableRewriterTest.java | 35 +- .../org/apache/cassandra/schema/DefsTest.java | 4 +- 29 files changed, 2109 insertions(+), 2042 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 096172d..979e8ba 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -510,7 +510,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SystemKeyspace.removeTruncationRecord(metadata.cfId); data.dropSSTables(); - TransactionLog.waitForDeletions(); + LifecycleTransaction.waitForDeletions(); indexManager.invalidateAllIndexesBlocking(); materializedViewManager.invalidate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index c17b1fd..c801952 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -32,6 +32,7 @@ import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; @@ -550,7 +551,7 @@ public class Directories TEMPORARY, 
/** A transaction log file (contains information on final and temporary files). */ - TXN_LOG + TXN_LOG; } /** @@ -562,7 +563,7 @@ public class Directories /** Throw the exception */ THROW, - /** Ignore the txn log file */ + /** Ignore the problematic parts of the txn log file */ IGNORE } http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/Helpers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java index 98983c5..f9555f4 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java @@ -106,12 +106,12 @@ class Helpers assert !reader.isReplaced(); } - static Throwable markObsolete(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate) + static Throwable markObsolete(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate) { if (obsoletions == null || obsoletions.isEmpty()) return accumulate; - for (TransactionLog.Obsoletion obsoletion : obsoletions) + for (LogTransaction.Obsoletion obsoletion : obsoletions) { try { @@ -125,13 +125,13 @@ class Helpers return accumulate; } - static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLog txnLogs, List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate) + static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate) { for (SSTableReader reader : readers) { try { - obsoletions.add(new TransactionLog.Obsoletion(reader, txnLogs.obsoleted(reader))); + obsoletions.add(new LogTransaction.Obsoletion(reader, txnLogs.obsoleted(reader))); } catch (Throwable t) { @@ -141,12 +141,12 @@ class Helpers return accumulate; } - static Throwable abortObsoletion(List<TransactionLog.Obsoletion> obsoletions, Throwable 
accumulate) + static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate) { if (obsoletions == null || obsoletions.isEmpty()) return accumulate; - for (TransactionLog.Obsoletion obsoletion : obsoletions) + for (LogTransaction.Obsoletion obsoletion : obsoletions) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 520b229..59bbc7d 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -98,7 +98,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional public final Tracker tracker; // The transaction logs keep track of new and old sstable files - private final TransactionLog transactionLog; + private final LogTransaction log; // the original readers this transaction was opened over, and that it guards // (no other transactions may operate over these readers concurrently) private final Set<SSTableReader> originals = new HashSet<>(); @@ -115,7 +115,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional private final State staged = new State(); // the tidier and their readers, to be used for marking readers obsoleted during a commit - private List<TransactionLog.Obsoletion> obsoletions; + private List<LogTransaction.Obsoletion> obsoletions; /** * construct a Transaction for use in an offline operation @@ -143,7 +143,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata) { Tracker dummy = new Tracker(null, false); - return new 
LifecycleTransaction(dummy, new TransactionLog(operationType, metadata, dummy), Collections.emptyList()); + return new LifecycleTransaction(dummy, new LogTransaction(operationType, metadata, dummy), Collections.emptyList()); } /** @@ -152,18 +152,18 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional public static LifecycleTransaction offline(OperationType operationType, File operationFolder) { Tracker dummy = new Tracker(null, false); - return new LifecycleTransaction(dummy, new TransactionLog(operationType, operationFolder, dummy), Collections.emptyList()); + return new LifecycleTransaction(dummy, new LogTransaction(operationType, operationFolder, dummy), Collections.emptyList()); } LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers) { - this(tracker, new TransactionLog(operationType, getMetadata(tracker, readers), tracker), readers); + this(tracker, new LogTransaction(operationType, getMetadata(tracker, readers), tracker), readers); } - LifecycleTransaction(Tracker tracker, TransactionLog transactionLog, Iterable<SSTableReader> readers) + LifecycleTransaction(Tracker tracker, LogTransaction log, Iterable<SSTableReader> readers) { this.tracker = tracker; - this.transactionLog = transactionLog; + this.log = log; for (SSTableReader reader : readers) { originals.add(reader); @@ -187,19 +187,19 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional return null; } - public TransactionLog log() + public LogTransaction log() { - return transactionLog; + return log; } public OperationType opType() { - return transactionLog.getType(); + return log.getType(); } public UUID opId() { - return transactionLog.getId(); + return log.getId(); } public void doPrepare() @@ -212,8 +212,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional // prepare for compaction obsolete readers as long as they were part of the original set // since those that are not 
original are early readers that share the same desc with the finals - maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLog, obsoletions = new ArrayList<>(), null)); - transactionLog.prepareToCommit(); + maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), log, obsoletions = new ArrayList<>(), null)); + log.prepareToCommit(); } /** @@ -228,7 +228,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional maybeFail(accumulate); // transaction log commit failure means we must abort; safe commit is not possible - maybeFail(transactionLog.commit(null)); + maybeFail(log.commit(null)); // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size @@ -237,7 +237,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional accumulate = markObsolete(obsoletions, accumulate); accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate); accumulate = release(selfRefs(logged.obsolete), accumulate); - accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLog.getType(), accumulate); + accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.getType(), accumulate); return accumulate; } @@ -253,16 +253,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional accumulate = abortObsoletion(obsoletions, accumulate); if (logged.isEmpty() && staged.isEmpty()) - return transactionLog.abort(accumulate); + return log.abort(accumulate); // mark obsolete all readers that are not versions of those present in the original set Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals); logger.debug("Obsoleting {}", obsolete); - accumulate = prepareForObsoletion(obsolete, transactionLog, obsoletions = new ArrayList<>(), 
accumulate); + accumulate = prepareForObsoletion(obsolete, log, obsoletions = new ArrayList<>(), accumulate); // it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report // a failure to abort, which is useful information to have for debug - accumulate = transactionLog.abort(accumulate); + accumulate = log.abort(accumulate); accumulate = markObsolete(obsoletions, accumulate); // replace all updated readers with a version restored to its original state @@ -502,7 +502,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional originals.remove(reader); marked.remove(reader); } - return new LifecycleTransaction(tracker, transactionLog.getType(), readers); + return new LifecycleTransaction(tracker, log.getType(), readers); } /** @@ -535,17 +535,17 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional public void trackNew(SSTable table) { - transactionLog.trackNew(table); + log.trackNew(table); } public void untrackNew(SSTable table) { - transactionLog.untrackNew(table); + log.untrackNew(table); } public static void removeUnfinishedLeftovers(CFMetaData metadata) { - TransactionLog.removeUnfinishedLeftovers(metadata); + LogTransaction.removeUnfinishedLeftovers(metadata); } /** @@ -562,7 +562,25 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional */ public static List<File> getFiles(Path folder, BiFunction<File, Directories.FileType, Boolean> filter, Directories.OnTxnErr onTxnErr) { - return new TransactionLog.FileLister(folder, filter, onTxnErr).list(); + return new LogAwareFileLister(folder, filter, onTxnErr).list(); + } + + /** + * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.) + * Useful because there are times when we know GC has been invoked; also exposed as an mbean. 
+ */ + public static void rescheduleFailedDeletions() + { + LogTransaction.rescheduleFailedDeletions(); + } + + /** + * Deletions run on the nonPeriodicTasks executor, (both failedDeletions or global tidiers in SSTableReader) + * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed. + */ + public static void waitForDeletions() + { + LogTransaction.waitForDeletions(); } // a class representing the current state of the reader within this transaction, encoding the actions both logged http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java new file mode 100644 index 0000000..e086078 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java @@ -0,0 +1,196 @@ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.Directories; + +import static org.apache.cassandra.db.Directories.*; + +/** + * A class for listing files in a folder. 
+ */ +final class LogAwareFileLister +{ + // The folder to scan + private final Path folder; + + // The filter determines which files the client wants returned + private final BiFunction<File, FileType, Boolean> filter; //file, file type + + // The behavior when we fail to list files + private final OnTxnErr onTxnErr; + + // The unfiltered result + NavigableMap<File, Directories.FileType> files = new TreeMap<>(); + + @VisibleForTesting + LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr) + { + this.folder = folder; + this.filter = filter; + this.onTxnErr = onTxnErr; + } + + public List<File> list() + { + try + { + return innerList(); + } + catch (Throwable t) + { + throw new RuntimeException(String.format("Failed to list files in %s", folder), t); + } + } + + List<File> innerList() throws Throwable + { + list(Files.newDirectoryStream(folder)) + .stream() + .filter((f) -> !LogFile.isLogFile(f)) + .forEach((f) -> files.put(f, FileType.FINAL)); + + // Since many file systems are not atomic, we cannot be sure we have listed a consistent disk state + // (Linux would permit this, but for simplicity we keep our behaviour the same across platforms) + // so we must be careful to list txn log files AFTER every other file since these files are deleted last, + // after all other files are removed + list(Files.newDirectoryStream(folder, '*' + LogFile.EXT)) + .stream() + .filter(LogFile::isLogFile) + .forEach(this::classifyFiles); + + // Finally we apply the user filter before returning our result + return files.entrySet().stream() + .filter((e) -> filter.apply(e.getKey(), e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + static List<File> list(DirectoryStream<Path> stream) throws IOException + { + try + { + return StreamSupport.stream(stream.spliterator(), false) + .map(Path::toFile) + .filter((f) -> !f.isDirectory()) + .collect(Collectors.toList()); + } + finally + { + stream.close(); + } + } + + /** + 
* We read txn log files, if we fail we throw only if the user has specified + * OnTxnErr.THROW, else we log an error and apply the txn log anyway + */ + void classifyFiles(File txnFile) + { + LogFile txn = LogFile.make(txnFile, -1); + readTxnLog(txn); + classifyFiles(txn); + files.put(txnFile, FileType.TXN_LOG); + } + + void readTxnLog(LogFile txn) + { + txn.readRecords(); + if (!txn.verify() && onTxnErr == OnTxnErr.THROW) + throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. See earlier in log for details.", txn); + } + + void classifyFiles(LogFile txnFile) + { + Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.REMOVE); + Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.ADD); + + if (txnFile.completed()) + { // last record present, filter regardless of disk status + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + if (allFilesPresent(txnFile, oldFiles, newFiles)) + { // all files present, transaction is in progress, this will filter as aborted + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + // some files are missing, we expect the txn file to either also be missing or completed, so check + // disk state again to resolve any previous races on non-atomic directory listing platforms + + // if txn file also gone, then do nothing (all temporary should be gone, we could remove them if any) + if (!txnFile.exists()) + return; + + // otherwise read the file again to see if it is completed now + readTxnLog(txnFile); + + if (txnFile.completed()) + { // if after re-reading the txn is completed then filter accordingly + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + // some files are missing and yet the txn is still there and not completed + // something must be wrong (see comment at the top of this file requiring txn to be + // completed before 
obsoleting or aborting sstables) + throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s", + folder, + txnFile)); + } + + /** See if all files are present or if only the last record files are missing and it's a NEW record */ + private static boolean allFilesPresent(LogFile txnFile, Map<LogRecord, Set<File>> oldFiles, Map<LogRecord, Set<File>> newFiles) + { + LogRecord lastRecord = txnFile.getLastRecord(); + return !Stream.concat(oldFiles.entrySet().stream(), + newFiles.entrySet().stream() + .filter((e) -> e.getKey() != lastRecord)) + .filter((e) -> e.getKey().numFiles > e.getValue().size()) + .findFirst().isPresent(); + } + + private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles) + { + Collection<Set<File>> temporary = txnFile.committed() ? oldFiles : newFiles; + temporary.stream() + .flatMap(Set::stream) + .forEach((f) -> this.files.put(f, FileType.TEMPORARY)); + } + + @VisibleForTesting + static Set<File> getTemporaryFiles(File folder) + { + return listFiles(folder, FileType.TEMPORARY); + } + + @VisibleForTesting + static Set<File> getFinalFiles(File folder) + { + return listFiles(folder, FileType.FINAL); + } + + @VisibleForTesting + static Set<File> listFiles(File folder, FileType ... 
types) + { + Collection<FileType> match = Arrays.asList(types); + return new LogAwareFileLister(folder.toPath(), + (file, type) -> match.contains(type), + OnTxnErr.IGNORE).list() + .stream() + .collect(Collectors.toSet()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java new file mode 100644 index 0000000..c698722 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@ -0,0 +1,364 @@ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.nio.file.Files; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.collect.Iterables; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LogRecord.Type; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.CLibrary; + +import static org.apache.cassandra.utils.Throwables.merge; + +/** + * The transaction log file, which contains many records. 
+ */ +final class LogFile +{ + private static final Logger logger = LoggerFactory.getLogger(LogFile.class); + + static String EXT = ".log"; + static char SEP = '_'; + // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion) + static Pattern FILE_REGEX = Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT)); + + final File file; + final Set<LogRecord> records = new LinkedHashSet<>(); + final OperationType opType; + final UUID id; + final File folder; + final int folderDescriptor; + + static LogFile make(File logFile, int folderDescriptor) + { + Matcher matcher = LogFile.FILE_REGEX.matcher(logFile.getName()); + assert matcher.matches() && matcher.groupCount() == 3; + + // For now we don't need this but it is there in case we need to change + // file format later on, the version is the sstable version as defined in BigFormat + //String version = matcher.group(1); + + OperationType operationType = OperationType.fromFileName(matcher.group(2)); + UUID id = UUID.fromString(matcher.group(3)); + + return new LogFile(operationType, logFile.getParentFile(), folderDescriptor, id); + } + + void sync() + { + if (folderDescriptor > 0) + CLibrary.trySync(folderDescriptor); + } + + OperationType getType() + { + return opType; + } + + UUID getId() + { + return id; + } + + Throwable removeUnfinishedLeftovers(Throwable accumulate) + { + try + { + deleteRecords(committed() ? 
Type.REMOVE : Type.ADD); + + // we sync the parent file descriptor between contents and log deletion + // to ensure there is a happens before edge between them + sync(); + + Files.delete(file.toPath()); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + + return accumulate; + } + + static boolean isLogFile(File file) + { + return LogFile.FILE_REGEX.matcher(file.getName()).matches(); + } + + LogFile(OperationType opType, File folder, int folderDescriptor, UUID id) + { + this.opType = opType; + this.id = id; + this.folder = folder; + this.file = new File(getFileName(folder, opType, id)); + this.folderDescriptor = folderDescriptor; + } + + public void readRecords() + { + assert records.isEmpty(); + FileUtils.readLines(file).stream() + .map(LogRecord::make) + .forEach(records::add); + } + + public boolean verify() + { + Optional<LogRecord> firstInvalid = records.stream() + .filter(this::isInvalid) + .findFirst(); + + if (!firstInvalid.isPresent()) + return true; + + LogRecord failedOn = firstInvalid.get(); + if (getLastRecord() != failedOn) + { + logError(failedOn); + return false; + } + + if (records.stream() + .filter((r) -> r != failedOn) + .filter(LogFile::isInvalidWithCorruptedLastRecord) + .map(LogFile::logError) + .findFirst().isPresent()) + { + logError(failedOn); + return false; + } + + // if only the last record is corrupt and all other records have matching files on disk, @see verifyRecord, + // then we simply exited whilst serializing the last record and we carry on + logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], but all previous records match state on disk; continuing", + id, + failedOn.error)); + return true; + } + + static LogRecord logError(LogRecord record) + { + logger.error("{}", record.error); + return record; + } + + boolean isInvalid(LogRecord record) + { + if (!record.isValid()) + return true; + + if (record.type == Type.UNKNOWN) + { + record.error(String.format("Could not parse 
record [%s]", record)); + return true; + } + + if (record.checksum != record.computeChecksum()) + { + record.error(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]", + record.relativeFilePath, + record, + record.checksum, + record.computeChecksum())); + return true; + } + + if (record.type != Type.REMOVE) + return false; + + List<File> files = record.getExistingFiles(folder); + + // Paranoid sanity checks: we create another record by looking at the files as they are + // on disk right now and make sure the information still matches + record.onDiskRecord = LogRecord.make(record.type, files, 0, record.relativeFilePath); + + if (record.updateTime != record.onDiskRecord.updateTime && record.onDiskRecord.numFiles > 0) + { + record.error(String.format("Unexpected files detected for sstable [%s], record [%s]: last update time [%tT] should have been [%tT]", + record.relativeFilePath, + record, + record.onDiskRecord.updateTime, + record.updateTime)); + return true; + } + + return false; + } + + static boolean isInvalidWithCorruptedLastRecord(LogRecord record) + { + if (record.type == Type.REMOVE && record.onDiskRecord.numFiles < record.numFiles) + { // if we found a corruption in the last record, then we continue only if the number of files matches exactly for all previous records. + record.error(String.format("Incomplete fileset detected for sstable [%s], record [%s]: number of files [%d] should have been [%d]. 
Treating as unrecoverable due to corruption of the final record.", + record.relativeFilePath, + record.raw, + record.onDiskRecord.numFiles, + record.numFiles)); + return true; + } + return false; + } + + public void commit() + { + assert !completed() : "Already completed!"; + addRecord(LogRecord.makeCommit(System.currentTimeMillis())); + } + + public void abort() + { + assert !completed() : "Already completed!"; + addRecord(LogRecord.makeAbort(System.currentTimeMillis())); + } + + private boolean isLastRecordValidWithType(Type type) + { + LogRecord lastRecord = getLastRecord(); + return lastRecord != null && + lastRecord.type == type && + !isInvalid(lastRecord); + } + + public boolean committed() + { + return isLastRecordValidWithType(Type.COMMIT); + } + + public boolean aborted() + { + return isLastRecordValidWithType(Type.ABORT); + } + + public boolean completed() + { + return committed() || aborted(); + } + + public void add(Type type, SSTable table) + { + if (!addRecord(makeRecord(type, table))) + throw new IllegalStateException(); + } + + private LogRecord makeRecord(Type type, SSTable table) + { + assert type == Type.ADD || type == Type.REMOVE; + return LogRecord.make(type, folder, table); + } + + private boolean addRecord(LogRecord record) + { + if (!records.add(record)) + return false; + + // we only checksum the records, not the checksums themselves + FileUtils.append(file, record.toString()); + sync(); + return true; + } + + public void remove(Type type, SSTable table) + { + LogRecord record = makeRecord(type, table); + + assert records.contains(record) : String.format("[%s] is not tracked by %s", record, file); + + records.remove(record); + deleteRecord(record); + } + + public boolean contains(Type type, SSTable table) + { + return records.contains(makeRecord(type, table)); + } + + public void deleteRecords(Type type) + { + assert file.exists() : String.format("Expected %s to exists", file); + records.stream() + .filter(type::matches) + 
.forEach(this::deleteRecord); + records.clear(); + } + + private void deleteRecord(LogRecord record) + { + List<File> files = record.getExistingFiles(folder); + + // we sort the files in ascending update time order so that the last update time + // stays the same even if we only partially delete files + files.sort((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified())); + + files.forEach(LogTransaction::delete); + } + + public Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, Type type) + { + Map<LogRecord, Set<File>> ret = new HashMap<>(); + + records.stream() + .filter(type::matches) + .filter(LogRecord::isValid) + .forEach((r) -> ret.put(r, getRecordFiles(files, r))); + + return ret; + } + + public LogRecord getLastRecord() + { + return Iterables.getLast(records, null); + } + + private Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record) + { + Set<File> ret = new HashSet<>(); + for (File file : files.tailSet(new File(folder, record.relativeFilePath))) + { + if (!file.getName().startsWith(record.relativeFilePath)) + break; + ret.add(file); + } + return ret; + } + + public void delete() + { + LogTransaction.delete(file); + } + + public boolean exists() + { + return file.exists(); + } + + @Override + public String toString() + { + return FileUtils.getRelativePath(folder.getPath(), file.getPath()); + } + + static String getFileName(File folder, OperationType opType, UUID id) + { + String fileName = StringUtils.join(BigFormat.latestVersion, + LogFile.SEP, + "txn", + LogFile.SEP, + opType.fileName, + LogFile.SEP, + id.toString(), + LogFile.EXT); + return StringUtils.join(folder, File.separator, fileName); + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java new file mode 100644 index 0000000..0f0f3a2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java @@ -0,0 +1,208 @@ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.zip.CRC32; + +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A log file record, each record is encoded in one line and has different + * content depending on the record type. + */ +final class LogRecord +{ + public enum Type + { + UNKNOWN, // a record that cannot be parsed + ADD, // new files to be retained on commit + REMOVE, // old files to be retained on abort + COMMIT, // commit flag + ABORT; // abort flag + + public static Type fromPrefix(String prefix) + { + return valueOf(prefix.toUpperCase()); + } + + public boolean hasFile() + { + return this == Type.ADD || this == Type.REMOVE; + } + + public boolean matches(LogRecord record) + { + return this == record.type; + } + } + + + public final Type type; + public final String relativeFilePath; + public final long updateTime; + public final int numFiles; + public final String raw; + public final long checksum; + + public String error; + public LogRecord onDiskRecord; + + // (add|remove|commit|abort):[*,*,*][checksum] + static Pattern REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE); + + public static LogRecord make(String line) + { + try + { + Matcher matcher = REGEX.matcher(line); + if (!matcher.matches()) + return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line) + .error(String.format("Failed to parse [%s]", line)); + + Type type = Type.fromPrefix(matcher.group(1)); + return new LogRecord(type, matcher.group(2), Long.valueOf(matcher.group(3)), 
Integer.valueOf(matcher.group(4)), Long.valueOf(matcher.group(5)), line); + } + catch (Throwable t) + { + return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line).error(t); + } + } + + public static LogRecord makeCommit(long updateTime) + { + return new LogRecord(Type.COMMIT, "", updateTime, 0); + } + + public static LogRecord makeAbort(long updateTime) + { + return new LogRecord(Type.ABORT, "", updateTime, 0); + } + + public static LogRecord make(Type type, File parentFolder, SSTable table) + { + String relativePath = FileUtils.getRelativePath(parentFolder.getPath(), table.descriptor.baseFilename()); + // why do we take the max of files.size() and table.getAllFilePaths().size()? + return make(type, getExistingFiles(parentFolder, relativePath), table.getAllFilePaths().size(), relativePath); + } + + public static LogRecord make(Type type, List<File> files, int minFiles, String relativeFilePath) + { + long lastModified = files.stream().map(File::lastModified).reduce(0L, Long::max); + return new LogRecord(type, relativeFilePath, lastModified, Math.max(minFiles, files.size())); + } + + private LogRecord(Type type, + String relativeFilePath, + long updateTime, + int numFiles) + { + this(type, relativeFilePath, updateTime, numFiles, 0, null); + } + + private LogRecord(Type type, + String relativeFilePath, + long updateTime, + int numFiles, + long checksum, + String raw) + { + this.type = type; + this.relativeFilePath = type.hasFile() ? relativeFilePath : ""; // only meaningful for file records + this.updateTime = type == Type.REMOVE ? updateTime : 0; // only meaningful for old records + this.numFiles = type.hasFile() ? 
numFiles : 0; // only meaningful for file records + if (raw == null) + { + assert checksum == 0; + this.checksum = computeChecksum(); + this.raw = format(); + } + else + { + this.checksum = checksum; + this.raw = raw; + } + + this.error = ""; + } + + public LogRecord error(Throwable t) + { + return error(t.getMessage()); + } + + public LogRecord error(String error) + { + this.error = error; + return this; + } + + public boolean isValid() + { + return this.error.isEmpty(); + } + + private String format() + { + return String.format("%s:[%s,%d,%d][%d]", type.toString(), relativeFilePath, updateTime, numFiles, checksum); + } + + public List<File> getExistingFiles(File folder) + { + if (!type.hasFile()) + return Collections.emptyList(); + + return getExistingFiles(folder, relativeFilePath); + } + + public static List<File> getExistingFiles(File parentFolder, String relativeFilePath) + { + return Arrays.asList(parentFolder.listFiles((dir, name) -> name.startsWith(relativeFilePath))); + } + + @Override + public int hashCode() + { + // see comment in equals + return Objects.hash(type, relativeFilePath, error); + } + + @Override + public boolean equals(Object obj) + { + if (!(obj instanceof LogRecord)) + return false; + + final LogRecord other = (LogRecord)obj; + + // we exclude on purpose checksum, update time and count as + // we don't want duplicated records that differ only by + // properties that might change on disk, especially COMMIT records, + // there should be only one regardless of update time + // however we must compare the error to make sure we have more than + // one UNKNOWN record, if we fail to parse more than one + return type == other.type && + relativeFilePath.equals(other.relativeFilePath) && + error.equals(other.error); + } + + @Override + public String toString() + { + return raw; + } + + long computeChecksum() + { + CRC32 crc32 = new CRC32(); + crc32.update(relativeFilePath.getBytes(FileUtils.CHARSET)); + 
crc32.update(type.toString().getBytes(FileUtils.CHARSET)); + FBUtilities.updateChecksumInt(crc32, (int) updateTime); + FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32)); + FBUtilities.updateChecksumInt(crc32, numFiles); + return crc32.getValue() & (Long.MAX_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java new file mode 100644 index 0000000..89d7beb --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Runnables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LogRecord.Type; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SnapshotDeletingTask; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.RefCounted; +import org.apache.cassandra.utils.concurrent.Transactional; + +/** + * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction, + * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent + * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also + * *requires* that the prepareToCommit() phase only take actions that can be rolled back. + * + * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the + * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure + * a consistent disk state. 
LifecycleTransaction ensures this requirement, so this class should really never be used + * outside of LT. @see FileLister.classifyFiles(TransactionData txn) + * + * A class that tracks sstable files involved in a transaction across sstables: + * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails. + * + * The transaction log file contains new and old sstables as follows: + * + * add:[sstable-2][CRC] + * remove:[sstable-1,max_update_time,num files][CRC] + * + * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be + * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the + * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times + * and file sizes. + * + * Upon commit we add a final line to the log file: + * + * commit:[commit_time][CRC] + * + * When the transaction log is cleaned-up by the TransactionTidier, which happens only after any old sstables have been + * obsoleted, then any sstable files for old sstables are removed before deleting the transaction log if the transaction + * was committed, vice-versa if the transaction was aborted. + * + * On start-up we look for any transaction log files and repeat the cleanup process described above. + * + * See CASSANDRA-7066 for full details. + */ +class LogTransaction extends Transactional.AbstractTransactional implements Transactional +{ + private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class); + + /** + * If the format of the lines in the transaction log is wrong or the checksum + * does not match, then we throw this exception. 
+ */ + public static final class CorruptTransactionLogException extends RuntimeException + { + public final LogFile file; + + public CorruptTransactionLogException(String message, LogFile file) + { + super(message); + this.file = file; + } + } + + private final Tracker tracker; + private final LogFile data; + private final Ref<LogTransaction> selfRef; + // Deleting sstables is tricky because the mmapping might not have been finalized yet, + // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs). + // Additionally, we need to make sure to delete the data file first, so on restart the others + // will be recognized as GCable. + private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>(); + + LogTransaction(OperationType opType, CFMetaData metadata) + { + this(opType, metadata, null); + } + + LogTransaction(OperationType opType, CFMetaData metadata, Tracker tracker) + { + this(opType, new Directories(metadata), tracker); + } + + LogTransaction(OperationType opType, Directories directories, Tracker tracker) + { + this(opType, directories.getDirectoryForNewSSTables(), tracker); + } + + LogTransaction(OperationType opType, File folder, Tracker tracker) + { + this.tracker = tracker; + int folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath()); + this.data = new LogFile(opType, folder, folderDescriptor, UUIDGen.getTimeUUID()); + this.selfRef = new Ref<>(this, new TransactionTidier(data, folderDescriptor)); + + if (logger.isDebugEnabled()) + logger.debug("Created transaction logs with id {}", data.id); + } + + /** + * Track a reader as new. + **/ + void trackNew(SSTable table) + { + data.add(Type.ADD, table); + } + + /** + * Stop tracking a reader as new. + */ + void untrackNew(SSTable table) + { + data.remove(Type.ADD, table); + } + + /** + * Schedule a reader for deletion as soon as it is fully unreferenced. 
+ */ + SSTableTidier obsoleted(SSTableReader reader) + { + if (data.contains(Type.ADD, reader)) + { + if (data.contains(Type.REMOVE, reader)) + throw new IllegalArgumentException(); + + return new SSTableTidier(reader, true, this); + } + + data.add(Type.REMOVE, reader); + + if (tracker != null) + tracker.notifyDeleting(reader); + + return new SSTableTidier(reader, false, this); + } + + OperationType getType() + { + return data.getType(); + } + + UUID getId() + { + return data.getId(); + } + + @VisibleForTesting + String getDataFolder() + { + return data.folder.getPath(); + } + + @VisibleForTesting + LogFile getLogFile() + { + return data; + } + + static void delete(File file) + { + try + { + if (logger.isDebugEnabled()) + logger.debug("Deleting {}", file); + + Files.delete(file.toPath()); + } + catch (NoSuchFileException e) + { + logger.error("Unable to delete {} as it does not exist", file); + } + catch (IOException e) + { + logger.error("Unable to delete {}", file, e); + throw new RuntimeException(e); + } + } + + /** + * The transaction tidier. + * + * When the transaction reference is fully released we try to delete all the obsolete files + * depending on the transaction result, as well as the transaction log file. 
+ */ + private static class TransactionTidier implements RefCounted.Tidy, Runnable + { + private final LogFile data; + private final int folderDescriptor; + + TransactionTidier(LogFile data, int folderDescriptor) + { + this.data = data; + this.folderDescriptor = folderDescriptor; + } + + public void tidy() throws Exception + { + run(); + } + + public String name() + { + return data.toString(); + } + + public void run() + { + if (logger.isDebugEnabled()) + logger.debug("Removing files for transaction {}", name()); + + assert data.completed() : "Expected a completed transaction: " + data; + + Throwable err = data.removeUnfinishedLeftovers(null); + + if (err != null) + { + logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err); + failedDeletions.add(this); + } + else + { + if (logger.isDebugEnabled()) + logger.debug("Closing file transaction {}", name()); + CLibrary.tryCloseFD(folderDescriptor); + } + } + } + + static class Obsoletion + { + final SSTableReader reader; + final SSTableTidier tidier; + + Obsoletion(SSTableReader reader, SSTableTidier tidier) + { + this.reader = reader; + this.tidier = tidier; + } + } + + /** + * The SSTableReader tidier. When a reader is fully released and no longer referenced + * by any one, we run this. It keeps a reference to the parent transaction and releases + * it when done, so that the final transaction cleanup can run when all obsolete readers + * are released. 
+ */ + public static class SSTableTidier implements Runnable + { + // must not retain a reference to the SSTableReader, else leak detection cannot kick in + private final Descriptor desc; + private final long sizeOnDisk; + private final Tracker tracker; + private final boolean wasNew; + private final Ref<LogTransaction> parentRef; + + public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent) + { + this.desc = referent.descriptor; + this.sizeOnDisk = referent.bytesOnDisk(); + this.tracker = parent.tracker; + this.wasNew = wasNew; + this.parentRef = parent.selfRef.tryRef(); + } + + public void run() + { + SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); + + try + { + // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier + File datafile = new File(desc.filenameFor(Component.DATA)); + + delete(datafile); + // let the remainder be cleaned up by delete + SSTable.delete(desc, SSTable.discoverComponentsFor(desc)); + } + catch (Throwable t) + { + // pass the throwable as the last argument so the cause of the failed deletion is logged, not just the descriptor + logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc, t); + failedDeletions.add(this); + return; + } + + if (tracker != null && tracker.cfstore != null && !wasNew) + tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk); + + // release the reference to the parent so that all the transaction files can be released + parentRef.release(); + } + + public void abort() + { + parentRef.release(); + } + } + + + static void rescheduleFailedDeletions() + { + Runnable task; + while ( null != (task = failedDeletions.poll())) + ScheduledExecutors.nonPeriodicTasks.submit(task); + + // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS. 
+ SnapshotDeletingTask.rescheduleFailedTasks(); + } + + static void waitForDeletions() + { + FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS)); + } + + @VisibleForTesting + Throwable complete(Throwable accumulate) + { + try + { + accumulate = selfRef.ensureReleased(accumulate); + return accumulate; + } + catch (Throwable t) + { + logger.error("Failed to complete file transaction {}", getId(), t); + return Throwables.merge(accumulate, t); + } + } + + protected Throwable doCommit(Throwable accumulate) + { + data.commit(); + return complete(accumulate); + } + + protected Throwable doAbort(Throwable accumulate) + { + data.abort(); + return complete(accumulate); + } + + protected void doPrepare() { } + + /** + * Called on startup to scan existing folders for any unfinished leftovers of + * operations that were ongoing when the process exited. Also called by the standalone + * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil. 
+ * + */ + static void removeUnfinishedLeftovers(CFMetaData metadata) + { + for (File dir : new Directories(metadata).getCFDirectories()) + { + int folderDescriptor = CLibrary.tryOpenDirectory(dir.getPath()); + try + { + File[] logs = dir.listFiles(LogFile::isLogFile); + + for (File log : logs) + { + LogFile data = LogFile.make(log, folderDescriptor); + data.readRecords(); + if (data.verify()) + { + Throwable failure = data.removeUnfinishedLeftovers(null); + // a null result means nothing failed (TransactionTidier.run treats it the same way), so only report real failures + if (failure != null) + logger.error("Failed to remove unfinished transaction leftovers for log {}", log, failure); + } + else + { + logger.error("Unexpected disk state: failed to read transaction log {}", log); + } + } + } + finally + { + CLibrary.tryCloseFD(folderDescriptor); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index d028493..ffb71ee 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -224,7 +224,7 @@ public class Tracker */ public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate) { - try (TransactionLog txnLogs = new TransactionLog(operationType, cfstore.metadata, this)) + try (LogTransaction txnLogs = new LogTransaction(operationType, cfstore.metadata, this)) { Pair<View, View> result = apply(view -> { Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting)))); @@ -236,7 +236,7 @@ public class Tracker // It is important that any method accepting/returning a Throwable never throws an exception, and does its best // to complete the instructions given to it - List<TransactionLog.Obsoletion> obsoletions = new ArrayList<>(); + List<LogTransaction.Obsoletion> 
obsoletions = new ArrayList<>(); accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate); try {