http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index a655fd8..df05d71 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.lifecycle;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.file.Files;
 import java.util.*;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -80,6 +81,7 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         {
             final ColumnFamilyStore cfs;
             final LogTransaction txnLogs;
+            final File dataFolder;
             final SSTableReader sstableOld;
             final SSTableReader sstableNew;
             final LogTransaction.SSTableTidier tidier;
@@ -88,12 +90,13 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
             {
                 this.cfs = cfs;
                 this.txnLogs = txnLogs;
-                this.sstableOld = sstable(cfs, 0, 128);
-                this.sstableNew = sstable(cfs, 1, 128);
+                this.dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+                this.sstableOld = sstable(dataFolder, cfs, 0, 128);
+                this.sstableNew = sstable(dataFolder, cfs, 1, 128);
 
                 assertNotNull(txnLogs);
-                assertNotNull(txnLogs.getId());
-                Assert.assertEquals(OperationType.COMPACTION, 
txnLogs.getType());
+                assertNotNull(txnLogs.id());
+                Assert.assertEquals(OperationType.COMPACTION, txnLogs.type());
 
                 txnLogs.trackNew(sstableNew);
                 tidier = txnLogs.obsoleted(sstableOld);
@@ -131,9 +134,9 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
 
             void assertInProgress() throws Exception
             {
-                assertFiles(txnLogs.getDataFolder(), 
Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
-                                                                               
       sstableOld.getAllFilePaths(),
-                                                                               
       Collections.singleton(txnLogs.getLogFile().file.getPath()))));
+                assertFiles(dataFolder.getPath(), 
Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
+                                                                               
    sstableOld.getAllFilePaths(),
+                                                                               
    txnLogs.logFilePaths())));
             }
 
             void assertPrepared() throws Exception
@@ -142,12 +145,12 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
 
             void assertAborted() throws Exception
             {
-                assertFiles(txnLogs.getDataFolder(), new 
HashSet<>(sstableOld.getAllFilePaths()));
+                assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableOld.getAllFilePaths()));
             }
 
             void assertCommitted() throws Exception
             {
-                assertFiles(txnLogs.getDataFolder(), new 
HashSet<>(sstableNew.getAllFilePaths()));
+                assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableNew.getAllFilePaths()));
             }
         }
 
@@ -160,7 +163,7 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
 
         private TxnTest(ColumnFamilyStore cfs) throws IOException
         {
-            this(cfs, new LogTransaction(OperationType.COMPACTION, 
cfs.metadata));
+            this(cfs, new LogTransaction(OperationType.COMPACTION));
         }
 
         private TxnTest(ColumnFamilyStore cfs, LogTransaction txnLogs) throws 
IOException
@@ -199,10 +202,11 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     public void testUntrack() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstableNew = sstable(cfs, 1, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 
         // complete a transaction without keep the new files since they were 
untracked
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstableNew);
@@ -214,18 +218,19 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         Thread.sleep(1);
         LogTransaction.waitForDeletions();
 
-        assertFiles(log.getDataFolder(), Collections.<String>emptySet());
+        assertFiles(dataFolder.getPath(), Collections.<String>emptySet());
     }
 
     @Test
     public void testCommitSameDesc() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstableOld1 = sstable(cfs, 0, 128);
-        SSTableReader sstableOld2 = sstable(cfs, 0, 256);
-        SSTableReader sstableNew = sstable(cfs, 1, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstableOld1 = sstable(dataFolder, cfs, 0, 128);
+        SSTableReader sstableOld2 = sstable(dataFolder, cfs, 0, 256);
+        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstableNew);
@@ -242,7 +247,7 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         sstableOld1.selfRef().release();
         sstableOld2.selfRef().release();
 
-        assertFiles(log.getDataFolder(), new 
HashSet<>(sstableNew.getAllFilePaths()));
+        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableNew.getAllFilePaths()));
 
         sstableNew.selfRef().release();
     }
@@ -251,15 +256,16 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     public void testCommitOnlyNew() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable = sstable(cfs, 0, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstable);
         log.finish();
 
