Fix potential issue with LogTransaction only checking single dir for files

Patch by stefania; reviewed by aweisberg for CASSANDRA-10421
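The gist of the change: a transaction log used to be a single file in a single folder, so crash cleanup could only see sstable files in that one directory, while with JBOD a table's sstables may span several data directories. The patch replicates the log into every folder the transaction touches. A minimal standalone sketch of that idea — the class name, file name and layout below are illustrative only, not the patch's API:

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch: mirror every record into one log file per data folder, so that
    // scanning any single folder is enough to find the transaction log.
    final class ReplicatedTxnLogSketch
    {
        private final Map<File, File> replicaByFolder = new LinkedHashMap<>();

        void maybeAddFolder(File folder)
        {
            // the same file name in every folder identifies the same transaction
            replicaByFolder.putIfAbsent(folder, new File(folder, "ma_txn_flush_hypothetical-id.log"));
        }

        void append(String record) throws IOException
        {
            for (File replica : replicaByFolder.values())
                Files.write(replica.toPath(),
                            (record + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                            StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC);
        }
    }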
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73781a9a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73781a9a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73781a9a Branch: refs/heads/trunk Commit: 73781a9a497de99d8cf2088d804173a11a3982f0 Parents: 1d28a4a Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Fri Oct 23 17:40:46 2015 -0400 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Fri Oct 23 17:40:46 2015 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Memtable.java | 28 +- .../db/lifecycle/LifecycleTransaction.java | 39 +- .../db/lifecycle/LogAwareFileLister.java | 3 +- .../apache/cassandra/db/lifecycle/LogFile.java | 263 +++++---- .../cassandra/db/lifecycle/LogRecord.java | 192 +++++-- .../cassandra/db/lifecycle/LogReplica.java | 105 ++++ .../cassandra/db/lifecycle/LogReplicaSet.java | 229 ++++++++ .../cassandra/db/lifecycle/LogTransaction.java | 150 +++-- .../apache/cassandra/db/lifecycle/Tracker.java | 4 +- .../cassandra/io/sstable/SSTableTxnWriter.java | 4 +- .../org/apache/cassandra/io/util/FileUtils.java | 20 +- .../cassandra/streaming/StreamReceiveTask.java | 2 +- .../org/apache/cassandra/utils/Throwables.java | 5 + .../unit/org/apache/cassandra/db/ScrubTest.java | 2 +- .../cassandra/db/lifecycle/HelpersTest.java | 2 +- .../db/lifecycle/LogTransactionTest.java | 575 +++++++++++++++---- .../db/lifecycle/RealTransactionsTest.java | 6 +- .../io/sstable/SSTableRewriterTest.java | 2 +- 19 files changed, 1232 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 67e06ca..bc7c001 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421) * Support encrypted and plain traffic on the same port (CASSANDRA-10559) * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360) * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367) http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index f47efe3..96b1775 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -423,15 +423,25 @@ public class Memtable implements Comparable<Memtable> { // we operate "offline" here, as we expose the resulting reader consciously when done // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction) - LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata); - MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context); - return new SSTableTxnWriter(txn, - cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), - (long)partitions.size(), - ActiveRepairService.UNREPAIRED_SSTABLE, - sstableMetadataCollector, - new SerializationHeader(true, cfs.metadata, columns, stats), - txn)); + 
LifecycleTransaction txn = null; + try + { + txn = LifecycleTransaction.offline(OperationType.FLUSH); + MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context); + return new SSTableTxnWriter(txn, + cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename), + (long) partitions.size(), + ActiveRepairService.UNREPAIRED_SSTABLE, + sstableMetadataCollector, + new SerializationHeader(true, cfs.metadata, columns, stats), + txn)); + } + catch (Throwable t) + { + if (txn != null) + txn.close(); + throw t; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 3705f3d..a5eb01f 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -141,26 +141,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional * construct an empty Transaction with no existing readers */ @SuppressWarnings("resource") // log closed during postCleanup - public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata) + public static LifecycleTransaction offline(OperationType operationType) { Tracker dummy = new Tracker(null, false); - return new LifecycleTransaction(dummy, new LogTransaction(operationType, metadata, dummy), Collections.emptyList()); - } - - /** - * construct an empty Transaction with no existing readers - */ - @SuppressWarnings("resource") // log closed during postCleanup - public static LifecycleTransaction offline(OperationType operationType, File operationFolder) - { - Tracker dummy = new Tracker(null, false); - return new LifecycleTransaction(dummy, new LogTransaction(operationType, operationFolder, dummy), Collections.emptyList()); + return new LifecycleTransaction(dummy, new LogTransaction(operationType, dummy), Collections.emptyList()); } @SuppressWarnings("resource") // log closed during postCleanup LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers) { - this(tracker, new LogTransaction(operationType, getMetadata(tracker, readers), tracker), readers); + this(tracker, new LogTransaction(operationType, tracker), readers); } LifecycleTransaction(Tracker tracker, LogTransaction log, Iterable<SSTableReader> readers) @@ -175,21 +165,6 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional } } - private static CFMetaData getMetadata(Tracker tracker, Iterable<SSTableReader> readers) - { - if (tracker.cfstore != null) - return tracker.cfstore.metadata; - - for (SSTableReader reader : readers) - { - if (reader.metadata != null) - return reader.metadata; - } - - assert false : "Expected cfstore or at least one reader with metadata"; - return null; - } - public LogTransaction log() { return log; @@ -197,12 +172,12 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional public OperationType opType() { - return log.getType(); + return log.type(); } public UUID opId() { - return log.getId(); + return log.id(); } public void doPrepare() @@ -240,7 +215,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional accumulate = markObsolete(obsoletions, accumulate); accumulate 
= tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
        accumulate = release(selfRefs(logged.obsolete), accumulate);
-        accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.getType(), accumulate);
+        accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.type(), accumulate);
        return accumulate;
    }

@@ -506,7 +481,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
            originals.remove(reader);
            marked.remove(reader);
        }
-        return new LifecycleTransaction(tracker, log.getType(), readers);
+        return new LifecycleTransaction(tracker, log.type(), readers);
    }

    /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
index e086078..01bcb8a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
@@ -98,7 +98,7 @@ final class LogAwareFileLister
     */
    void classifyFiles(File txnFile)
    {
-        LogFile txn = LogFile.make(txnFile, -1);
+        LogFile txn = LogFile.make(txnFile);
        readTxnLog(txn);
        classifyFiles(txn);
        files.put(txnFile, FileType.TXN_LOG);

@@ -106,7 +106,6 @@ final class LogAwareFileLister
    void readTxnLog(LogFile txn)
    {
-        txn.readRecords();
        if (!txn.verify() && onTxnErr == OnTxnErr.THROW)
            throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. See earlier in log for details.", txn);
    }
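For illustration, the file-name convention enforced by LogFile.FILE_REGEX in the next hunk can be exercised with a few lines of plain Java. This is a sketch only — it assumes EXT resolves to ".log", and the file name below is made up:

    import java.util.UUID;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TxnFileNameDemo
    {
        // same shape as LogFile.FILE_REGEX, with EXT assumed to be ".log"
        static final Pattern FILE_REGEX = Pattern.compile("^(.{2})_txn_(.*)_(.*)\\.log$");

        public static void main(String[] args)
        {
            Matcher m = FILE_REGEX.matcher("ma_txn_compaction_55c2753a-79b9-11e5-8492-73748a91b473.log");
            if (m.matches())
            {
                System.out.println("sstable version: " + m.group(1));                  // ma
                System.out.println("operation type:  " + m.group(2));                  // compaction
                System.out.println("transaction id:  " + UUID.fromString(m.group(3)));
            }
        }
    }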
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index bff3724..4318f9c 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -1,11 +1,12 @@
 package org.apache.cassandra.db.lifecycle;

 import java.io.File;
-import java.nio.file.Files;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.StringUtils;

@@ -16,13 +17,19 @@
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.Throwables;

 import static org.apache.cassandra.utils.Throwables.merge;

 /**
- * The transaction log file, which contains many records.
+ * A transaction log file. We store transaction records into a log file, which is
+ * copied into multiple identical replicas on different disks, @see LogReplica.
+ *
+ * This class supports the transactional logic of LogTransaction and the removal
+ * of unfinished leftovers when a transaction is completed, or aborted, or when
+ * we clean up on start-up.
+ *
+ * @see LogTransaction
+ */
final class LogFile
{
    // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion)
    static Pattern FILE_REGEX = Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT));

-    final File file;
-    final Set<LogRecord> records = new LinkedHashSet<>();
-    final OperationType opType;
-    final UUID id;
-    final File folder;
-    final int folderDescriptor;
+    // A set of physical files on disk, each file is an identical replica
+    private final LogReplicaSet replicas = new LogReplicaSet();

-    static LogFile make(File logFile, int folderDescriptor)
+    // The transaction records; this set must be ORDER PRESERVING
+    private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+
+    // The type of the transaction
+    private final OperationType type;
+
+    // The unique id of the transaction
+    private final UUID id;
+
+    static LogFile make(File logReplica)
+    {
+        return make(logReplica.getName(), Collections.singletonList(logReplica));
+    }
+
+    static LogFile make(String fileName, List<File> logReplicas)
    {
-        Matcher matcher = LogFile.FILE_REGEX.matcher(logFile.getName());
+        Matcher matcher = LogFile.FILE_REGEX.matcher(fileName);
        boolean matched = matcher.matches();
        assert matched && matcher.groupCount() == 3;

@@ -53,21 +70,20 @@ final class LogFile
        OperationType operationType = OperationType.fromFileName(matcher.group(2));
        UUID id = UUID.fromString(matcher.group(3));

-        return new LogFile(operationType, logFile.getParentFile(), folderDescriptor, id);
+        return new LogFile(operationType, id, logReplicas);
    }

-    void sync()
+    Throwable syncFolder(Throwable accumulate)
    {
-        if (folderDescriptor > 0)
-            CLibrary.trySync(folderDescriptor);
+        return replicas.syncFolder(accumulate);
    }

-    OperationType getType()
+    OperationType type()
    {
-        return opType;
+        return type;
    }

-    UUID getId()
+    UUID id()
    {
        return id;
    }

@@ -76,13 +92,13 @@ final class LogFile
    {
        try
        {
-            deleteRecords(committed() ? Type.REMOVE : Type.ADD);
+            deleteFilesForRecordsOfType(committed() ?
Type.REMOVE : Type.ADD); - // we sync the parent file descriptor between contents and log deletion + // we sync the parent folders between contents and log deletion // to ensure there is a happens before edge between them - sync(); + Throwables.maybeFail(syncFolder(accumulate)); - Files.delete(file.toPath()); + accumulate = replicas.delete(accumulate); } catch (Throwable t) { @@ -97,29 +113,30 @@ final class LogFile return LogFile.FILE_REGEX.matcher(file.getName()).matches(); } - LogFile(OperationType opType, File folder, int folderDescriptor, UUID id) + LogFile(OperationType type, UUID id, List<File> replicas) { - this.opType = opType; - this.id = id; - this.folder = folder; - this.file = new File(getFileName(folder, opType, id)); - this.folderDescriptor = folderDescriptor; + this(type, id); + this.replicas.addReplicas(replicas); } - public void readRecords() + LogFile(OperationType type, UUID id) { - assert records.isEmpty(); - FileUtils.readLines(file).stream() - .map(LogRecord::make) - .forEach(records::add); + this.type = type; + this.id = id; } - public boolean verify() + boolean verify() { - Optional<LogRecord> firstInvalid = records.stream() - .filter(this::isInvalid) - .findFirst(); + assert records.isEmpty(); + if (!replicas.readRecords(records)) + { + logger.error("Failed to read records from {}", replicas); + return false; + } + records.forEach(LogFile::verifyRecord); + + Optional<LogRecord> firstInvalid = records.stream().filter(LogRecord::isInvalidOrPartial).findFirst(); if (!firstInvalid.isPresent()) return true; @@ -130,9 +147,10 @@ final class LogFile return false; } + records.stream().filter((r) -> r != failedOn).forEach(LogFile::verifyRecordWithCorruptedLastRecord); if (records.stream() .filter((r) -> r != failedOn) - .filter(LogFile::isInvalidWithCorruptedLastRecord) + .filter(LogRecord::isInvalid) .map(LogFile::logError) .findFirst().isPresent()) { @@ -142,82 +160,75 @@ final class LogFile // if only the last record is corrupt and all other records have matching files on disk, @see verifyRecord, // then we simply exited whilst serializing the last record and we carry on - logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], but all previous records match state on disk; continuing", + logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], " + + "but all previous records match state on disk; continuing", id, - failedOn.error)); + failedOn.error())); return true; } static LogRecord logError(LogRecord record) { - logger.error("{}", record.error); + logger.error("{}", record.error()); return record; } - boolean isInvalid(LogRecord record) + static void verifyRecord(LogRecord record) { - if (!record.isValid()) - return true; - - if (record.type == Type.UNKNOWN) - { - record.error(String.format("Could not parse record [%s]", record)); - return true; - } - if (record.checksum != record.computeChecksum()) { - record.error(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]", - record.relativeFilePath, - record, - record.checksum, - record.computeChecksum())); - return true; + record.setError(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]", + record.fileName(), + record, + record.checksum, + record.computeChecksum())); + return; } if (record.type != Type.REMOVE) - return false; - - List<File> files = record.getExistingFiles(folder); + return; // Paranoid sanity checks: we create another record by looking at the files as they are - 
// on disk right now and make sure the information still matches - record.onDiskRecord = LogRecord.make(record.type, files, 0, record.relativeFilePath); - - if (record.updateTime != record.onDiskRecord.updateTime && record.onDiskRecord.numFiles > 0) + // on disk right now and make sure the information still matches. We don't want to delete + // files by mistake if the user has copied them from backup and forgot to remove a txn log + // file that obsoleted the very same files. So we check the latest update time and make sure + // it matches. Because we delete files from oldest to newest, the latest update time should + // always match. + record.status.onDiskRecord = record.withExistingFiles(); + if (record.updateTime != record.status.onDiskRecord.updateTime && record.status.onDiskRecord.numFiles > 0) { - record.error(String.format("Unexpected files detected for sstable [%s], record [%s]: last update time [%tT] should have been [%tT]", - record.relativeFilePath, - record, - record.onDiskRecord.updateTime, - record.updateTime)); - return true; - } + record.setError(String.format("Unexpected files detected for sstable [%s], " + + "record [%s]: last update time [%tT] should have been [%tT]", + record.fileName(), + record, + record.status.onDiskRecord.updateTime, + record.updateTime)); - return false; + } } - static boolean isInvalidWithCorruptedLastRecord(LogRecord record) + static void verifyRecordWithCorruptedLastRecord(LogRecord record) { - if (record.type == Type.REMOVE && record.onDiskRecord.numFiles < record.numFiles) - { // if we found a corruption in the last record, then we continue only if the number of files matches exactly for all previous records. - record.error(String.format("Incomplete fileset detected for sstable [%s], record [%s]: number of files [%d] should have been [%d]. Treating as unrecoverable due to corruption of the final record.", - record.relativeFilePath, - record.raw, - record.onDiskRecord.numFiles, - record.numFiles)); - return true; + if (record.type == Type.REMOVE && record.status.onDiskRecord.numFiles < record.numFiles) + { // if we found a corruption in the last record, then we continue only + // if the number of files matches exactly for all previous records. + record.setError(String.format("Incomplete fileset detected for sstable [%s], record [%s]: " + + "number of files [%d] should have been [%d]. 
Treating as unrecoverable " +
+                                      "due to corruption of the final record.",
+                                      record.fileName(),
+                                      record.raw,
+                                      record.status.onDiskRecord.numFiles,
+                                      record.numFiles));
        }
-        return false;
    }

-    public void commit()
+    void commit()
    {
        assert !completed() : "Already completed!";
        addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
    }

-    public void abort()
+    void abort()
    {
        assert !completed() : "Already completed!";
        addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
    }

@@ -228,25 +239,25 @@
        LogRecord lastRecord = getLastRecord();
        return lastRecord != null &&
               lastRecord.type == type &&
-               !isInvalid(lastRecord);
+               lastRecord.isValid();
    }

-    public boolean committed()
+    boolean committed()
    {
        return isLastRecordValidWithType(Type.COMMIT);
    }

-    public boolean aborted()
+    boolean aborted()
    {
        return isLastRecordValidWithType(Type.ABORT);
    }

-    public boolean completed()
+    boolean completed()
    {
        return committed() || aborted();
    }

-    public void add(Type type, SSTable table)
+    void add(Type type, SSTable table)
    {
        if (!addRecord(makeRecord(type, table)))
            throw new IllegalStateException();

@@ -255,7 +266,10 @@
    private LogRecord makeRecord(Type type, SSTable table)
    {
        assert type == Type.ADD || type == Type.REMOVE;
-        return LogRecord.make(type, folder, table);
+
+        File folder = table.descriptor.directory;
+        replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        return LogRecord.make(type, table);
    }

    private boolean addRecord(LogRecord record)
@@ -263,48 +277,44 @@
        if (!records.add(record))
            return false;

-        // we only checksum the records, not the checksums themselves
-        FileUtils.append(file, record.toString());
-        sync();
+        replicas.append(record);
        return true;
    }

-    public void remove(Type type, SSTable table)
+    void remove(Type type, SSTable table)
    {
        LogRecord record = makeRecord(type, table);
-
-        assert records.contains(record) : String.format("[%s] is not tracked by %s", record, file);
+        assert records.contains(record) : String.format("[%s] is not tracked by %s", record, id);
        records.remove(record);
-        deleteRecord(record);
+        deleteRecordFiles(record);
    }

-    public boolean contains(Type type, SSTable table)
+    boolean contains(Type type, SSTable table)
    {
        return records.contains(makeRecord(type, table));
    }

-    public void deleteRecords(Type type)
+    void deleteFilesForRecordsOfType(Type type)
    {
-        assert file.exists() : String.format("Expected %s to exists", file);
        records.stream()
               .filter(type::matches)
-               .forEach(this::deleteRecord);
+               .forEach(LogFile::deleteRecordFiles);
        records.clear();
    }

-    private void deleteRecord(LogRecord record)
+    private static void deleteRecordFiles(LogRecord record)
    {
-        List<File> files = record.getExistingFiles(folder);
+        List<File> files = record.getExistingFiles();
        // we sort the files in ascending update time order so that the last update time
-        // stays the same even if we only partially delete files
+        // stays the same even if we only partially delete files, see comment in verifyRecord()
        files.sort((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified()));
        files.forEach(LogTransaction::delete);
    }
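deleteRecordFiles above deletes oldest-first for a reason: verifyRecord compares a REMOVE record's updateTime against the maximum lastModified of the files still on disk, so if deletion is interrupted, the surviving (newest) files must still carry that maximum. A rough sketch of the invariant, with hypothetical file names:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class OldestFirstDeleteDemo
    {
        public static void main(String[] args)
        {
            List<File> files = new ArrayList<>();
            files.add(new File("ma-1-big-Data.db"));   // hypothetical sstable components
            files.add(new File("ma-1-big-Index.db"));

            // the value a REMOVE record would store as its update time
            long recordedUpdateTime = files.stream().mapToLong(File::lastModified).max().orElse(0L);

            files.sort(Comparator.comparingLong(File::lastModified)); // oldest first
            for (File f : files)
                f.delete(); // a crash here leaves only newer files behind,
                            // so max(lastModified) of survivors == recordedUpdateTime

            System.out.println("update time in log record: " + recordedUpdateTime);
        }
    }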
-    public Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, Type type)
+    Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, Type type)
    {
        Map<LogRecord, Set<File>> ret = new HashMap<>();

@@ -316,50 +326,55 @@ final class LogFile
        return ret;
    }

-    public LogRecord getLastRecord()
+    LogRecord getLastRecord()
    {
        return Iterables.getLast(records, null);
    }

-    private Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record)
+    private static Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record)
    {
-        Set<File> ret = new HashSet<>();
-        for (File file : files.tailSet(new File(folder, record.relativeFilePath)))
-        {
-            if (!file.getName().startsWith(record.relativeFilePath))
-                break;
-            ret.add(file);
-        }
-        return ret;
+        String fileName = record.fileName();
+        return files.stream().filter(f -> f.getName().startsWith(fileName)).collect(Collectors.toSet());
    }

-    public void delete()
+    boolean exists()
    {
-        LogTransaction.delete(file);
+        return replicas.exists();
    }

-    public boolean exists()
+    void close()
    {
-        return file.exists();
+        replicas.close();
    }

    @Override
    public String toString()
    {
-        return FileUtils.getRelativePath(folder.getPath(), file.getPath());
+        return replicas.toString();
+    }
+
+    @VisibleForTesting
+    List<File> getFiles()
+    {
+        return replicas.getFiles();
    }

-    static String getFileName(File folder, OperationType opType, UUID id)
+    @VisibleForTesting
+    List<String> getFilePaths()
+    {
+        return replicas.getFilePaths();
+    }
+
+    private String getFileName(File folder)
    {
        String fileName = StringUtils.join(BigFormat.latestVersion,
                                           LogFile.SEP,
                                           "txn",
                                           LogFile.SEP,
-                                           opType.fileName,
+                                           type.fileName,
                                           LogFile.SEP,
                                           id.toString(),
                                           LogFile.EXT);
        return StringUtils.join(folder, File.separator, fileName);
    }
}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index 0f0f3a2..9e606fc 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -1,6 +1,8 @@
 package org.apache.cassandra.db.lifecycle;

 import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -11,8 +13,9 @@
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;

 /**
- * A log file record, each record is encoded in one line and has different
- * content depending on the record type.
+ * A decoded line in a transaction log file replica.
+ *
+ * @see LogReplica and LogFile.
  */
final class LogRecord
{
@@ -38,18 +41,52 @@
    {
        return this == record.type;
    }
+
+        public boolean isFinal() { return this == Type.COMMIT || this == Type.ABORT; }
    }

+    /**
+     * The status of a record after it has been verified; any parsing errors
+     * are also stored here.
+     */
+    public final static class Status
+    {
+        // if there are any errors, they end up here
+        Optional<String> error = Optional.empty();
+
+        // if the record was only partially matched across files this is true
+        boolean partial = false;
+
+        // if the status of this record on disk is required (e.g.
existing files), it is + // stored here for caching + LogRecord onDiskRecord; + + void setError(String error) + { + if (!this.error.isPresent()) + this.error = Optional.of(error); + } + + boolean hasError() + { + return error.isPresent(); + } + } + // the type of record, see Type public final Type type; - public final String relativeFilePath; + // for sstable records, the absolute path of the table desc + public final Optional<String> absolutePath; + // for sstable records, the last update time of all files (may not be available for NEW records) public final long updateTime; + // for sstable records, the total number of files (may not be accurate for NEW records) public final int numFiles; + // the raw string as written or read from a file public final String raw; + // the checksum of this record, written at the end of the record string public final long checksum; - - public String error; - public LogRecord onDiskRecord; + // the status of this record, @see Status class + public final Status status; // (add|remove|commit|abort):[*,*,*][checksum] static Pattern REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE); @@ -60,60 +97,76 @@ final class LogRecord { Matcher matcher = REGEX.matcher(line); if (!matcher.matches()) - return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line) - .error(String.format("Failed to parse [%s]", line)); + return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line) + .setError(String.format("Failed to parse [%s]", line)); Type type = Type.fromPrefix(matcher.group(1)); - return new LogRecord(type, matcher.group(2), Long.valueOf(matcher.group(3)), Integer.valueOf(matcher.group(4)), Long.valueOf(matcher.group(5)), line); + return new LogRecord(type, + matcher.group(2), + Long.valueOf(matcher.group(3)), + Integer.valueOf(matcher.group(4)), + Long.valueOf(matcher.group(5)), line); } catch (Throwable t) { - return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line).error(t); + return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line).setError(t); } } public static LogRecord makeCommit(long updateTime) { - return new LogRecord(Type.COMMIT, "", updateTime, 0); + return new LogRecord(Type.COMMIT, updateTime); } public static LogRecord makeAbort(long updateTime) { - return new LogRecord(Type.ABORT, "", updateTime, 0); + return new LogRecord(Type.ABORT, updateTime); + } + + public static LogRecord make(Type type, SSTable table) + { + String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename()); + return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath); } - public static LogRecord make(Type type, File parentFolder, SSTable table) + public LogRecord withExistingFiles() { - String relativePath = FileUtils.getRelativePath(parentFolder.getPath(), table.descriptor.baseFilename()); - // why do we take the max of files.size() and table.getAllFilePaths().size()? 
- return make(type, getExistingFiles(parentFolder, relativePath), table.getAllFilePaths().size(), relativePath); + return make(type, getExistingFiles(), 0, absolutePath.get()); } - public static LogRecord make(Type type, List<File> files, int minFiles, String relativeFilePath) + public static LogRecord make(Type type, List<File> files, int minFiles, String absolutePath) { long lastModified = files.stream().map(File::lastModified).reduce(0L, Long::max); - return new LogRecord(type, relativeFilePath, lastModified, Math.max(minFiles, files.size())); + return new LogRecord(type, absolutePath, lastModified, Math.max(minFiles, files.size())); + } + + private LogRecord(Type type, long updateTime) + { + this(type, null, updateTime, 0, 0, null); } private LogRecord(Type type, - String relativeFilePath, + String absolutePath, long updateTime, int numFiles) { - this(type, relativeFilePath, updateTime, numFiles, 0, null); + this(type, absolutePath, updateTime, numFiles, 0, null); } private LogRecord(Type type, - String relativeFilePath, + String absolutePath, long updateTime, int numFiles, long checksum, String raw) { + assert !type.hasFile() || absolutePath != null : "Expected file path for file records"; + this.type = type; - this.relativeFilePath = type.hasFile() ? relativeFilePath : ""; // only meaningful for file records - this.updateTime = type == Type.REMOVE ? updateTime : 0; // only meaningful for old records - this.numFiles = type.hasFile() ? numFiles : 0; // only meaningful for file records + this.absolutePath = type.hasFile() ? Optional.of(absolutePath) : Optional.<String>empty(); + this.updateTime = type == Type.REMOVE ? updateTime : 0; + this.numFiles = type.hasFile() ? numFiles : 0; + this.status = new Status(); if (raw == null) { assert checksum == 0; @@ -125,49 +178,93 @@ final class LogRecord this.checksum = checksum; this.raw = raw; } - - this.error = ""; } - public LogRecord error(Throwable t) + LogRecord setError(Throwable t) { - return error(t.getMessage()); + return setError(t.getMessage()); } - public LogRecord error(String error) + LogRecord setError(String error) { - this.error = error; + status.setError(error); return this; } - public boolean isValid() + String error() { - return this.error.isEmpty(); + return status.error.orElse(""); + } + + void setPartial() + { + status.partial = true; + } + + boolean partial() + { + return status.partial; + } + + boolean isValid() + { + return !status.hasError() && type != Type.UNKNOWN; + } + + boolean isInvalid() + { + return !isValid(); + } + + boolean isInvalidOrPartial() + { + return isInvalid() || partial(); } private String format() { - return String.format("%s:[%s,%d,%d][%d]", type.toString(), relativeFilePath, updateTime, numFiles, checksum); + return String.format("%s:[%s,%d,%d][%d]", + type.toString(), + absolutePath(), + updateTime, + numFiles, + checksum); + } + + public List<File> getExistingFiles() + { + assert absolutePath.isPresent() : "Expected a path in order to get existing files"; + return getExistingFiles(absolutePath.get()); } - public List<File> getExistingFiles(File folder) + public static List<File> getExistingFiles(String absoluteFilePath) { - if (!type.hasFile()) - return Collections.emptyList(); + Path path = Paths.get(absoluteFilePath); + File[] files = path.getParent().toFile().listFiles((dir, name) -> name.startsWith(path.getFileName().toString())); + // files may be null if the directory does not exist yet, e.g. when tracking new files + return files == null ? 
Collections.emptyList() : Arrays.asList(files); + } - return getExistingFiles(folder, relativeFilePath); + public boolean isFinal() + { + return type.isFinal(); + } + + String fileName() + { + return absolutePath.isPresent() ? Paths.get(absolutePath.get()).getFileName().toString() : ""; } - public static List<File> getExistingFiles(File parentFolder, String relativeFilePath) + String absolutePath() { - return Arrays.asList(parentFolder.listFiles((dir, name) -> name.startsWith(relativeFilePath))); + return absolutePath.isPresent() ? absolutePath.get() : ""; } @Override public int hashCode() { // see comment in equals - return Objects.hash(type, relativeFilePath, error); + return Objects.hash(type, absolutePath, numFiles, updateTime); } @Override @@ -178,15 +275,12 @@ final class LogRecord final LogRecord other = (LogRecord)obj; - // we exclude on purpose checksum, update time and count as - // we don't want duplicated records that differ only by - // properties that might change on disk, especially COMMIT records, - // there should be only one regardless of update time - // however we must compare the error to make sure we have more than - // one UNKNOWN record, if we fail to parse more than one + // we exclude on purpose checksum, error and full file path + // since records must match across log file replicas on different disks return type == other.type && - relativeFilePath.equals(other.relativeFilePath) && - error.equals(other.error); + absolutePath.equals(other.absolutePath) && + numFiles == other.numFiles && + updateTime == other.updateTime; } @Override @@ -198,7 +292,7 @@ final class LogRecord long computeChecksum() { CRC32 crc32 = new CRC32(); - crc32.update(relativeFilePath.getBytes(FileUtils.CHARSET)); + crc32.update((absolutePath()).getBytes(FileUtils.CHARSET)); crc32.update(type.toString().getBytes(FileUtils.CHARSET)); FBUtilities.updateChecksumInt(crc32, (int) updateTime); FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java new file mode 100644 index 0000000..79b9749 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CLibrary;
+
+/**
+ * Because a column family may have sstables on different disks and disks can
+ * be removed, we duplicate log files into many replicas so as to have a file
+ * in each folder where sstables exist.
+ *
+ * Each replica contains the exact same content but we do allow for final
+ * partial records in case we crashed after writing to one replica but
+ * before completing the write to another replica.
+ *
+ * @see LogFile
+ */
+final class LogReplica
+{
+    private final File file;
+    private int folderDescriptor;
+
+    static LogReplica create(File folder, String fileName)
+    {
+        return new LogReplica(new File(fileName), CLibrary.tryOpenDirectory(folder.getPath()));
+    }
+
+    static LogReplica open(File file)
+    {
+        return new LogReplica(file, CLibrary.tryOpenDirectory(file.getParentFile().getPath()));
+    }
+
+    LogReplica(File file, int folderDescriptor)
+    {
+        this.file = file;
+        this.folderDescriptor = folderDescriptor;
+    }
+
+    File file()
+    {
+        return file;
+    }
+
+    void append(LogRecord record)
+    {
+        boolean existed = exists();
+        FileUtils.appendAndSync(file, record.toString());
+
+        // If the file did not exist before appending the first
+        // line, then sync the folder as well since now it must exist
+        if (!existed)
+            syncFolder();
+    }
+
+    void syncFolder()
+    {
+        if (folderDescriptor >= 0)
+            CLibrary.trySync(folderDescriptor);
+    }
+
+    void delete()
+    {
+        LogTransaction.delete(file);
+        syncFolder();
+    }
+
+    boolean exists()
+    {
+        return file.exists();
+    }
+
+    void close()
+    {
+        if (folderDescriptor >= 0)
+        {
+            CLibrary.tryCloseFD(folderDescriptor);
+            folderDescriptor = -1;
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[%s] ", file);
+    }
+}
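LogReplica syncs the parent folder's file descriptor (via CLibrary) after creating or deleting a replica file, because on POSIX filesystems a new directory entry is only durable once the directory itself is fsynced. A rough JDK-only approximation of the same idea — Linux-specific, with hypothetical paths, and not how the patch itself does it:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class FolderSyncDemo
    {
        public static void main(String[] args) throws IOException
        {
            Path folder = Paths.get("/var/lib/cassandra/data/ks/t");           // hypothetical
            Path replica = folder.resolve("ma_txn_flush_hypothetical-id.log"); // hypothetical

            // write the record durably
            Files.write(replica, "ADD:[...][0]\n".getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC);

            // opening the directory as a channel and calling force() makes the
            // new directory entry durable on Linux (fails on some platforms)
            try (FileChannel dir = FileChannel.open(folder, StandardOpenOption.READ))
            {
                dir.force(true);
            }
        }
    }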
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
new file mode 100644
index 0000000..d9d9213
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * A set of log replicas. This class mostly iterates over replicas when writing or reading,
+ * ensuring consistency among them and hiding replication details from LogFile.
+ *
+ * @see LogReplica, LogFile
+ */
+public class LogReplicaSet
+{
+    private static final Logger logger = LoggerFactory.getLogger(LogReplicaSet.class);
+
+    private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
+
+    private Collection<LogReplica> replicas()
+    {
+        return replicasByFile.values();
+    }
+
+    void addReplicas(List<File> replicas)
+    {
+        replicas.forEach(this::addReplica);
+    }
+
+    void addReplica(File file)
+    {
+        File folder = file.getParentFile();
+        assert !replicasByFile.containsKey(folder);
+        replicasByFile.put(folder, LogReplica.open(file));
+
+        if (logger.isTraceEnabled())
+            logger.trace("Added log file replica {} ", file);
+    }
+
+    void maybeCreateReplica(File folder, String fileName, Set<LogRecord> records)
+    {
+        if (replicasByFile.containsKey(folder))
+            return;
+
+        final LogReplica replica = LogReplica.create(folder, fileName);
+
+        records.forEach(replica::append);
+        replicasByFile.put(folder, replica);
+
+        if (logger.isTraceEnabled())
+            logger.trace("Created new file replica {}", replica);
+    }
+
+    Throwable syncFolder(Throwable accumulate)
+    {
+        return Throwables.perform(accumulate, replicas().stream().map(s -> s::syncFolder));
+    }
+
+    Throwable delete(Throwable accumulate)
+    {
+        return Throwables.perform(accumulate, replicas().stream().map(s -> s::delete));
+    }
+
+    private static boolean isPrefixMatch(String first, String second)
+    {
+        return first.length() >= second.length() ?
+ first.startsWith(second) : + second.startsWith(first); + } + + boolean readRecords(Set<LogRecord> records) + { + Map<File, List<String>> linesByReplica = replicas().stream() + .map(LogReplica::file) + .collect(Collectors.toMap(Function.<File>identity(), FileUtils::readLines)); + int maxNumLines = linesByReplica.values().stream().map(List::size).reduce(0, Integer::max); + for (int i = 0; i < maxNumLines; i++) + { + String firstLine = null; + boolean partial = false; + for (Map.Entry<File, List<String>> entry : linesByReplica.entrySet()) + { + List<String> currentLines = entry.getValue(); + if (i >= currentLines.size()) + continue; + + String currentLine = currentLines.get(i); + if (firstLine == null) + { + firstLine = currentLine; + continue; + } + + if (!isPrefixMatch(firstLine, currentLine)) + { // not a prefix match + logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up", + entry.getKey().getName(), + currentLine, + firstLine); + return false; + } + + if (!firstLine.equals(currentLine)) + { + if (i == currentLines.size() - 1) + { // last record, just set record as invalid and move on + logger.warn("Mismatched last line in file {}: '{}' not the same as '{}'", + entry.getKey().getName(), + currentLine, + firstLine); + + if (currentLine.length() > firstLine.length()) + firstLine = currentLine; + + partial = true; + } + else + { // mismatched entry file has more lines, giving up + logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up", + entry.getKey().getName(), + currentLine, + firstLine); + return false; + } + } + } + + LogRecord record = LogRecord.make(firstLine); + if (records.contains(record)) + { // duplicate records + logger.error("Found duplicate record {} for {}, giving up", record, record.fileName()); + return false; + } + + if (partial) + record.setPartial(); + + records.add(record); + + if (record.isFinal() && i != (maxNumLines - 1)) + { // too many final records + logger.error("Found too many lines for {}, giving up", record.fileName()); + return false; + } + } + + return true; + } + + /** + * Add the record to all the replicas: if it is a final record then we throw only if we fail to write it + * to all, otherwise we throw if we fail to write it to any file, see CASSANDRA-10421 for details + */ + void append(LogRecord record) + { + Throwable err = Throwables.perform(null, replicas().stream().map(r -> () -> r.append(record))); + if (err != null) + { + if (!record.isFinal() || err.getSuppressed().length == replicas().size() -1) + Throwables.maybeFail(err); + + logger.error("Failed to add record '{}' to some replicas '{}'", record, this); + } + } + + boolean exists() + { + Optional<Boolean> ret = replicas().stream().map(LogReplica::exists).reduce(Boolean::logicalAnd); + return ret.isPresent() ? + ret.get() + : false; + } + + void close() + { + Throwables.maybeFail(Throwables.perform(null, replicas().stream().map(r -> r::close))); + } + + @Override + public String toString() + { + Optional<String> ret = replicas().stream().map(LogReplica::toString).reduce(String::concat); + return ret.isPresent() ? 
+ ret.get() + : "[-]"; + } + + @VisibleForTesting + List<File> getFiles() + { + return replicas().stream().map(LogReplica::file).collect(Collectors.toList()); + } + + @VisibleForTesting + List<String> getFilePaths() + { + return replicas().stream().map(LogReplica::file).map(File::getPath).collect(Collectors.toList()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index 78ea0f1..8b82207 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@ -92,17 +92,17 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran */ public static final class CorruptTransactionLogException extends RuntimeException { - public final LogFile file; + public final LogFile txnFile; - public CorruptTransactionLogException(String message, LogFile file) + public CorruptTransactionLogException(String message, LogFile txnFile) { super(message); - this.file = file; + this.txnFile = txnFile; } } private final Tracker tracker; - private final LogFile data; + private final LogFile txnFile; private final Ref<LogTransaction> selfRef; // Deleting sstables is tricky because the mmapping might not have been finalized yet, // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs). @@ -110,30 +110,19 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran // will be recognized as GCable. private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>(); - LogTransaction(OperationType opType, CFMetaData metadata) + LogTransaction(OperationType opType) { - this(opType, metadata, null); + this(opType, null); } - LogTransaction(OperationType opType, CFMetaData metadata, Tracker tracker) - { - this(opType, new Directories(metadata), tracker); - } - - LogTransaction(OperationType opType, Directories directories, Tracker tracker) - { - this(opType, directories.getDirectoryForNewSSTables(), tracker); - } - - LogTransaction(OperationType opType, File folder, Tracker tracker) + LogTransaction(OperationType opType, Tracker tracker) { this.tracker = tracker; - int folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath()); - this.data = new LogFile(opType, folder, folderDescriptor, UUIDGen.getTimeUUID()); - this.selfRef = new Ref<>(this, new TransactionTidier(data, folderDescriptor)); + this.txnFile = new LogFile(opType, UUIDGen.getTimeUUID()); + this.selfRef = new Ref<>(this, new TransactionTidier(txnFile)); if (logger.isTraceEnabled()) - logger.trace("Created transaction logs with id {}", data.id); + logger.trace("Created transaction logs with id {}", txnFile.id()); } /** @@ -141,7 +130,7 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran **/ void trackNew(SSTable table) { - data.add(Type.ADD, table); + txnFile.add(Type.ADD, table); } /** @@ -149,7 +138,7 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran */ void untrackNew(SSTable table) { - data.remove(Type.ADD, table); + txnFile.remove(Type.ADD, table); } /** @@ -157,15 +146,15 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran */ SSTableTidier obsoleted(SSTableReader reader) { - if 
(data.contains(Type.ADD, reader))
+        if (txnFile.contains(Type.ADD, reader))
        {
-            if (data.contains(Type.REMOVE, reader))
+            if (txnFile.contains(Type.REMOVE, reader))
                throw new IllegalArgumentException();

            return new SSTableTidier(reader, true, this);
        }

-        data.add(Type.REMOVE, reader);
+        txnFile.add(Type.REMOVE, reader);

        if (tracker != null)
            tracker.notifyDeleting(reader);

@@ -173,26 +162,32 @@
        return new SSTableTidier(reader, false, this);
    }

-    OperationType getType()
+    OperationType type()
+    {
+        return txnFile.type();
+    }
+
+    UUID id()
    {
-        return data.getType();
+        return txnFile.id();
    }

-    UUID getId()
+    @VisibleForTesting
+    LogFile txnFile()
    {
-        return data.getId();
+        return txnFile;
    }

    @VisibleForTesting
-    String getDataFolder()
+    List<File> logFiles()
    {
-        return data.folder.getPath();
+        return txnFile.getFiles();
    }

    @VisibleForTesting
-    LogFile getLogFile()
+    List<String> logFilePaths()
    {
-        return data;
+        return txnFile.getFilePaths();
    }

    static void delete(File file)
@@ -224,12 +219,10 @@
    private static class TransactionTidier implements RefCounted.Tidy, Runnable
    {
        private final LogFile data;
-        private final int folderDescriptor;

-        TransactionTidier(LogFile data, int folderDescriptor)
+        TransactionTidier(LogFile data)
        {
            this.data = data;
-            this.folderDescriptor = folderDescriptor;
        }

        public void tidy() throws Exception
@@ -247,7 +240,13 @@
            if (logger.isTraceEnabled())
                logger.trace("Removing files for transaction {}", name());

-            assert data.completed() : "Expected a completed transaction: " + data;
+            if (!data.completed())
+            { // this happens if we forget to close a txn and the garbage collector closes it for us
+                logger.error("{} was not completed, trying to abort it now", data);
+                Throwable err = Throwables.perform((Throwable)null, data::abort);
+                if (err != null)
+                    logger.error("Failed to abort {}", data, err);
+            }

            Throwable err = data.removeUnfinishedLeftovers(null);

@@ -260,7 +259,8 @@
        {
            if (logger.isTraceEnabled())
                logger.trace("Closing file transaction {}", name());
-            CLibrary.tryCloseFD(folderDescriptor);
+
+            data.close();
        }
    }
}

@@ -360,20 +360,20 @@
        }
        catch (Throwable t)
        {
-            logger.error("Failed to complete file transaction {}", getId(), t);
+            logger.error("Failed to complete file transaction {}", id(), t);
            return Throwables.merge(accumulate, t);
        }
    }

    protected Throwable doCommit(Throwable accumulate)
    {
-        data.commit();
+        txnFile.commit();
        return complete(accumulate);
    }

    protected Throwable doAbort(Throwable accumulate)
    {
-        data.abort();
+        txnFile.abort();
        return complete(accumulate);
    }

@@ -387,32 +387,62 @@
     */
    static void removeUnfinishedLeftovers(CFMetaData metadata)
    {
-        for (File dir : new Directories(metadata).getCFDirectories())
+        removeUnfinishedLeftovers(new Directories(metadata).getCFDirectories());
+    }
+
+    @VisibleForTesting
+    static void removeUnfinishedLeftovers(List<File> folders)
+    {
+        LogFilesByName logFiles = new LogFilesByName();
+        folders.forEach(logFiles::list);
+        logFiles.removeUnfinishedLeftovers();
+    }
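On startup, replicas of the same transaction live in different folders but share a file name, which is how LogFilesByName below reassembles each logical transaction log before verification. A sketch of just that grouping step, with hypothetical paths:

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class GroupReplicasDemo
    {
        public static void main(String[] args)
        {
            List<File> logFiles = Arrays.asList(
                new File("/disk1/ks/t/ma_txn_compaction_id1.log"),
                new File("/disk2/ks/t/ma_txn_compaction_id1.log"), // replica of the same txn
                new File("/disk1/ks/t/ma_txn_flush_id2.log"));

            // replicas share a name, so grouping by name yields one entry per txn
            Map<String, List<File>> byName = logFiles.stream()
                                                     .collect(Collectors.groupingBy(File::getName));

            byName.forEach((name, replicas) -> System.out.println(name + " -> " + replicas));
        }
    }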
+    private static final class LogFilesByName
+    {
+        Map<String, List<File>> files = new HashMap<>();
+
+        void list(File folder)
        {
-            int folderDescriptor = CLibrary.tryOpenDirectory(dir.getPath());
-            try
+            Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add);
+        }
+
+        void add(File file)
+        {
+            List<File> filesByName = files.get(file.getName());
+            if (filesByName == null)
            {
-                File[] logs = dir.listFiles(LogFile::isLogFile);
+                filesByName = new ArrayList<>();
+                files.put(file.getName(), filesByName);
+            }
+
+            filesByName.add(file);
+        }

-                for (File log : logs)
+        void removeUnfinishedLeftovers()
+        {
+            files.forEach(LogFilesByName::removeUnfinishedLeftovers);
+        }
+
+        static void removeUnfinishedLeftovers(String name, List<File> logFiles)
+        {
+            LogFile txn = LogFile.make(name, logFiles);
+            try
+            {
+                if (txn.verify())
+                {
+                    Throwable failure = txn.removeUnfinishedLeftovers(null);
+                    if (failure != null)
+                        logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure);
+                }
+                else
                {
-                    LogFile data = LogFile.make(log, folderDescriptor);
-                    data.readRecords();
-                    if (data.verify())
-                    {
-                        Throwable failure = data.removeUnfinishedLeftovers(null);
-                        if (failure != null)
-                            logger.error("Failed to remove unfinished transaction leftovers for log {}", log, failure);
-                    }
-                    else
-                    {
-                        logger.error("Unexpected disk state: failed to read transaction log {}", log);
-                    }
+                    logger.error("Unexpected disk state: failed to read transaction log {}", txn);
                }
            }
            finally
            {
-                CLibrary.tryCloseFD(folderDescriptor);
+                txn.close();
            }
        }
    }
}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 59c15bb..c09d49c 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -224,7 +224,7 @@ public class Tracker
     */
    public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
    {
-        try (LogTransaction txnLogs = new LogTransaction(operationType, cfstore.metadata, this))
+        try (LogTransaction txnLogs = new LogTransaction(operationType, this))
        {
            Pair<View, View> result = apply(view -> {
                Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
@@ -247,7 +247,7 @@ public class Tracker
                accumulate = updateSizeTracking(removed, emptySet(), accumulate);
                accumulate = release(selfRefs(removed), accumulate);
                // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
-                accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.getType(), accumulate);
+                accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate);
            }
        }
        catch (Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 9ad5a80..e889d85 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -96,7 +96,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
@SuppressWarnings("resource") // log and writer closed during postCleanup public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) { - LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn); return new SSTableTxnWriter(txn, writer); } @@ -105,7 +105,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) { // if the column family store does not exist, we create a new default SSTableMultiWriter to use: - LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel); SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn); return new SSTableTxnWriter(txn, writer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 5bcd34a..46f2de5 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -575,24 +575,32 @@ public class FileUtils public static void append(File file, String ... lines) { if (file.exists()) - write(file, StandardOpenOption.APPEND, lines); + write(file, Arrays.asList(lines), StandardOpenOption.APPEND); else - write(file, StandardOpenOption.CREATE, lines); + write(file, Arrays.asList(lines), StandardOpenOption.CREATE); + } + + public static void appendAndSync(File file, String ... lines) + { + if (file.exists()) + write(file, Arrays.asList(lines), StandardOpenOption.APPEND, StandardOpenOption.SYNC); + else + write(file, Arrays.asList(lines), StandardOpenOption.CREATE, StandardOpenOption.SYNC); } public static void replace(File file, String ... lines) { - write(file, StandardOpenOption.TRUNCATE_EXISTING, lines); + write(file, Arrays.asList(lines), StandardOpenOption.TRUNCATE_EXISTING); } - public static void write(File file, StandardOpenOption op, String ... lines) + public static void write(File file, List<String> lines, StandardOpenOption ... 
options) { try { Files.write(file.toPath(), - Arrays.asList(lines), + lines, CHARSET, - op); + options); } catch (IOException ex) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index f261954..0b864fa 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask this.totalSize = totalSize; // this is an "offline" transaction, as we currently manually expose the sstables once done; // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes - this.txn = LifecycleTransaction.offline(OperationType.STREAM, Schema.instance.getCFMetaData(cfId)); + this.txn = LifecycleTransaction.offline(OperationType.STREAM); this.sstables = new ArrayList<>(totalFiles); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/utils/Throwables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java index 8c5e3ec..923b723 100644 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@ -87,6 +87,11 @@ public final class Throwables throw (E) fail; } + public static Throwable perform(Throwable accumulate, DiscreteAction<?> ... actions) + { + return perform(accumulate, Arrays.stream(actions)); + } + public static Throwable perform(Throwable accumulate, Stream<? 
extends DiscreteAction<?>> actions) { return perform(accumulate, actions.iterator()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index a553fe8..d5baec8 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -326,7 +326,7 @@ public class ScrubTest String filename = cfs.getSSTablePath(tempDataDir); Descriptor desc = Descriptor.fromFilename(filename); - LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, desc.directory); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); try (SSTableTxnWriter writer = new SSTableTxnWriter(txn, createTestWriter(desc, (long) keys.size(), cfs.metadata, txn))) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java index 0488245..3549523 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java @@ -159,7 +159,7 @@ public class HelpersTest public void testMarkObsolete() { ColumnFamilyStore cfs = MockSchema.newCFS(); - LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN, cfs.metadata); + LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN); Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs)); List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
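Several of the hunks above lean on the accumulate-and-continue error handling that the new Throwables.perform(Throwable, DiscreteAction...) overload supports: run every action, collect failures as suppressed exceptions, and only fail at the end. A simplified standalone version of the pattern (not Cassandra's actual Throwables class):

    public class AccumulateDemo
    {
        interface Action { void run() throws Exception; }

        // run all actions, chaining any failures onto the accumulator
        static Throwable perform(Throwable accumulate, Action... actions)
        {
            for (Action action : actions)
            {
                try { action.run(); }
                catch (Throwable t)
                {
                    if (accumulate == null) accumulate = t;
                    else accumulate.addSuppressed(t);
                }
            }
            return accumulate;
        }

        public static void main(String[] args) throws Throwable
        {
            Throwable err = perform(null,
                                    () -> { throw new Exception("replica 1 failed"); },
                                    () -> System.out.println("replica 2 ok")); // still runs

            if (err != null)
                throw err; // the maybeFail() step
        }
    }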