Fix potential issue with LogTransaction only checking a single directory for files

Patch by stefania; reviewed by aweisberg for CASSANDRA-10421


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73781a9a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73781a9a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73781a9a

Branch: refs/heads/cassandra-3.0
Commit: 73781a9a497de99d8cf2088d804173a11a3982f0
Parents: 1d28a4a
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Fri Oct 23 17:40:46 2015 -0400
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Fri Oct 23 17:40:46 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |  28 +-
 .../db/lifecycle/LifecycleTransaction.java      |  39 +-
 .../db/lifecycle/LogAwareFileLister.java        |   3 +-
 .../apache/cassandra/db/lifecycle/LogFile.java  | 263 +++++----
 .../cassandra/db/lifecycle/LogRecord.java       | 192 +++++--
 .../cassandra/db/lifecycle/LogReplica.java      | 105 ++++
 .../cassandra/db/lifecycle/LogReplicaSet.java   | 229 ++++++++
 .../cassandra/db/lifecycle/LogTransaction.java  | 150 +++--
 .../apache/cassandra/db/lifecycle/Tracker.java  |   4 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |   4 +-
 .../org/apache/cassandra/io/util/FileUtils.java |  20 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   2 +-
 .../org/apache/cassandra/utils/Throwables.java  |   5 +
 .../unit/org/apache/cassandra/db/ScrubTest.java |   2 +-
 .../cassandra/db/lifecycle/HelpersTest.java     |   2 +-
 .../db/lifecycle/LogTransactionTest.java        | 575 +++++++++++++++----
 .../db/lifecycle/RealTransactionsTest.java      |   6 +-
 .../io/sstable/SSTableRewriterTest.java         |   2 +-
 19 files changed, 1232 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 67e06ca..bc7c001 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Fix LogTransaction checking only a single directory for files 
(CASSANDRA-10421)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Fix handling of range tombstones when reading old format sstables 
(CASSANDRA-10360)
  * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)

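The heart of this change, for context: a table's sstables may live in several data
directories, so a transaction log kept in only one directory can miss files. The
patch therefore replicates the log into every folder the transaction touches. A
minimal standalone sketch of that idea follows; the class and method names are
illustrative, not the patch's API.

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class TxnLogMirrorSketch
    {
        // one copy of the log per sstable folder, kept in insertion order
        private final Map<File, File> logByFolder = new LinkedHashMap<>();

        // append a record to the replica in this folder, creating the
        // replica the first time the folder is seen
        void append(File sstableFolder, String logFileName, String record) throws IOException
        {
            File log = logByFolder.computeIfAbsent(sstableFolder, f -> new File(f, logFileName));
            try (FileWriter out = new FileWriter(log, true)) // true = append
            {
                out.write(record);
                out.write(System.lineSeparator());
            }
        }
    }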
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index f47efe3..96b1775 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -423,15 +423,25 @@ public class Memtable implements Comparable<Memtable>
         {
             // we operate "offline" here, as we expose the resulting reader 
consciously when done
             // (although we may want to modify this behaviour in future, to 
encapsulate full flush behaviour in LifecycleTransaction)
-            LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
-            MetadataCollector sstableMetadataCollector = new 
MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-            return new SSTableTxnWriter(txn,
-                                        
cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
-                                                                     
(long)partitions.size(),
-                                                                     
ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                     
sstableMetadataCollector,
-                                                                     new 
SerializationHeader(true, cfs.metadata, columns, stats),
-                                                                     txn));
+            LifecycleTransaction txn = null;
+            try
+            {
+                txn = LifecycleTransaction.offline(OperationType.FLUSH);
+                MetadataCollector sstableMetadataCollector = new 
MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+                return new SSTableTxnWriter(txn,
+                                            
cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+                                                                         
(long) partitions.size(),
+                                                                         
ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                         
sstableMetadataCollector,
+                                                                         new 
SerializationHeader(true, cfs.metadata, columns, stats),
+                                                                         txn));
+            }
+            catch (Throwable t)
+            {
+                if (txn != null)
+                    txn.close();
+                throw t;
+            }
         }
     }
 

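The hunk above guards the new LifecycleTransaction so it is closed if constructing
the SSTableTxnWriter throws; on success, ownership passes to the returned writer.
A generic sketch of the same guard pattern, with hypothetical names:

    import java.util.function.Function;

    final class AdoptOrClose
    {
        // run `build` with a freshly acquired resource and close the
        // resource if anything throws before ownership reaches the caller
        static <T extends AutoCloseable, R> R adopt(T resource, Function<T, R> build)
        {
            try
            {
                return build.apply(resource); // ownership transfers on success
            }
            catch (Throwable t)
            {
                try { resource.close(); } catch (Throwable t2) { t.addSuppressed(t2); }
                throw t; // precise rethrow: only unchecked throwables reach here
            }
        }
    }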
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 3705f3d..a5eb01f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -141,26 +141,16 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
      * construct an empty Transaction with no existing readers
      */
     @SuppressWarnings("resource") // log closed during postCleanup
-    public static LifecycleTransaction offline(OperationType operationType, 
CFMetaData metadata)
+    public static LifecycleTransaction offline(OperationType operationType)
     {
         Tracker dummy = new Tracker(null, false);
-        return new LifecycleTransaction(dummy, new 
LogTransaction(operationType, metadata, dummy), Collections.emptyList());
-    }
-
-    /**
-     * construct an empty Transaction with no existing readers
-     */
-    @SuppressWarnings("resource") // log closed during postCleanup
-    public static LifecycleTransaction offline(OperationType operationType, 
File operationFolder)
-    {
-        Tracker dummy = new Tracker(null, false);
-        return new LifecycleTransaction(dummy, new 
LogTransaction(operationType, operationFolder, dummy), Collections.emptyList());
+        return new LifecycleTransaction(dummy, new 
LogTransaction(operationType, dummy), Collections.emptyList());
     }
 
     @SuppressWarnings("resource") // log closed during postCleanup
     LifecycleTransaction(Tracker tracker, OperationType operationType, 
Iterable<SSTableReader> readers)
     {
-        this(tracker, new LogTransaction(operationType, getMetadata(tracker, 
readers), tracker), readers);
+        this(tracker, new LogTransaction(operationType, tracker), readers);
     }
 
     LifecycleTransaction(Tracker tracker, LogTransaction log, 
Iterable<SSTableReader> readers)
@@ -175,21 +165,6 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
         }
     }
 
-    private static CFMetaData getMetadata(Tracker tracker, 
Iterable<SSTableReader> readers)
-    {
-        if (tracker.cfstore != null)
-            return tracker.cfstore.metadata;
-
-        for (SSTableReader reader : readers)
-        {
-            if (reader.metadata != null)
-                return reader.metadata;
-        }
-
-        assert false : "Expected cfstore or at least one reader with metadata";
-        return null;
-    }
-
     public LogTransaction log()
     {
         return log;
@@ -197,12 +172,12 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
 
     public OperationType opType()
     {
-        return log.getType();
+        return log.type();
     }
 
     public UUID opId()
     {
-        return log.getId();
+        return log.id();
     }
 
     public void doPrepare()
@@ -240,7 +215,7 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
         accumulate = markObsolete(obsoletions, accumulate);
         accumulate = tracker.updateSizeTracking(logged.obsolete, 
logged.update, accumulate);
         accumulate = release(selfRefs(logged.obsolete), accumulate);
-        accumulate = tracker.notifySSTablesChanged(originals, logged.update, 
log.getType(), accumulate);
+        accumulate = tracker.notifySSTablesChanged(originals, logged.update, 
log.type(), accumulate);
 
         return accumulate;
     }
@@ -506,7 +481,7 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
             originals.remove(reader);
             marked.remove(reader);
         }
-        return new LifecycleTransaction(tracker, log.getType(), readers);
+        return new LifecycleTransaction(tracker, log.type(), readers);
     }
 
     /**

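The simplified offline() above follows the usual Transactional lifecycle: closing
the transaction aborts it unless it was finished first. A tiny self-contained
sketch of that finish-or-abort contract (illustrative only; the real interface
lives in org.apache.cassandra.utils.concurrent):

    final class MiniTxn implements AutoCloseable
    {
        private boolean finished;

        void finish() { finished = true; System.out.println("committed"); }

        @Override
        public void close() { if (!finished) System.out.println("aborted"); }

        public static void main(String[] args)
        {
            try (MiniTxn txn = new MiniTxn())
            {
                txn.finish(); // remove this call and close() aborts instead
            }
        }
    }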
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
index e086078..01bcb8a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
@@ -98,7 +98,7 @@ final class LogAwareFileLister
      */
     void classifyFiles(File txnFile)
     {
-        LogFile txn = LogFile.make(txnFile, -1);
+        LogFile txn = LogFile.make(txnFile);
         readTxnLog(txn);
         classifyFiles(txn);
         files.put(txnFile, FileType.TXN_LOG);
@@ -106,7 +106,6 @@ final class LogAwareFileLister
 
     void readTxnLog(LogFile txn)
     {
-        txn.readRecords();
         if (!txn.verify() && onTxnErr == OnTxnErr.THROW)
             throw new LogTransaction.CorruptTransactionLogException("Some 
records failed verification. See earlier in log for details.", txn);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index bff3724..4318f9c 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -1,11 +1,12 @@
 package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
-import java.nio.file.Files;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.lang3.StringUtils;
@@ -16,13 +17,19 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.Throwables;
 
 import static org.apache.cassandra.utils.Throwables.merge;
 
 /**
- * The transaction log file, which contains many records.
+ * A transaction log file. We store transaction records into a log file, which 
is
+ * copied into multiple identical replicas on different disks, @see 
LogReplica.
+ *
+ * This class supports the transactional logic of LogTransaction and the 
removal
+ * of unfinished leftovers when a transaction is completed or aborted, or when
+ * we clean up on start-up.
+ *
+ * @see LogTransaction
  */
 final class LogFile
 {
@@ -33,16 +40,26 @@ final class LogFile
     // cc_txn_opname_id.log (where cc is one of the sstable versions defined 
in BigVersion)
     static Pattern FILE_REGEX = 
Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT));
 
-    final File file;
-    final Set<LogRecord> records = new LinkedHashSet<>();
-    final OperationType opType;
-    final UUID id;
-    final File folder;
-    final int folderDescriptor;
+    // A set of physical files on disk; each file is an identical replica
+    private final LogReplicaSet replicas = new LogReplicaSet();
 
-    static LogFile make(File logFile, int folderDescriptor)
+    // The transaction records; this set must be ORDER PRESERVING
+    private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+
+    // The type of the transaction
+    private final OperationType type;
+
+    // The unique id of the transaction
+    private final UUID id;
+
+    static LogFile make(File logReplica)
+    {
+        return make(logReplica.getName(), 
Collections.singletonList(logReplica));
+    }
+
+    static LogFile make(String fileName, List<File> logReplicas)
     {
-        Matcher matcher = LogFile.FILE_REGEX.matcher(logFile.getName());
+        Matcher matcher = LogFile.FILE_REGEX.matcher(fileName);
         boolean matched = matcher.matches();
         assert matched && matcher.groupCount() == 3;
 
@@ -53,21 +70,20 @@ final class LogFile
         OperationType operationType = 
OperationType.fromFileName(matcher.group(2));
         UUID id = UUID.fromString(matcher.group(3));
 
-        return new LogFile(operationType, logFile.getParentFile(), 
folderDescriptor, id);
+        return new LogFile(operationType, id, logReplicas);
     }
 
-    void sync()
+    Throwable syncFolder(Throwable accumulate)
     {
-        if (folderDescriptor > 0)
-            CLibrary.trySync(folderDescriptor);
+        return replicas.syncFolder(accumulate);
     }
 
-    OperationType getType()
+    OperationType type()
     {
-        return opType;
+        return type;
     }
 
-    UUID getId()
+    UUID id()
     {
         return id;
     }
@@ -76,13 +92,13 @@ final class LogFile
     {
         try
         {
-            deleteRecords(committed() ? Type.REMOVE : Type.ADD);
+            deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
 
-            // we sync the parent file descriptor between contents and log 
deletion
+            // we sync the parent folders between contents and log deletion
             // to ensure there is a happens before edge between them
-            sync();
+            Throwables.maybeFail(syncFolder(accumulate));
 
-            Files.delete(file.toPath());
+            accumulate = replicas.delete(accumulate);
         }
         catch (Throwable t)
         {
@@ -97,29 +113,30 @@ final class LogFile
         return LogFile.FILE_REGEX.matcher(file.getName()).matches();
     }
 
-    LogFile(OperationType opType, File folder, int folderDescriptor, UUID id)
+    LogFile(OperationType type, UUID id, List<File> replicas)
     {
-        this.opType = opType;
-        this.id = id;
-        this.folder = folder;
-        this.file = new File(getFileName(folder, opType, id));
-        this.folderDescriptor = folderDescriptor;
+        this(type, id);
+        this.replicas.addReplicas(replicas);
     }
 
-    public void readRecords()
+    LogFile(OperationType type, UUID id)
     {
-        assert records.isEmpty();
-        FileUtils.readLines(file).stream()
-                 .map(LogRecord::make)
-                 .forEach(records::add);
+        this.type = type;
+        this.id = id;
     }
 
-    public boolean verify()
+    boolean verify()
     {
-        Optional<LogRecord> firstInvalid = records.stream()
-                                                  .filter(this::isInvalid)
-                                                  .findFirst();
+        assert records.isEmpty();
+        if (!replicas.readRecords(records))
+        {
+            logger.error("Failed to read records from {}", replicas);
+            return false;
+        }
 
+        records.forEach(LogFile::verifyRecord);
+
+        Optional<LogRecord> firstInvalid = 
records.stream().filter(LogRecord::isInvalidOrPartial).findFirst();
         if (!firstInvalid.isPresent())
             return true;
 
@@ -130,9 +147,10 @@ final class LogFile
             return false;
         }
 
+        records.stream().filter((r) -> r != 
failedOn).forEach(LogFile::verifyRecordWithCorruptedLastRecord);
         if (records.stream()
                    .filter((r) -> r != failedOn)
-                   .filter(LogFile::isInvalidWithCorruptedLastRecord)
+                   .filter(LogRecord::isInvalid)
                    .map(LogFile::logError)
                    .findFirst().isPresent())
         {
@@ -142,82 +160,75 @@ final class LogFile
 
         // if only the last record is corrupt and all other records have 
matching files on disk, @see verifyRecord,
         // then we simply exited whilst serializing the last record and we 
carry on
-        logger.warn(String.format("Last record of transaction %s is corrupt or 
incomplete [%s], but all previous records match state on disk; continuing",
+        logger.warn(String.format("Last record of transaction %s is corrupt or 
incomplete [%s], " +
+                                  "but all previous records match state on 
disk; continuing",
                                   id,
-                                  failedOn.error));
+                                  failedOn.error()));
         return true;
     }
 
     static LogRecord logError(LogRecord record)
     {
-        logger.error("{}", record.error);
+        logger.error("{}", record.error());
         return record;
     }
 
-    boolean isInvalid(LogRecord record)
+    static void verifyRecord(LogRecord record)
     {
-        if (!record.isValid())
-            return true;
-
-        if (record.type == Type.UNKNOWN)
-        {
-            record.error(String.format("Could not parse record [%s]", record));
-            return true;
-        }
-
         if (record.checksum != record.computeChecksum())
         {
-            record.error(String.format("Invalid checksum for sstable [%s], 
record [%s]: [%d] should have been [%d]",
-                                       record.relativeFilePath,
-                                       record,
-                                       record.checksum,
-                                       record.computeChecksum()));
-            return true;
+            record.setError(String.format("Invalid checksum for sstable [%s], 
record [%s]: [%d] should have been [%d]",
+                                          record.fileName(),
+                                          record,
+                                          record.checksum,
+                                          record.computeChecksum()));
+            return;
         }
 
         if (record.type != Type.REMOVE)
-            return false;
-
-        List<File> files = record.getExistingFiles(folder);
+            return;
 
         // Paranoid sanity checks: we create another record by looking at the 
files as they are
-        // on disk right now and make sure the information still matches
-        record.onDiskRecord = LogRecord.make(record.type, files, 0, 
record.relativeFilePath);
-
-        if (record.updateTime != record.onDiskRecord.updateTime && 
record.onDiskRecord.numFiles > 0)
+        // on disk right now and make sure the information still matches. We 
don't want to delete
+        // files by mistake if the user has copied them from a backup and 
forgotten to remove a txn log
+        // file that obsoleted the very same files. So we check the latest 
update time and make sure
+        // it matches. Because we delete files from oldest to newest, the 
latest update time should
+        // always match.
+        record.status.onDiskRecord = record.withExistingFiles();
+        if (record.updateTime != record.status.onDiskRecord.updateTime && 
record.status.onDiskRecord.numFiles > 0)
         {
-            record.error(String.format("Unexpected files detected for sstable 
[%s], record [%s]: last update time [%tT] should have been [%tT]",
-                                       record.relativeFilePath,
-                                       record,
-                                       record.onDiskRecord.updateTime,
-                                       record.updateTime));
-            return true;
-        }
+            record.setError(String.format("Unexpected files detected for 
sstable [%s], " +
+                                          "record [%s]: last update time [%tT] 
should have been [%tT]",
+                                          record.fileName(),
+                                          record,
+                                          
record.status.onDiskRecord.updateTime,
+                                          record.updateTime));
 
-        return false;
+        }
     }
 
-    static boolean isInvalidWithCorruptedLastRecord(LogRecord record)
+    static void verifyRecordWithCorruptedLastRecord(LogRecord record)
     {
-        if (record.type == Type.REMOVE && record.onDiskRecord.numFiles < 
record.numFiles)
-        { // if we found a corruption in the last record, then we continue 
only if the number of files matches exactly for all previous records.
-            record.error(String.format("Incomplete fileset detected for 
sstable [%s], record [%s]: number of files [%d] should have been [%d]. Treating 
as unrecoverable due to corruption of the final record.",
-                         record.relativeFilePath,
-                         record.raw,
-                         record.onDiskRecord.numFiles,
-                         record.numFiles));
-            return true;
+        if (record.type == Type.REMOVE && record.status.onDiskRecord.numFiles 
< record.numFiles)
+        { // if we found a corruption in the last record, then we continue only
+          // if the number of files matches exactly for all previous records.
+            record.setError(String.format("Incomplete fileset detected for 
sstable [%s], record [%s]: " +
+                                          "number of files [%d] should have 
been [%d]. Treating as unrecoverable " +
+                                          "due to corruption of the final 
record.",
+                                          record.fileName(),
+                                          record.raw,
+                                          record.status.onDiskRecord.numFiles,
+                                          record.numFiles));
         }
-        return false;
     }
 
-    public void commit()
+    void commit()
     {
         assert !completed() : "Already completed!";
         addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
     }
 
-    public void abort()
+    void abort()
     {
         assert !completed() : "Already completed!";
         addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
@@ -228,25 +239,25 @@ final class LogFile
         LogRecord lastRecord = getLastRecord();
         return lastRecord != null &&
                lastRecord.type == type &&
-               !isInvalid(lastRecord);
+               lastRecord.isValid();
     }
 
-    public boolean committed()
+    boolean committed()
     {
         return isLastRecordValidWithType(Type.COMMIT);
     }
 
-    public boolean aborted()
+    boolean aborted()
     {
         return isLastRecordValidWithType(Type.ABORT);
     }
 
-    public boolean completed()
+    boolean completed()
     {
         return committed() || aborted();
     }
 
-    public void add(Type type, SSTable table)
+    void add(Type type, SSTable table)
     {
         if (!addRecord(makeRecord(type, table)))
             throw new IllegalStateException();
@@ -255,7 +266,10 @@ final class LogFile
     private LogRecord makeRecord(Type type, SSTable table)
     {
         assert type == Type.ADD || type == Type.REMOVE;
-        return LogRecord.make(type, folder, table);
+
+        File folder = table.descriptor.directory;
+        replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        return LogRecord.make(type, table);
     }
 
     private boolean addRecord(LogRecord record)
@@ -263,48 +277,44 @@ final class LogFile
         if (!records.add(record))
             return false;
 
-        // we only checksum the records, not the checksums themselves
-        FileUtils.append(file, record.toString());
-        sync();
+        replicas.append(record);
         return true;
     }
 
-    public void remove(Type type, SSTable table)
+    void remove(Type type, SSTable table)
     {
         LogRecord record = makeRecord(type, table);
-
-        assert records.contains(record) : String.format("[%s] is not tracked 
by %s", record, file);
+        assert records.contains(record) : String.format("[%s] is not tracked 
by %s", record, id);
 
         records.remove(record);
-        deleteRecord(record);
+        deleteRecordFiles(record);
     }
 
-    public boolean contains(Type type, SSTable table)
+    boolean contains(Type type, SSTable table)
     {
         return records.contains(makeRecord(type, table));
     }
 
-    public void deleteRecords(Type type)
+    void deleteFilesForRecordsOfType(Type type)
     {
-        assert file.exists() : String.format("Expected %s to exists", file);
         records.stream()
                .filter(type::matches)
-               .forEach(this::deleteRecord);
+               .forEach(LogFile::deleteRecordFiles);
         records.clear();
     }
 
-    private void deleteRecord(LogRecord record)
+    private static void deleteRecordFiles(LogRecord record)
     {
-        List<File> files = record.getExistingFiles(folder);
+        List<File> files = record.getExistingFiles();
 
         // we sort the files in ascending update time order so that the last 
update time
-        // stays the same even if we only partially delete files
+        // stays the same even if we only partially delete files, see comment 
in verifyRecord()
         files.sort((f1, f2) -> Long.compare(f1.lastModified(), 
f2.lastModified()));
 
         files.forEach(LogTransaction::delete);
     }
 
-    public Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, 
Type type)
+    Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, Type 
type)
     {
         Map<LogRecord, Set<File>> ret = new HashMap<>();
 
@@ -316,50 +326,55 @@ final class LogFile
         return ret;
     }
 
-    public LogRecord getLastRecord()
+    LogRecord getLastRecord()
     {
         return Iterables.getLast(records, null);
     }
 
-    private Set<File> getRecordFiles(NavigableSet<File> files, LogRecord 
record)
+    private static Set<File> getRecordFiles(NavigableSet<File> files, 
LogRecord record)
     {
-        Set<File> ret = new HashSet<>();
-        for (File file : files.tailSet(new File(folder, 
record.relativeFilePath)))
-        {
-            if (!file.getName().startsWith(record.relativeFilePath))
-                break;
-            ret.add(file);
-        }
-        return ret;
+        String fileName = record.fileName();
+        return files.stream().filter(f -> 
f.getName().startsWith(fileName)).collect(Collectors.toSet());
     }
 
-    public void delete()
+    boolean exists()
     {
-        LogTransaction.delete(file);
+        return replicas.exists();
     }
 
-    public boolean exists()
+    void close()
     {
-        return file.exists();
+        replicas.close();
     }
 
     @Override
     public String toString()
     {
-        return FileUtils.getRelativePath(folder.getPath(), file.getPath());
+        return replicas.toString();
+    }
+
+    @VisibleForTesting
+    List<File> getFiles()
+    {
+        return replicas.getFiles();
     }
 
-    static String getFileName(File folder, OperationType opType, UUID id)
+    @VisibleForTesting
+    List<String> getFilePaths()
+    {
+        return replicas.getFilePaths();
+    }
+
+    private String getFileName(File folder)
     {
         String fileName = StringUtils.join(BigFormat.latestVersion,
                                            LogFile.SEP,
                                            "txn",
                                            LogFile.SEP,
-                                           opType.fileName,
+                                           type.fileName,
                                            LogFile.SEP,
                                            id.toString(),
                                            LogFile.EXT);
         return StringUtils.join(folder, File.separator, fileName);
     }
 }
-

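As the FILE_REGEX comment above notes, log file names have the form
cc_txn_opname_id.log. A small self-contained illustration of parsing such a
name; the "ma" version string is assumed here as the 3.0 BigFormat version:

    import java.util.UUID;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TxnLogNameDemo
    {
        // same shape as LogFile.FILE_REGEX: <version>_txn_<opname>_<uuid>.log
        static final Pattern FILE_REGEX = Pattern.compile("^(.{2})_txn_(.*)_(.*)\\.log$");

        public static void main(String[] args)
        {
            String name = "ma_txn_compaction_" + UUID.randomUUID() + ".log";
            Matcher m = FILE_REGEX.matcher(name);
            if (m.matches())
                System.out.printf("version=%s op=%s id=%s%n",
                                  m.group(1), m.group(2), m.group(3));
        }
    }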
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index 0f0f3a2..9e606fc 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -1,6 +1,8 @@
 package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -11,8 +13,9 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
- * A log file record, each record is encoded in one line and has different
- * content depending on the record type.
+ * A decoded line in a transaction log file replica.
+ *
+ * @see LogReplica and LogFile.
  */
 final class LogRecord
 {
@@ -38,18 +41,52 @@ final class LogRecord
         {
             return this == record.type;
         }
+
+        public boolean isFinal() { return this == Type.COMMIT || this == 
Type.ABORT; }
     }
 
+    /**
+     * The status of a record after it has been verified; any parsing errors
+     * are also stored here.
+     */
+    public final static class Status
+    {
+        // if there are any errors, they end up here
+        Optional<String> error = Optional.empty();
+
+        // true if the record was only partially matched across replica files
+        boolean partial = false;
+
+        // if the status of this record on disk is required (e.g. existing 
files), it is
+        // stored here for caching
+        LogRecord onDiskRecord;
+
+        void setError(String error)
+        {
+            if (!this.error.isPresent())
+                this.error = Optional.of(error);
+        }
+
+        boolean hasError()
+        {
+            return error.isPresent();
+        }
+    }
 
+    // the type of record, see Type
     public final Type type;
-    public final String relativeFilePath;
+    // for sstable records, the absolute path of the sstable descriptor
+    public final Optional<String> absolutePath;
+    // for sstable records, the last update time of all files (may not be 
available for NEW records)
     public final long updateTime;
+    // for sstable records, the total number of files (may not be accurate for 
NEW records)
     public final int numFiles;
+    // the raw string as written or read from a file
     public final String raw;
+    // the checksum of this record, written at the end of the record string
     public final long checksum;
-
-    public String error;
-    public LogRecord onDiskRecord;
+    // the status of this record, @see Status class
+    public final Status status;
 
     // (add|remove|commit|abort):[*,*,*][checksum]
     static Pattern REGEX = 
Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$",
 Pattern.CASE_INSENSITIVE);
@@ -60,60 +97,76 @@ final class LogRecord
         {
             Matcher matcher = REGEX.matcher(line);
             if (!matcher.matches())
-                return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line)
-                       .error(String.format("Failed to parse [%s]", line));
+                return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
+                       .setError(String.format("Failed to parse [%s]", line));
 
             Type type = Type.fromPrefix(matcher.group(1));
-            return new LogRecord(type, matcher.group(2), 
Long.valueOf(matcher.group(3)), Integer.valueOf(matcher.group(4)), 
Long.valueOf(matcher.group(5)), line);
+            return new LogRecord(type,
+                                 matcher.group(2),
+                                 Long.valueOf(matcher.group(3)),
+                                 Integer.valueOf(matcher.group(4)),
+                                 Long.valueOf(matcher.group(5)), line);
         }
         catch (Throwable t)
         {
-            return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line).error(t);
+            return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, 
line).setError(t);
         }
     }
 
     public static LogRecord makeCommit(long updateTime)
     {
-        return new LogRecord(Type.COMMIT, "", updateTime, 0);
+        return new LogRecord(Type.COMMIT, updateTime);
     }
 
     public static LogRecord makeAbort(long updateTime)
     {
-        return new LogRecord(Type.ABORT, "", updateTime, 0);
+        return new LogRecord(Type.ABORT, updateTime);
+    }
+
+    public static LogRecord make(Type type, SSTable table)
+    {
+        String absoluteTablePath = 
FileUtils.getCanonicalPath(table.descriptor.baseFilename());
+        return make(type, getExistingFiles(absoluteTablePath), 
table.getAllFilePaths().size(), absoluteTablePath);
     }
 
-    public static LogRecord make(Type type, File parentFolder, SSTable table)
+    public LogRecord withExistingFiles()
     {
-        String relativePath = 
FileUtils.getRelativePath(parentFolder.getPath(), 
table.descriptor.baseFilename());
-        // why do we take the max of files.size() and 
table.getAllFilePaths().size()?
-        return make(type, getExistingFiles(parentFolder, relativePath), 
table.getAllFilePaths().size(), relativePath);
+        return make(type, getExistingFiles(), 0, absolutePath.get());
     }
 
-    public static LogRecord make(Type type, List<File> files, int minFiles, 
String relativeFilePath)
+    public static LogRecord make(Type type, List<File> files, int minFiles, 
String absolutePath)
     {
         long lastModified = files.stream().map(File::lastModified).reduce(0L, 
Long::max);
-        return new LogRecord(type, relativeFilePath, lastModified, 
Math.max(minFiles, files.size()));
+        return new LogRecord(type, absolutePath, lastModified, 
Math.max(minFiles, files.size()));
+    }
+
+    private LogRecord(Type type, long updateTime)
+    {
+        this(type, null, updateTime, 0, 0, null);
     }
 
     private LogRecord(Type type,
-                      String relativeFilePath,
+                      String absolutePath,
                       long updateTime,
                       int numFiles)
     {
-        this(type, relativeFilePath, updateTime, numFiles, 0, null);
+        this(type, absolutePath, updateTime, numFiles, 0, null);
     }
 
     private LogRecord(Type type,
-                      String relativeFilePath,
+                      String absolutePath,
                       long updateTime,
                       int numFiles,
                       long checksum,
                       String raw)
     {
+        assert !type.hasFile() || absolutePath != null : "Expected file path 
for file records";
+
         this.type = type;
-        this.relativeFilePath = type.hasFile() ? relativeFilePath : ""; // 
only meaningful for file records
-        this.updateTime = type == Type.REMOVE ? updateTime : 0; // only 
meaningful for old records
-        this.numFiles = type.hasFile() ? numFiles : 0; // only meaningful for 
file records
+        this.absolutePath = type.hasFile() ? Optional.of(absolutePath) : 
Optional.<String>empty();
+        this.updateTime = type == Type.REMOVE ? updateTime : 0;
+        this.numFiles = type.hasFile() ? numFiles : 0;
+        this.status = new Status();
         if (raw == null)
         {
             assert checksum == 0;
@@ -125,49 +178,93 @@ final class LogRecord
             this.checksum = checksum;
             this.raw = raw;
         }
-
-        this.error = "";
     }
 
-    public LogRecord error(Throwable t)
+    LogRecord setError(Throwable t)
     {
-        return error(t.getMessage());
+        return setError(t.getMessage());
     }
 
-    public LogRecord error(String error)
+    LogRecord setError(String error)
     {
-        this.error = error;
+        status.setError(error);
         return this;
     }
 
-    public boolean isValid()
+    String error()
     {
-        return this.error.isEmpty();
+        return status.error.orElse("");
+    }
+
+    void setPartial()
+    {
+        status.partial = true;
+    }
+
+    boolean partial()
+    {
+        return status.partial;
+    }
+
+    boolean isValid()
+    {
+        return !status.hasError() && type != Type.UNKNOWN;
+    }
+
+    boolean isInvalid()
+    {
+        return !isValid();
+    }
+
+    boolean isInvalidOrPartial()
+    {
+        return isInvalid() || partial();
     }
 
     private String format()
     {
-        return String.format("%s:[%s,%d,%d][%d]", type.toString(), 
relativeFilePath, updateTime, numFiles, checksum);
+        return String.format("%s:[%s,%d,%d][%d]",
+                             type.toString(),
+                             absolutePath(),
+                             updateTime,
+                             numFiles,
+                             checksum);
+    }
+
+    public List<File> getExistingFiles()
+    {
+        assert absolutePath.isPresent() : "Expected a path in order to get 
existing files";
+        return getExistingFiles(absolutePath.get());
     }
 
-    public List<File> getExistingFiles(File folder)
+    public static List<File> getExistingFiles(String absoluteFilePath)
     {
-        if (!type.hasFile())
-            return Collections.emptyList();
+        Path path = Paths.get(absoluteFilePath);
+        File[] files = path.getParent().toFile().listFiles((dir, name) -> 
name.startsWith(path.getFileName().toString()));
+        // files may be null if the directory does not exist yet, e.g. when 
tracking new files
+        return files == null ? Collections.emptyList() : Arrays.asList(files);
+    }
 
-        return getExistingFiles(folder, relativeFilePath);
+    public boolean isFinal()
+    {
+        return type.isFinal();
+    }
+
+    String fileName()
+    {
+        return absolutePath.isPresent() ? 
Paths.get(absolutePath.get()).getFileName().toString() : "";
     }
 
-    public static List<File> getExistingFiles(File parentFolder, String 
relativeFilePath)
+    String absolutePath()
     {
-        return Arrays.asList(parentFolder.listFiles((dir, name) -> 
name.startsWith(relativeFilePath)));
+        return absolutePath.isPresent() ? absolutePath.get() : "";
     }
 
     @Override
     public int hashCode()
     {
         // see comment in equals
-        return Objects.hash(type, relativeFilePath, error);
+        return Objects.hash(type, absolutePath, numFiles, updateTime);
     }
 
     @Override
@@ -178,15 +275,12 @@ final class LogRecord
 
         final LogRecord other = (LogRecord)obj;
 
-        // we exclude on purpose checksum, update time and count as
-        // we don't want duplicated records that differ only by
-        // properties that might change on disk, especially COMMIT records,
-        // there should be only one regardless of update time
-        // however we must compare the error to make sure we have more than
-        // one UNKNOWN record, if we fail to parse more than one
+        // we exclude on purpose checksum, error and full file path
+        // since records must match across log file replicas on different disks
         return type == other.type &&
-               relativeFilePath.equals(other.relativeFilePath) &&
-               error.equals(other.error);
+               absolutePath.equals(other.absolutePath) &&
+               numFiles == other.numFiles &&
+               updateTime == other.updateTime;
     }
 
     @Override
@@ -198,7 +292,7 @@ final class LogRecord
     long computeChecksum()
     {
         CRC32 crc32 = new CRC32();
-        crc32.update(relativeFilePath.getBytes(FileUtils.CHARSET));
+        crc32.update((absolutePath()).getBytes(FileUtils.CHARSET));
         crc32.update(type.toString().getBytes(FileUtils.CHARSET));
         FBUtilities.updateChecksumInt(crc32, (int) updateTime);
         FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32));

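Per the REGEX and format() above, each record is one line of the form
type:[path,updateTime,numFiles][checksum]. A self-contained parsing example
with illustrative values:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogRecordParseDemo
    {
        // same shape as LogRecord.REGEX above
        static final Pattern REGEX = Pattern.compile(
            "^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$",
            Pattern.CASE_INSENSITIVE);

        public static void main(String[] args)
        {
            String line = "remove:[/data1/ks/t/ma-1-big,1445640000000,8][2463225553]";
            Matcher m = REGEX.matcher(line);
            if (!m.matches())
                throw new IllegalStateException("unparseable line becomes a Type.UNKNOWN record");

            System.out.printf("type=%s path=%s updateTime=%s numFiles=%s checksum=%s%n",
                              m.group(1), m.group(2), m.group(3), m.group(4), m.group(5));
        }
    }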
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
new file mode 100644
index 0000000..79b9749
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CLibrary;
+
+/**
+ * Because a column family may have sstables on different disks and disks can
+ * be removed, we duplicate log files into many replicas so as to have a file
+ * in each folder where sstables exist.
+ *
+ * Each replica contains the exact same content but we do allow for final
+ * partial records in case we crashed after writing to one replica but
+ * before completing the write to another replica.
+ *
+ * @see LogFile
+ */
+final class LogReplica
+{
+    private final File file;
+    private int folderDescriptor;
+
+    static LogReplica create(File folder, String fileName)
+    {
+        return new LogReplica(new File(fileName), 
CLibrary.tryOpenDirectory(folder.getPath()));
+    }
+
+    static LogReplica open(File file)
+    {
+        return new LogReplica(file, 
CLibrary.tryOpenDirectory(file.getParentFile().getPath()));
+    }
+
+    LogReplica(File file, int folderDescriptor)
+    {
+        this.file = file;
+        this.folderDescriptor = folderDescriptor;
+    }
+
+    File file()
+    {
+        return file;
+    }
+
+    void append(LogRecord record)
+    {
+        boolean existed = exists();
+        FileUtils.appendAndSync(file, record.toString());
+
+        // If the file did not exist before appending the first
+        // line, then sync the folder as well since now it must exist
+        if (!existed)
+            syncFolder();
+    }
+
+    void syncFolder()
+    {
+        if (folderDescriptor >= 0)
+            CLibrary.trySync(folderDescriptor);
+    }
+
+    void delete()
+    {
+        LogTransaction.delete(file);
+        syncFolder();
+    }
+
+    boolean exists()
+    {
+        return file.exists();
+    }
+
+    void close()
+    {
+        if (folderDescriptor >= 0)
+        {
+            CLibrary.tryCloseFD(folderDescriptor);
+            folderDescriptor = -1;
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[%s] ", file);
+    }
+}

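LogReplica makes the append durable and, when the file is first created, also
syncs the parent folder so the directory entry itself survives a crash. A hedged
sketch of the same idea using plain NIO (FileChannel.force stands in for the
CLibrary calls; syncing a directory this way works on Linux but not everywhere):

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.StandardOpenOption;

    final class DurableAppendSketch
    {
        static void appendAndSync(File file, String line) throws IOException
        {
            boolean existed = file.exists();
            try (FileChannel ch = FileChannel.open(file.toPath(),
                                                   StandardOpenOption.CREATE,
                                                   StandardOpenOption.WRITE,
                                                   StandardOpenOption.APPEND))
            {
                ch.write(ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8)));
                ch.force(true); // flush data and metadata
            }

            if (!existed)
            {
                // fsync the folder so the new directory entry is durable
                try (FileChannel dir = FileChannel.open(file.toPath().getParent(),
                                                        StandardOpenOption.READ))
                {
                    dir.force(true);
                }
            }
        }
    }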
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
new file mode 100644
index 0000000..d9d9213
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * A set of log replicas. This class mostly iterates over replicas when 
writing or reading,
+ * ensuring consistency among them and hiding replication details from LogFile.
+ *
+ * @see LogReplica, LogFile
+ */
+public class LogReplicaSet
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(LogReplicaSet.class);
+
+    private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
+
+    private Collection<LogReplica> replicas()
+    {
+        return replicasByFile.values();
+    }
+
+    void addReplicas(List<File> replicas)
+    {
+        replicas.forEach(this::addReplica);
+    }
+
+    void addReplica(File file)
+    {
+        File folder = file.getParentFile();
+        assert !replicasByFile.containsKey(folder);
+        replicasByFile.put(folder, LogReplica.open(file));
+
+        if (logger.isTraceEnabled())
+            logger.trace("Added log file replica {} ", file);
+    }
+
+    void maybeCreateReplica(File folder, String fileName, Set<LogRecord> 
records)
+    {
+        if (replicasByFile.containsKey(folder))
+            return;
+
+        final LogReplica replica = LogReplica.create(folder, fileName);
+
+        records.forEach(replica::append);
+        replicasByFile.put(folder, replica);
+
+        if (logger.isTraceEnabled())
+            logger.trace("Created new file replica {}", replica);
+    }
+
+    Throwable syncFolder(Throwable accumulate)
+    {
+        return Throwables.perform(accumulate, replicas().stream().map(s -> 
s::syncFolder));
+    }
+
+    Throwable delete(Throwable accumulate)
+    {
+        return Throwables.perform(accumulate, replicas().stream().map(s -> 
s::delete));
+    }
+
+    private static boolean isPrefixMatch(String first, String second)
+    {
+        return first.length() >= second.length() ?
+               first.startsWith(second) :
+               second.startsWith(first);
+    }
+
+    boolean readRecords(Set<LogRecord> records)
+    {
+        Map<File, List<String>> linesByReplica = replicas().stream()
+                                                           
.map(LogReplica::file)
+                                                           
.collect(Collectors.toMap(Function.<File>identity(), FileUtils::readLines));
+        int maxNumLines = 
linesByReplica.values().stream().map(List::size).reduce(0, Integer::max);
+        for (int i = 0; i < maxNumLines; i++)
+        {
+            String firstLine = null;
+            boolean partial = false;
+            for (Map.Entry<File, List<String>> entry : 
linesByReplica.entrySet())
+            {
+                List<String> currentLines = entry.getValue();
+                if (i >= currentLines.size())
+                    continue;
+
+                String currentLine = currentLines.get(i);
+                if (firstLine == null)
+                {
+                    firstLine = currentLine;
+                    continue;
+                }
+
+                if (!isPrefixMatch(firstLine, currentLine))
+                { // not a prefix match
+                    logger.error("Mismatched line in file {}: got '{}' 
expected '{}', giving up",
+                                 entry.getKey().getName(),
+                                 currentLine,
+                                 firstLine);
+                    return false;
+                }
+
+                if (!firstLine.equals(currentLine))
+                {
+                    if (i == currentLines.size() - 1)
+                    { // last record, just set record as invalid and move on
+                        logger.warn("Mismatched last line in file {}: '{}' not 
the same as '{}'",
+                                    entry.getKey().getName(),
+                                    currentLine,
+                                    firstLine);
+
+                        if (currentLine.length() > firstLine.length())
+                            firstLine = currentLine;
+
+                        partial = true;
+                    }
+                    else
+                {   // mismatched entry and the file has more lines, giving up
+                        logger.error("Mismatched line in file {}: got '{}' 
expected '{}', giving up",
+                                     entry.getKey().getName(),
+                                     currentLine,
+                                     firstLine);
+                        return false;
+                    }
+                }
+            }
+
+            LogRecord record = LogRecord.make(firstLine);
+            if (records.contains(record))
+            { // duplicate records
+                logger.error("Found duplicate record {} for {}, giving up", 
record, record.fileName());
+                return false;
+            }
+
+            if (partial)
+                record.setPartial();
+
+            records.add(record);
+
+            if (record.isFinal() && i != (maxNumLines - 1))
+            { // too many final records
+                logger.error("Found too many lines for {}, giving up", 
record.fileName());
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     *  Add the record to all the replicas: if it is a final record then we 
throw only if we fail to write it
+     *  to all of them; otherwise we throw if we fail to write it to any file. 
See CASSANDRA-10421 for details.
+     */
+    void append(LogRecord record)
+    {
+        Throwable err = Throwables.perform(null, replicas().stream().map(r -> 
() -> r.append(record)));
+        if (err != null)
+        {
+            if (!record.isFinal() || err.getSuppressed().length == 
replicas().size() - 1)
+                Throwables.maybeFail(err);
+
+            logger.error("Failed to add record '{}' to some replicas '{}'", 
record, this);
+        }
+    }
+
+    boolean exists()
+    {
+        Optional<Boolean> ret = 
replicas().stream().map(LogReplica::exists).reduce(Boolean::logicalAnd);
+        return ret.isPresent() ?
+               ret.get()
+               : false;
+    }
+
+    void close()
+    {
+        Throwables.maybeFail(Throwables.perform(null, 
replicas().stream().map(r -> r::close)));
+    }
+
+    @Override
+    public String toString()
+    {
+        Optional<String> ret = 
replicas().stream().map(LogReplica::toString).reduce(String::concat);
+        return ret.isPresent() ?
+               ret.get()
+               : "[-]";
+    }
+
+    @VisibleForTesting
+    List<File> getFiles()
+    {
+        return 
replicas().stream().map(LogReplica::file).collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    List<String> getFilePaths()
+    {
+        return 
replicas().stream().map(LogReplica::file).map(File::getPath).collect(Collectors.toList());
+    }
+}

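readRecords above tolerates a torn final line on one replica: lines at the same
position must prefix-match across replicas, and on a last-line mismatch the
longest variant wins and the record is marked partial. A self-contained
illustration of that reconciliation step:

    import java.util.Arrays;
    import java.util.List;

    public class PrefixReconcileDemo
    {
        // same logic as LogReplicaSet.isPrefixMatch above
        static boolean isPrefixMatch(String first, String second)
        {
            return first.length() >= second.length() ? first.startsWith(second)
                                                     : second.startsWith(first);
        }

        public static void main(String[] args)
        {
            // replica b crashed mid-append: its last line is a prefix of a's
            List<String> a = Arrays.asList("add:[/d1/ma-1-big,1,8][111]", "commit:[,0,0][222]");
            List<String> b = Arrays.asList("add:[/d1/ma-1-big,1,8][111]", "commit:[,0");

            String last = a.get(1);
            String other = b.get(1);
            if (isPrefixMatch(last, other))
            {
                String reconciled = last.length() >= other.length() ? last : other;
                System.out.println("partial record reconciled to: " + reconciled);
            }
        }
    }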
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 78ea0f1..8b82207 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -92,17 +92,17 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
      */
     public static final class CorruptTransactionLogException extends 
RuntimeException
     {
-        public final LogFile file;
+        public final LogFile txnFile;
 
-        public CorruptTransactionLogException(String message, LogFile file)
+        public CorruptTransactionLogException(String message, LogFile txnFile)
         {
             super(message);
-            this.file = file;
+            this.txnFile = txnFile;
         }
     }
 
     private final Tracker tracker;
-    private final LogFile data;
+    private final LogFile txnFile;
     private final Ref<LogTransaction> selfRef;
     // Deleting sstables is tricky because the mmapping might not have been 
finalized yet,
     // and delete will fail (on Windows) until it is (we only force the 
unmapping on SUN VMs).
@@ -110,30 +110,19 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
     // will be recognized as GCable.
     private static final Queue<Runnable> failedDeletions = new 
ConcurrentLinkedQueue<>();
 
-    LogTransaction(OperationType opType, CFMetaData metadata)
+    LogTransaction(OperationType opType)
     {
-        this(opType, metadata, null);
+        this(opType, null);
     }
 
-    LogTransaction(OperationType opType, CFMetaData metadata, Tracker tracker)
-    {
-        this(opType, new Directories(metadata), tracker);
-    }
-
-    LogTransaction(OperationType opType, Directories directories, Tracker 
tracker)
-    {
-        this(opType, directories.getDirectoryForNewSSTables(), tracker);
-    }
-
-    LogTransaction(OperationType opType, File folder, Tracker tracker)
+    LogTransaction(OperationType opType, Tracker tracker)
     {
         this.tracker = tracker;
-        int folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
-        this.data = new LogFile(opType, folder, folderDescriptor, 
UUIDGen.getTimeUUID());
-        this.selfRef = new Ref<>(this, new TransactionTidier(data, 
folderDescriptor));
+        this.txnFile = new LogFile(opType, UUIDGen.getTimeUUID());
+        this.selfRef = new Ref<>(this, new TransactionTidier(txnFile));
 
         if (logger.isTraceEnabled())
-            logger.trace("Created transaction logs with id {}", data.id);
+            logger.trace("Created transaction logs with id {}", txnFile.id());
     }
 
     /**
@@ -141,7 +130,7 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
      **/
     void trackNew(SSTable table)
     {
-        data.add(Type.ADD, table);
+        txnFile.add(Type.ADD, table);
     }
 
     /**
@@ -149,7 +138,7 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
      */
     void untrackNew(SSTable table)
     {
-        data.remove(Type.ADD, table);
+        txnFile.remove(Type.ADD, table);
     }
 
     /**
@@ -157,15 +146,15 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
      */
     SSTableTidier obsoleted(SSTableReader reader)
     {
-        if (data.contains(Type.ADD, reader))
+        if (txnFile.contains(Type.ADD, reader))
         {
-            if (data.contains(Type.REMOVE, reader))
+            if (txnFile.contains(Type.REMOVE, reader))
                 throw new IllegalArgumentException();
 
             return new SSTableTidier(reader, true, this);
         }
 
-        data.add(Type.REMOVE, reader);
+        txnFile.add(Type.REMOVE, reader);
 
         if (tracker != null)
             tracker.notifyDeleting(reader);
@@ -173,26 +162,32 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
         return new SSTableTidier(reader, false, this);
     }
 
-    OperationType getType()
+    OperationType type()
+    {
+        return txnFile.type();
+    }
+
+    UUID id()
     {
-        return data.getType();
+        return txnFile.id();
     }
 
-    UUID getId()
+    @VisibleForTesting
+    LogFile txnFile()
     {
-        return data.getId();
+        return txnFile;
     }
 
     @VisibleForTesting
-    String getDataFolder()
+    List<File> logFiles()
     {
-        return data.folder.getPath();
+        return txnFile.getFiles();
     }
 
     @VisibleForTesting
-    LogFile getLogFile()
+    List<String> logFilePaths()
     {
-        return data;
+        return txnFile.getFilePaths();
     }
 
     static void delete(File file)
@@ -224,12 +219,10 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
     private static class TransactionTidier implements RefCounted.Tidy, Runnable
     {
         private final LogFile data;
-        private final int folderDescriptor;
 
-        TransactionTidier(LogFile data, int folderDescriptor)
+        TransactionTidier(LogFile data)
         {
             this.data = data;
-            this.folderDescriptor = folderDescriptor;
         }
 
         public void tidy() throws Exception
@@ -247,7 +240,13 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
             if (logger.isTraceEnabled())
                 logger.trace("Removing files for transaction {}", name());
 
-            assert data.completed() : "Expected a completed transaction: " + data;
+            if (!data.completed())
+            { // this happens if we forget to close a txn and the garbage collector closes it for us
+                logger.error("{} was not completed, trying to abort it now", data);
+                Throwable err = Throwables.perform((Throwable)null, data::abort);
+                if (err != null)
+                    logger.error("Failed to abort {}", data, err);
+            }
 
             Throwable err = data.removeUnfinishedLeftovers(null);
 
@@ -260,7 +259,8 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
             {
                 if (logger.isTraceEnabled())
                     logger.trace("Closing file transaction {}", name());
-                CLibrary.tryCloseFD(folderDescriptor);
+
+                data.close();
             }
         }
     }
@@ -360,20 +360,20 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
         }
         catch (Throwable t)
         {
-            logger.error("Failed to complete file transaction {}", getId(), t);
+            logger.error("Failed to complete file transaction {}", id(), t);
             return Throwables.merge(accumulate, t);
         }
     }
 
     protected Throwable doCommit(Throwable accumulate)
     {
-        data.commit();
+        txnFile.commit();
         return complete(accumulate);
     }
 
     protected Throwable doAbort(Throwable accumulate)
     {
-        data.abort();
+        txnFile.abort();
         return complete(accumulate);
     }
 
@@ -387,32 +387,62 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
      */
     static void removeUnfinishedLeftovers(CFMetaData metadata)
     {
-        for (File dir : new Directories(metadata).getCFDirectories())
+        removeUnfinishedLeftovers(new Directories(metadata).getCFDirectories());
+    }
+
+    @VisibleForTesting
+    static void removeUnfinishedLeftovers(List<File> folders)
+    {
+        LogFilesByName logFiles = new LogFilesByName();
+        folders.forEach(logFiles::list);
+        logFiles.removeUnfinishedLeftovers();
+    }
+
+    private static final class LogFilesByName
+    {
+        Map<String, List<File>> files = new HashMap<>();
+
+        void list(File folder)
         {
-            int folderDescriptor = CLibrary.tryOpenDirectory(dir.getPath());
-            try
+            Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add);
+        }
+
+        void add(File file)
+        {
+            List<File> filesByName = files.get(file.getName());
+            if (filesByName == null)
             {
-                File[] logs = dir.listFiles(LogFile::isLogFile);
+                filesByName = new ArrayList<>();
+                files.put(file.getName(), filesByName);
+            }
+
+            filesByName.add(file);
+        }
 
-                for (File log : logs)
+        void removeUnfinishedLeftovers()
+        {
+            files.forEach(LogFilesByName::removeUnfinishedLeftovers);
+        }
+
+        static void removeUnfinishedLeftovers(String name, List<File> logFiles)
+        {
+            LogFile txn = LogFile.make(name, logFiles);
+            try
+            {
+                if (txn.verify())
+                {
+                    Throwable failure = txn.removeUnfinishedLeftovers(null);
+                    if (failure != null)
+                        logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure);
+                }
+                else
                 {
-                    LogFile data = LogFile.make(log, folderDescriptor);
-                    data.readRecords();
-                    if (data.verify())
-                    {
-                        Throwable failure = data.removeUnfinishedLeftovers(null);
-                        if (failure != null)
-                            logger.error("Failed to remove unfinished transaction leftovers for log {}", log, failure);
-                    }
-                    else
-                    {
-                        logger.error("Unexpected disk state: failed to read transaction log {}", log);
-                    }
+                    logger.error("Unexpected disk state: failed to read transaction txn {}", txn);
                 }
             }
             finally
             {
-                CLibrary.tryCloseFD(folderDescriptor);
+                txn.close();
             }
         }
     }
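
For illustration, the grouping performed by the new LogFilesByName helper can be sketched in isolation; the folder paths, class name and the computeIfAbsent variant below are made up for the example and are not part of the patch:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class LogFileGroupingSketch
    {
        public static void main(String[] args)
        {
            // Group candidate transaction log files by file name, mirroring
            // LogFilesByName.list()/add(): replicas of the same transaction keep
            // the same file name across data directories, so they share a bucket.
            Map<String, List<File>> byName = new HashMap<>();
            for (File folder : Arrays.asList(new File("/data1/ks/t1"), new File("/data2/ks/t1")))
            {
                File[] logs = folder.listFiles(f -> f.getName().endsWith(".log")); // stand-in for LogFile::isLogFile
                if (logs == null)
                    continue; // not a directory, or listing failed
                for (File log : logs)
                    byName.computeIfAbsent(log.getName(), k -> new ArrayList<>()).add(log);
            }
            // Each value now holds every replica of one transaction's log file,
            // which is what LogFile.make(name, logFiles) receives above.
            byName.forEach((name, files) -> System.out.println(name + " -> " + files));
        }
    }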

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 59c15bb..c09d49c 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -224,7 +224,7 @@ public class Tracker
      */
     public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
     {
-        try (LogTransaction txnLogs = new LogTransaction(operationType, cfstore.metadata, this))
+        try (LogTransaction txnLogs = new LogTransaction(operationType, this))
         {
             Pair<View, View> result = apply(view -> {
                 Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
@@ -247,7 +247,7 @@ public class Tracker
                     accumulate = updateSizeTracking(removed, emptySet(), accumulate);
                     accumulate = release(selfRefs(removed), accumulate);
                     // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
-                    accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.getType(), accumulate);
+                    accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate);
                 }
             }
             catch (Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 9ad5a80..e889d85 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -96,7 +96,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     @SuppressWarnings("resource") // log and writer closed during postCleanup
     public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
-        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
         SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
         return new SSTableTxnWriter(txn, writer);
     }
@@ -105,7 +105,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
         // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
-        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
         MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
         SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
         return new SSTableTxnWriter(txn, writer);
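
Since the transaction log is no longer tied to one folder, offline transactions are created without a starting directory. A before/after sketch of the call shape (assuming a descriptor in scope, as in the methods above):

    // before: the caller picked one folder up front
    // LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
    // after: no folder argument; directories are tracked per sstable record
    LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);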

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 5bcd34a..46f2de5 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -575,24 +575,32 @@ public class FileUtils
     public static void append(File file, String ... lines)
     {
         if (file.exists())
-            write(file, StandardOpenOption.APPEND, lines);
+            write(file, Arrays.asList(lines), StandardOpenOption.APPEND);
         else
-            write(file, StandardOpenOption.CREATE, lines);
+            write(file, Arrays.asList(lines), StandardOpenOption.CREATE);
+    }
+
+    public static void appendAndSync(File file, String ... lines)
+    {
+        if (file.exists())
+            write(file, Arrays.asList(lines), StandardOpenOption.APPEND, StandardOpenOption.SYNC);
+        else
+            write(file, Arrays.asList(lines), StandardOpenOption.CREATE, StandardOpenOption.SYNC);
     }
 
     public static void replace(File file, String ... lines)
     {
-        write(file, StandardOpenOption.TRUNCATE_EXISTING, lines);
+        write(file, Arrays.asList(lines), StandardOpenOption.TRUNCATE_EXISTING);
     }
 
-    public static void write(File file, StandardOpenOption op, String ... lines)
+    public static void write(File file, List<String> lines, StandardOpenOption ... options)
     {
         try
         {
             Files.write(file.toPath(),
-                        Arrays.asList(lines),
+                        lines,
                         CHARSET,
-                        op);
+                        options);
         }
         catch (IOException ex)
         {
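
For reference, a small usage sketch of the reworked helpers (the file path is hypothetical): write() now takes the lines as a List plus any number of open options, and appendAndSync() passes StandardOpenOption.SYNC so the appended lines reach disk before the call returns:

    import java.io.File;
    import java.nio.file.StandardOpenOption;
    import java.util.Arrays;

    File f = new File("/tmp/fileutils-example.txt"); // hypothetical scratch file
    FileUtils.write(f, Arrays.asList("a", "b"), StandardOpenOption.CREATE); // create and write
    FileUtils.appendAndSync(f, "c");   // append, then force to disk via SYNC
    FileUtils.replace(f, "only-line"); // truncate existing content, then write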

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index f261954..0b864fa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask
         this.totalSize = totalSize;
         // this is an "offline" transaction, as we currently manually expose the sstables once done;
         // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes
-        this.txn = LifecycleTransaction.offline(OperationType.STREAM, Schema.instance.getCFMetaData(cfId));
+        this.txn = LifecycleTransaction.offline(OperationType.STREAM);
         this.sstables = new ArrayList<>(totalFiles);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 8c5e3ec..923b723 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -87,6 +87,11 @@ public final class Throwables
             throw (E) fail;
     }
 
+    public static Throwable perform(Throwable accumulate, DiscreteAction<?> ... actions)
+    {
+        return perform(accumulate, Arrays.stream(actions));
+    }
+
     public static Throwable perform(Throwable accumulate, Stream<? extends DiscreteAction<?>> actions)
     {
         return perform(accumulate, actions.iterator());
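
The new varargs overload keeps call sites such as TransactionTidier.run() above compact; a sketch with hypothetical cleanup actions:

    // Every action runs even if an earlier one throws; failures are merged
    // into the returned Throwable, which stays null when all succeed.
    Throwable accumulate = Throwables.perform((Throwable) null,
                                              () -> closeFirstResource(),   // hypothetical action
                                              () -> closeSecondResource()); // hypothetical action
    if (accumulate != null)
        accumulate.printStackTrace(); // or merge into an outer accumulator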

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index a553fe8..d5baec8 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -326,7 +326,7 @@ public class ScrubTest
             String filename = cfs.getSSTablePath(tempDataDir);
             Descriptor desc = Descriptor.fromFilename(filename);
 
-            LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, desc.directory);
+            LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
             try (SSTableTxnWriter writer = new SSTableTxnWriter(txn, createTestWriter(desc, (long) keys.size(), cfs.metadata, txn)))
             {
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 0488245..3549523 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -159,7 +159,7 @@ public class HelpersTest
     public void testMarkObsolete()
     {
         ColumnFamilyStore cfs = MockSchema.newCFS();
-        LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN, cfs.metadata);
+        LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
         Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
 
         List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