-        assertFiles(log.getDataFolder(), new 
HashSet<>(sstable.getAllFilePaths()));
+        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstable.getAllFilePaths()));
 
         sstable.selfRef().release();
     }
@@ -268,9 +274,10 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     public void testCommitOnlyOld() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable = sstable(cfs, 0, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         LogTransaction.SSTableTidier tidier = log.obsoleted(sstable);
@@ -280,16 +287,54 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         sstable.markObsolete(tidier);
         sstable.selfRef().release();
 
-        assertFiles(log.getDataFolder(), new HashSet<>());
+        assertFiles(dataFolder.getPath(), new HashSet<>());
+    }
+
+    @Test
+    public void testCommitMultipleFolders() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+
+        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        File dataFolder1 = new File(origiFolder, "1");
+        File dataFolder2 = new File(origiFolder, "2");
+        Files.createDirectories(dataFolder1.toPath());
+        Files.createDirectories(dataFolder2.toPath());
+
+        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
+                                     sstable(dataFolder1, cfs, 1, 128),
+                                     sstable(dataFolder2, cfs, 2, 128),
+                                     sstable(dataFolder2, cfs, 3, 128)
+        };
+
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
+        assertNotNull(log);
+
+        LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), 
log.obsoleted(sstables[2]) };
+
+        log.trackNew(sstables[1]);
+        log.trackNew(sstables[3]);
+
+        log.finish();
+
+        sstables[0].markObsolete(tidiers[0]);
+        sstables[2].markObsolete(tidiers[1]);
+
+        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
+        LogTransaction.waitForDeletions();
+
+        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[1].getAllFilePaths()));
+        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[3].getAllFilePaths()));
     }
 
     @Test
     public void testAbortOnlyNew() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable = sstable(cfs, 0, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstable);
@@ -297,16 +342,17 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
 
         sstable.selfRef().release();
 
-        assertFiles(log.getDataFolder(), new HashSet<>());
+        assertFiles(dataFolder.getPath(), new HashSet<>());
     }
 
     @Test
     public void testAbortOnlyOld() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable = sstable(cfs, 0, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         LogTransaction.SSTableTidier tidier = log.obsoleted(sstable);
@@ -317,18 +363,55 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
 
         sstable.selfRef().release();
 
-        assertFiles(log.getDataFolder(), new 
HashSet<>(sstable.getAllFilePaths()));
+        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstable.getAllFilePaths()));
+    }
+
+    @Test
+    public void testAbortMultipleFolders() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+
+        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        File dataFolder1 = new File(origiFolder, "1");
+        File dataFolder2 = new File(origiFolder, "2");
+        Files.createDirectories(dataFolder1.toPath());
+        Files.createDirectories(dataFolder2.toPath());
+
+        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
+                                     sstable(dataFolder1, cfs, 1, 128),
+                                     sstable(dataFolder2, cfs, 2, 128),
+                                     sstable(dataFolder2, cfs, 3, 128)
+        };
+
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
+        assertNotNull(log);
+
+        LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), 
log.obsoleted(sstables[2]) };
+
+        log.trackNew(sstables[1]);
+        log.trackNew(sstables[3]);
+
+        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::abort);
+        log.abort();
+
+        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
+        LogTransaction.waitForDeletions();
+
+        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[0].getAllFilePaths()));
+        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[2].getAllFilePaths()));
     }
 
+
     @Test
     public void testRemoveUnfinishedLeftovers_abort() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstableOld = sstable(cfs, 0, 128);
-        SSTableReader sstableNew = sstable(cfs, 1, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
+        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 
         // simulate tracking sstables with a failed transaction (new log file 
NOT deleted)
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstableNew);
@@ -349,11 +432,10 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         Map<Descriptor, Set<Component>> sstables = 
directories.sstableLister(Directories.OnTxnErr.THROW).list();
         assertEquals(1, sstables.size());
 
-        assertFiles(log.getDataFolder(), new 
HashSet<>(sstableOld.getAllFilePaths()));
-
-        tidier.run();
+        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableOld.getAllFilePaths()));
 
         // complete the transaction before releasing files
+        tidier.run();
         log.close();
     }
 
@@ -361,18 +443,19 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     public void testRemoveUnfinishedLeftovers_commit() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstableOld = sstable(cfs, 0, 128);
-        SSTableReader sstableNew = sstable(cfs, 1, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
+        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 
         // simulate tracking sstables with a committed transaction (new log 
file deleted)
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstableNew);
         LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld);
 
         //Fake a commit
-        log.getLogFile().commit();
+        log.txnFile().commit();
 
         Set<File> tmpFiles = 
sstableOld.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet());
 
@@ -389,33 +472,314 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         Map<Descriptor, Set<Component>> sstables = 
directories.sstableLister(Directories.OnTxnErr.THROW).list();
         assertEquals(1, sstables.size());
 
-        assertFiles(log.getDataFolder(), new 
HashSet<>(sstableNew.getAllFilePaths()));
+        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableNew.getAllFilePaths()));
 
+        // complete the transaction to avoid LEAK errors
         tidier.run();
+        assertNull(log.complete(null));
+    }
+
+    @Test
+    public void testRemoveUnfinishedLeftovers_commit_multipleFolders() throws 
Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+
+        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        File dataFolder1 = new File(origiFolder, "1");
+        File dataFolder2 = new File(origiFolder, "2");
+        Files.createDirectories(dataFolder1.toPath());
+        Files.createDirectories(dataFolder2.toPath());
+
+        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
+                                     sstable(dataFolder1, cfs, 1, 128),
+                                     sstable(dataFolder2, cfs, 2, 128),
+                                     sstable(dataFolder2, cfs, 3, 128)
+        };
+
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
+        assertNotNull(log);
+
+        LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), 
log.obsoleted(sstables[2]) };
+
+        log.trackNew(sstables[1]);
+        log.trackNew(sstables[3]);
+
+        Collection<File> logFiles = log.logFiles();
+        Assert.assertEquals(2, logFiles.size());
+
+        // fake a commit
+        log.txnFile().commit();
+
+        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
+
+        // test listing
+        
Assert.assertEquals(sstables[0].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
+                            LogAwareFileLister.getTemporaryFiles(dataFolder1));
+        
Assert.assertEquals(sstables[2].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
+                            LogAwareFileLister.getTemporaryFiles(dataFolder2));
+
+        // normally called at startup
+        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, 
dataFolder2));
+
+        // new tables should be only table left
+        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[1].getAllFilePaths()));
+        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[3].getAllFilePaths()));
 
         // complete the transaction to avoid LEAK errors
+        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run);
         assertNull(log.complete(null));
     }
 
     @Test
-    public void testGetTemporaryFiles() throws IOException
+    public void testRemoveUnfinishedLeftovers_abort_multipleFolders() throws 
Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+
+        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        File dataFolder1 = new File(origiFolder, "1");
+        File dataFolder2 = new File(origiFolder, "2");
+        Files.createDirectories(dataFolder1.toPath());
+        Files.createDirectories(dataFolder2.toPath());
+
+        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
+                                     sstable(dataFolder1, cfs, 1, 128),
+                                     sstable(dataFolder2, cfs, 2, 128),
+                                     sstable(dataFolder2, cfs, 3, 128)
+        };
+
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
+        assertNotNull(log);
+
+        LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), 
log.obsoleted(sstables[2]) };
+
+        log.trackNew(sstables[1]);
+        log.trackNew(sstables[3]);
+
+        Collection<File> logFiles = log.logFiles();
+        Assert.assertEquals(2, logFiles.size());
+
+        // fake an abort
+        log.txnFile().abort();
+
+        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
+
+        // test listing
+        
Assert.assertEquals(sstables[1].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
+                            LogAwareFileLister.getTemporaryFiles(dataFolder1));
+        
Assert.assertEquals(sstables[3].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
+                            LogAwareFileLister.getTemporaryFiles(dataFolder2));
+
+        // normally called at startup
+        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, 
dataFolder2));
+
+        // old tables should be only table left
+        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[0].getAllFilePaths()));
+        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[2].getAllFilePaths()));
+
+        // complete the transaction to avoid LEAK errors
+        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run);
+        assertNull(log.complete(null));
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_mismatchedFinalRecords() throws 
Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert mismatched records
+            FileUtils.append(logFiles.get(0), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
+            FileUtils.append(logFiles.get(1), 
LogRecord.makeAbort(System.currentTimeMillis()).raw);
+
+        }, false);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_first() 
throws Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert a full record and a partial one
+            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
+            int toChop = finalRecord.length() / 2;
+            FileUtils.append(logFiles.get(0), finalRecord.substring(0, 
finalRecord.length() - toChop));
+            FileUtils.append(logFiles.get(1), finalRecord);
+
+        }, true);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_second() 
throws Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert a full record and a partial one
+            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
+            int toChop = finalRecord.length() / 2;
+            FileUtils.append(logFiles.get(0), finalRecord);
+            FileUtils.append(logFiles.get(1), finalRecord.substring(0, 
finalRecord.length() - toChop));
+
+        }, true);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_first() 
throws Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert a partial sstable record and a full commit record
+            String sstableRecord = LogRecord.make(LogRecord.Type.ADD, 
Collections.emptyList(), 0, "abc").raw;
+            int toChop = sstableRecord.length() / 2;
+            FileUtils.append(logFiles.get(0), sstableRecord.substring(0, 
sstableRecord.length() - toChop));
+            FileUtils.append(logFiles.get(1), sstableRecord);
+            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
+            FileUtils.append(logFiles.get(0), finalRecord);
+            FileUtils.append(logFiles.get(1), finalRecord);
+
+        }, false);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_second() 
throws Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert a partial sstable record and a full commit record
+            String sstableRecord = LogRecord.make(LogRecord.Type.ADD, 
Collections.emptyList(), 0, "abc").raw;
+            int toChop = sstableRecord.length() / 2;
+            FileUtils.append(logFiles.get(0), sstableRecord);
+            FileUtils.append(logFiles.get(1), sstableRecord.substring(0, 
sstableRecord.length() - toChop));
+            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
+            FileUtils.append(logFiles.get(0), finalRecord);
+            FileUtils.append(logFiles.get(1), finalRecord);
+
+        }, false);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_first() 
throws Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert only one commit record
+            FileUtils.append(logFiles.get(0), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
+
+        }, true);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_second() 
throws Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert only one commit record
+            FileUtils.append(logFiles.get(1), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
+
+        }, true);
+    }
+
+    @Test
+    public void 
testRemoveUnfinishedLeftovers_multipleFolders_tooManyFinalRecords() throws 
Throwable
+    {
+        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
+            List<File> logFiles = txn.logFiles();
+            Assert.assertEquals(2, logFiles.size());
+
+            // insert mismatched records
+            FileUtils.append(logFiles.get(0), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
+            FileUtils.append(logFiles.get(1), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
+            FileUtils.append(logFiles.get(1), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
+
+        }, false);
+    }
+
+    private static void 
testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(Consumer<LogTransaction>
 modifier, boolean shouldCommit) throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable1 = sstable(cfs, 0, 128);
 
-        File dataFolder = sstable1.descriptor.directory;
+        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        File dataFolder1 = new File(origiFolder, "1");
+        File dataFolder2 = new File(origiFolder, "2");
+        Files.createDirectories(dataFolder1.toPath());
+        Files.createDirectories(dataFolder2.toPath());
+
+        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
+                                     sstable(dataFolder1, cfs, 1, 128),
+                                     sstable(dataFolder2, cfs, 2, 128),
+                                     sstable(dataFolder2, cfs, 3, 128)
+        };
+
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
+        assertNotNull(log);
+
+        LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), 
log.obsoleted(sstables[2]) };
+
+        log.trackNew(sstables[1]);
+        log.trackNew(sstables[3]);
+
+        // fake some error condition on the txn logs
+        modifier.accept(log);
+
+        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
+
+        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, 
dataFolder2));
+        LogTransaction.waitForDeletions();
+
+        if (shouldCommit)
+        {
+            // only new sstables should still be there
+            assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[1].getAllFilePaths()));
+            assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[3].getAllFilePaths()));
+        }
+        else
+        {
+            // all files should still be there
+            assertFiles(dataFolder1.getPath(), 
Sets.newHashSet(Iterables.concat(sstables[0].getAllFilePaths(),
+                                                                               
 sstables[1].getAllFilePaths(),
+                                                                               
 Collections.singleton(log.logFilePaths().get(0)))));
+            assertFiles(dataFolder2.getPath(), 
Sets.newHashSet(Iterables.concat(sstables[2].getAllFilePaths(),
+                                                                               
 sstables[3].getAllFilePaths(),
+                                                                               
 Collections.singleton(log.logFilePaths().get(1)))));
+        }
+
+
+        // complete the transaction to avoid LEAK errors
+        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run);
+        log.txnFile().commit(); // just anything to make sure transaction 
tidier will finish
+        assertNull(log.complete(null));
+    }
+
+    @Test
+    public void testGetTemporaryFiles() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable1 = sstable(dataFolder, cfs, 0, 128);
 
         Set<File> tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder);
         assertNotNull(tmpFiles);
         assertEquals(0, tmpFiles.size());
 
-        try(LogTransaction log = new LogTransaction(OperationType.WRITE, 
cfs.metadata))
+        try(LogTransaction log = new LogTransaction(OperationType.WRITE))
         {
             Directories directories = new Directories(cfs.metadata);
 
             File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> 
!pathname.isDirectory());
 
-            SSTableReader sstable2 = sstable(cfs, 1, 128);
+            SSTableReader sstable2 = sstable(dataFolder, cfs, 1, 128);
             log.trackNew(sstable2);
 
             Map<Descriptor, Set<Component>> sstables = 
directories.sstableLister(Directories.OnTxnErr.THROW).list();
@@ -470,10 +834,8 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     {
         testCorruptRecord((t, s) ->
                           { // Fake a commit with invalid checksum
-                              FileUtils.append(t.getLogFile().file,
-                                               
String.format("commit:[%d,0,0][%d]",
-                                                             
System.currentTimeMillis(),
-                                                             12345678L));
+                              long now = System.currentTimeMillis();
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0][%d]", now, 12345678L)));
                           },
                           true);
     }
@@ -483,15 +845,9 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     {
         testCorruptRecord((t, s) ->
                           { // Fake two lines with invalid checksum
-                              FileUtils.append(t.getLogFile().file,
-                                               
String.format("add:[ma-3-big,%d,4][%d]",
-                                                             
System.currentTimeMillis(),
-                                                             12345678L));
-
-                              FileUtils.append(t.getLogFile().file,
-                                               
String.format("commit:[%d,0,0][%d]",
-                                                             
System.currentTimeMillis(),
-                                                             12345678L));
+                              long now = System.currentTimeMillis();
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("add:[ma-3-big,%d,4][%d]", now, 12345678L)));
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0][%d]", now, 12345678L)));
                           },
                           false);
     }
@@ -506,15 +862,13 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
                                   if (filePath.endsWith("Data.db"))
                                   {
                                       assertTrue(FileUtils.delete(filePath));
-                                      t.getLogFile().sync();
+                                      assertNull(t.txnFile().syncFolder(null));
                                       break;
                                   }
                               }
 
-                              FileUtils.append(t.getLogFile().file,
-                                               
String.format("commit:[%d,0,0][%d]",
-                                                             
System.currentTimeMillis(),
-                                                             12345678L));
+                              long now = System.currentTimeMillis();
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0][%d]", now, 12345678L)));
                           },
                           false);
     }
@@ -524,10 +878,8 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     {
         testCorruptRecord((t, s) ->
                           { // Fake a commit with invalid checksum and a wrong 
record format (extra spaces)
-                              FileUtils.append(t.getLogFile().file,
-                                               String.format("commit:[%d ,0 ,0 
][%d]",
-                                                             
System.currentTimeMillis(),
-                                                             12345678L));
+                              long now = System.currentTimeMillis();
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d ,0 ,0 ][%d]", now, 12345678L)));
                           },
                           true);
     }
@@ -538,9 +890,8 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         testCorruptRecord((t, s) ->
                           {
                               // Fake a commit without a checksum
-                              FileUtils.append(t.getLogFile().file,
-                                               String.format("commit:[%d,0,0]",
-                                                             
System.currentTimeMillis()));
+                              long now = System.currentTimeMillis();
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0]", now)));
                           },
                           true);
     }
@@ -550,27 +901,38 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     {
         testCorruptRecord((t, s) ->
                           { // Fake two lines without a checksum
-                              FileUtils.append(t.getLogFile().file,
-                                               
String.format("add:[ma-3-big,%d,4]",
-                                                             
System.currentTimeMillis()));
-
-                              FileUtils.append(t.getLogFile().file,
-                                               String.format("commit:[%d,0,0]",
-                                                             
System.currentTimeMillis()));
+                              long now = System.currentTimeMillis();
+                              t.logFiles().forEach( f -> FileUtils.append(f, 
String.format("add:[ma-3-big,%d,4]", now)));
+                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0]", now)));
                           },
                           false);
     }
 
+    @Test
+    public void testUnparsableLastRecord() throws IOException
+    {
+        testCorruptRecord((t, s) -> t.logFiles().forEach(f -> 
FileUtils.append(f, "commit:[a,b,c][12345678]")), true);
+    }
+
+    @Test
+    public void testUnparsableFirstRecord() throws IOException
+    {
+        testCorruptRecord((t, s) -> t.logFiles().forEach(f -> {
+            List<String> lines = FileUtils.readLines(f);
+            lines.add(0, "add:[a,b,c][12345678]");
+            FileUtils.replace(f, lines.toArray(new String[lines.size()]));
+        }), false);
+    }
+
     private static void testCorruptRecord(BiConsumer<LogTransaction, 
SSTableReader> modifier, boolean isRecoverable) throws IOException
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstableOld = sstable(cfs, 0, 128);
-        SSTableReader sstableNew = sstable(cfs, 1, 128);
-
-        File dataFolder = sstableOld.descriptor.directory;
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
+        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 
         // simulate tracking sstables with a committed transaction except the 
checksum will be wrong
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstableNew);
@@ -579,8 +941,6 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         // Modify the transaction log or disk state for sstableOld
         modifier.accept(log, sstableOld);
 
-        String txnFilePath = log.getLogFile().file.getPath();
-
         assertNull(log.complete(null));
 
         sstableOld.selfRef().release();
@@ -611,7 +971,7 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
 
             assertFiles(dataFolder.getPath(), 
Sets.newHashSet(Iterables.concat(newFiles,
                                                                                
oldFiles,
-                                                                               
Collections.singleton(txnFilePath))));
+                                                                               
log.logFilePaths())));
         }
     }
 
@@ -632,11 +992,12 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     private static void testObsoletedFilesChanged(Consumer<SSTableReader> 
modifier) throws IOException
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstableOld = sstable(cfs, 0, 128);
-        SSTableReader sstableNew = sstable(cfs, 1, 128);
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
+        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 
         // simulate tracking sstables with a committed transaction except the 
checksum will be wrong
-        LogTransaction log = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(log);
 
         log.trackNew(sstableNew);
@@ -646,15 +1007,15 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         modifier.accept(sstableOld);
 
         //Fake a commit
-        log.getLogFile().commit();
+        log.txnFile().commit();
 
         //This should not remove the old files
         LogTransaction.removeUnfinishedLeftovers(cfs.metadata);
 
-        assertFiles(log.getDataFolder(), Sets.newHashSet(Iterables.concat(
-                                                                               
     sstableNew.getAllFilePaths(),
-                                                                               
     sstableOld.getAllFilePaths(),
-                                                                               
     Collections.singleton(log.getLogFile().file.getPath()))));
+        assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(
+                                                                          
sstableNew.getAllFilePaths(),
+                                                                          
sstableOld.getAllFilePaths(),
+                                                                          
log.logFilePaths())));
 
         sstableOld.selfRef().release();
         sstableNew.selfRef().release();
@@ -662,20 +1023,19 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         // complete the transaction to avoid LEAK errors
         assertNull(log.complete(null));
 
-        assertFiles(log.getDataFolder(), Sets.newHashSet(Iterables.concat(
-                                                                               
     sstableNew.getAllFilePaths(),
-                                                                               
     sstableOld.getAllFilePaths(),
-                                                                               
     Collections.singleton(log.getLogFile().file.getPath()))));
+        assertFiles(dataFolder.getPath(), 
Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
+                                                                           
sstableOld.getAllFilePaths(),
+                                                                           
log.logFilePaths())));
     }
 
     @Test
     public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable = sstable(cfs, 0, 128);
-        File dataFolder = sstable.descriptor.directory;
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 
-        LogTransaction logs = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction logs = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(logs);
 
         LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable);
@@ -696,10 +1056,10 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
     public void testGetTemporaryFilesThrowsIfCompletingAfterObsoletion() 
throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
-        SSTableReader sstable = sstable(cfs, 0, 128);
-        File dataFolder = sstable.descriptor.directory;
+        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
+        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 
-        LogTransaction logs = new LogTransaction(OperationType.COMPACTION, 
cfs.metadata);
+        LogTransaction logs = new LogTransaction(OperationType.COMPACTION);
         assertNotNull(logs);
 
         LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable);
@@ -725,10 +1085,9 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         logs.finish();
     }
 
-    private static SSTableReader sstable(ColumnFamilyStore cfs, int 
generation, int size) throws IOException
+    private static SSTableReader sstable(File dataFolder, ColumnFamilyStore 
cfs, int generation, int size) throws IOException
     {
-        Directories dir = new Directories(cfs.metadata);
-        Descriptor descriptor = new 
Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), 
cfs.getTableName(), generation);
+        Descriptor descriptor = new Descriptor(dataFolder, 
cfs.keyspace.getName(), cfs.getTableName(), generation);
         Set<Component> components = ImmutableSet.of(Component.DATA, 
Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
         for (Component component : components)
         {
@@ -782,7 +1141,7 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
                     continue;
 
                 String filePath = file.getPath();
-                assertTrue(filePath, expectedFiles.contains(filePath));
+                assertTrue(String.format("%s not in [%s]", filePath, 
expectedFiles), expectedFiles.contains(filePath));
                 expectedFiles.remove(filePath);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index b6cd9a4..4fbbb36 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -86,7 +86,9 @@ public class RealTransactionsTest extends SchemaLoader
         SSTableReader newSSTable = replaceSSTable(cfs, txn, false);
         LogTransaction.waitForDeletions();
 
-        assertFiles(txn.log().getDataFolder(), new 
HashSet<>(newSSTable.getAllFilePaths()));
+        // both sstables are in the same folder
+        assertFiles(oldSSTable.descriptor.directory.getPath(), new 
HashSet<>(newSSTable.getAllFilePaths()));
+        assertFiles(newSSTable.descriptor.directory.getPath(), new 
HashSet<>(newSSTable.getAllFilePaths()));
     }
 
     @Test
@@ -101,7 +103,7 @@ public class RealTransactionsTest extends SchemaLoader
         replaceSSTable(cfs, txn, true);
         LogTransaction.waitForDeletions();
 
-        assertFiles(txn.log().getDataFolder(), new 
HashSet<>(oldSSTable.getAllFilePaths()));
+        assertFiles(oldSSTable.descriptor.directory.getPath(), new 
HashSet<>(oldSSTable.getAllFilePaths()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bd286e4..bfe7b08 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -246,7 +246,7 @@ public class SSTableRewriterTest extends SchemaLoader
         truncate(cfs);
 
         File dir = cfs.getDirectories().getDirectoryForNewSSTables();
-        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata);
+        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.WRITE);
         try (SSTableWriter writer = getWriter(cfs, dir, txn))
         {
             for (int i = 0; i < 10000; i++)

Reply via email to