Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 1d28a4acf -> 73781a9a4 refs/heads/trunk 87f16ca9a -> 3a2dd0cf6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java index a655fd8..df05d71 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.lifecycle; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.file.Files; import java.util.*; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -80,6 +81,7 @@ public class LogTransactionTest extends AbstractTransactionalTest { final ColumnFamilyStore cfs; final LogTransaction txnLogs; + final File dataFolder; final SSTableReader sstableOld; final SSTableReader sstableNew; final LogTransaction.SSTableTidier tidier; @@ -88,12 +90,13 @@ public class LogTransactionTest extends AbstractTransactionalTest { this.cfs = cfs; this.txnLogs = txnLogs; - this.sstableOld = sstable(cfs, 0, 128); - this.sstableNew = sstable(cfs, 1, 128); + this.dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + this.sstableOld = sstable(dataFolder, cfs, 0, 128); + this.sstableNew = sstable(dataFolder, cfs, 1, 128); assertNotNull(txnLogs); - assertNotNull(txnLogs.getId()); - Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType()); + assertNotNull(txnLogs.id()); + Assert.assertEquals(OperationType.COMPACTION, txnLogs.type()); txnLogs.trackNew(sstableNew); tidier = txnLogs.obsoleted(sstableOld); @@ -131,9 +134,9 @@ public class LogTransactionTest extends AbstractTransactionalTest void assertInProgress() throws Exception { - assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(txnLogs.getLogFile().file.getPath())))); + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + txnLogs.logFilePaths()))); } void assertPrepared() throws Exception @@ -142,12 +145,12 @@ public class LogTransactionTest extends AbstractTransactionalTest void assertAborted() throws Exception { - assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); + assertFiles(dataFolder.getPath(), new HashSet<>(sstableOld.getAllFilePaths())); } void assertCommitted() throws Exception { - assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + assertFiles(dataFolder.getPath(), new HashSet<>(sstableNew.getAllFilePaths())); } } @@ -160,7 +163,7 @@ public class LogTransactionTest extends AbstractTransactionalTest private TxnTest(ColumnFamilyStore cfs) throws IOException { - this(cfs, new LogTransaction(OperationType.COMPACTION, cfs.metadata)); + this(cfs, new LogTransaction(OperationType.COMPACTION)); } private TxnTest(ColumnFamilyStore cfs, LogTransaction txnLogs) throws IOException @@ -199,10 +202,11 @@ public class LogTransactionTest extends AbstractTransactionalTest public void testUntrack() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableNew = sstable(cfs, 1, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); // complete a transaction without keep the new files since they were untracked - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstableNew); @@ -214,18 +218,19 @@ public class LogTransactionTest extends AbstractTransactionalTest Thread.sleep(1); LogTransaction.waitForDeletions(); - assertFiles(log.getDataFolder(), Collections.<String>emptySet()); + assertFiles(dataFolder.getPath(), Collections.<String>emptySet()); } @Test public void testCommitSameDesc() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld1 = sstable(cfs, 0, 128); - SSTableReader sstableOld2 = sstable(cfs, 0, 256); - SSTableReader sstableNew = sstable(cfs, 1, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld1 = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableOld2 = sstable(dataFolder, cfs, 0, 256); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstableNew); @@ -242,7 +247,7 @@ public class LogTransactionTest extends AbstractTransactionalTest sstableOld1.selfRef().release(); sstableOld2.selfRef().release(); - assertFiles(log.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + assertFiles(dataFolder.getPath(), new HashSet<>(sstableNew.getAllFilePaths())); sstableNew.selfRef().release(); } @@ -251,15 +256,16 @@ public class LogTransactionTest extends AbstractTransactionalTest public void testCommitOnlyNew() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstable); log.finish(); - assertFiles(log.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); + assertFiles(dataFolder.getPath(), new HashSet<>(sstable.getAllFilePaths())); sstable.selfRef().release(); } @@ -268,9 +274,10 @@ public class LogTransactionTest extends AbstractTransactionalTest public void testCommitOnlyOld() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); LogTransaction.SSTableTidier tidier = log.obsoleted(sstable); @@ -280,16 +287,54 @@ public class LogTransactionTest extends AbstractTransactionalTest sstable.markObsolete(tidier); sstable.selfRef().release(); - assertFiles(log.getDataFolder(), new HashSet<>()); + assertFiles(dataFolder.getPath(), new HashSet<>()); + } + + @Test + public void testCommitMultipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + log.finish(); + + sstables[0].markObsolete(tidiers[0]); + sstables[2].markObsolete(tidiers[1]); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + LogTransaction.waitForDeletions(); + + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[3].getAllFilePaths())); } @Test public void testAbortOnlyNew() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstable); @@ -297,16 +342,17 @@ public class LogTransactionTest extends AbstractTransactionalTest sstable.selfRef().release(); - assertFiles(log.getDataFolder(), new HashSet<>()); + assertFiles(dataFolder.getPath(), new HashSet<>()); } @Test public void testAbortOnlyOld() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); LogTransaction.SSTableTidier tidier = log.obsoleted(sstable); @@ -317,18 +363,55 @@ public class LogTransactionTest extends AbstractTransactionalTest sstable.selfRef().release(); - assertFiles(log.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); + assertFiles(dataFolder.getPath(), new HashSet<>(sstable.getAllFilePaths())); + } + + @Test + public void testAbortMultipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::abort); + log.abort(); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + LogTransaction.waitForDeletions(); + + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[0].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[2].getAllFilePaths())); } + @Test public void testRemoveUnfinishedLeftovers_abort() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); // simulate tracking sstables with a failed transaction (new log file NOT deleted) - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstableNew); @@ -349,11 +432,10 @@ public class LogTransactionTest extends AbstractTransactionalTest Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); assertEquals(1, sstables.size()); - assertFiles(log.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); - - tidier.run(); + assertFiles(dataFolder.getPath(), new HashSet<>(sstableOld.getAllFilePaths())); // complete the transaction before releasing files + tidier.run(); log.close(); } @@ -361,18 +443,19 @@ public class LogTransactionTest extends AbstractTransactionalTest public void testRemoveUnfinishedLeftovers_commit() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); // simulate tracking sstables with a committed transaction (new log file deleted) - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstableNew); LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld); //Fake a commit - log.getLogFile().commit(); + log.txnFile().commit(); Set<File> tmpFiles = sstableOld.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()); @@ -389,33 +472,314 @@ public class LogTransactionTest extends AbstractTransactionalTest Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); assertEquals(1, sstables.size()); - assertFiles(log.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + assertFiles(dataFolder.getPath(), new HashSet<>(sstableNew.getAllFilePaths())); + // complete the transaction to avoid LEAK errors tidier.run(); + assertNull(log.complete(null)); + } + + @Test + public void testRemoveUnfinishedLeftovers_commit_multipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + Collection<File> logFiles = log.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // fake a commit + log.txnFile().commit(); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + + // test listing + Assert.assertEquals(sstables[0].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), + LogAwareFileLister.getTemporaryFiles(dataFolder1)); + Assert.assertEquals(sstables[2].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), + LogAwareFileLister.getTemporaryFiles(dataFolder2)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)); + + // new tables should be only table left + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[3].getAllFilePaths())); // complete the transaction to avoid LEAK errors + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run); assertNull(log.complete(null)); } @Test - public void testGetTemporaryFiles() throws IOException + public void testRemoveUnfinishedLeftovers_abort_multipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + Collection<File> logFiles = log.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // fake an abort + log.txnFile().abort(); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + + // test listing + Assert.assertEquals(sstables[1].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), + LogAwareFileLister.getTemporaryFiles(dataFolder1)); + Assert.assertEquals(sstables[3].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), + LogAwareFileLister.getTemporaryFiles(dataFolder2)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)); + + // old tables should be only table left + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[0].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[2].getAllFilePaths())); + + // complete the transaction to avoid LEAK errors + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run); + assertNull(log.complete(null)); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_mismatchedFinalRecords() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert mismatched records + FileUtils.append(logFiles.get(0), LogRecord.makeCommit(System.currentTimeMillis()).raw); + FileUtils.append(logFiles.get(1), LogRecord.makeAbort(System.currentTimeMillis()).raw); + + }, false); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_first() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a full record and a partial one + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + int toChop = finalRecord.length() / 2; + FileUtils.append(logFiles.get(0), finalRecord.substring(0, finalRecord.length() - toChop)); + FileUtils.append(logFiles.get(1), finalRecord); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_second() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a full record and a partial one + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + int toChop = finalRecord.length() / 2; + FileUtils.append(logFiles.get(0), finalRecord); + FileUtils.append(logFiles.get(1), finalRecord.substring(0, finalRecord.length() - toChop)); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_first() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a partial sstable record and a full commit record + String sstableRecord = LogRecord.make(LogRecord.Type.ADD, Collections.emptyList(), 0, "abc").raw; + int toChop = sstableRecord.length() / 2; + FileUtils.append(logFiles.get(0), sstableRecord.substring(0, sstableRecord.length() - toChop)); + FileUtils.append(logFiles.get(1), sstableRecord); + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + FileUtils.append(logFiles.get(0), finalRecord); + FileUtils.append(logFiles.get(1), finalRecord); + + }, false); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_second() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a partial sstable record and a full commit record + String sstableRecord = LogRecord.make(LogRecord.Type.ADD, Collections.emptyList(), 0, "abc").raw; + int toChop = sstableRecord.length() / 2; + FileUtils.append(logFiles.get(0), sstableRecord); + FileUtils.append(logFiles.get(1), sstableRecord.substring(0, sstableRecord.length() - toChop)); + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + FileUtils.append(logFiles.get(0), finalRecord); + FileUtils.append(logFiles.get(1), finalRecord); + + }, false); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_first() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert only one commit record + FileUtils.append(logFiles.get(0), LogRecord.makeCommit(System.currentTimeMillis()).raw); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_second() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert only one commit record + FileUtils.append(logFiles.get(1), LogRecord.makeCommit(System.currentTimeMillis()).raw); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_tooManyFinalRecords() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert mismatched records + FileUtils.append(logFiles.get(0), LogRecord.makeCommit(System.currentTimeMillis()).raw); + FileUtils.append(logFiles.get(1), LogRecord.makeCommit(System.currentTimeMillis()).raw); + FileUtils.append(logFiles.get(1), LogRecord.makeCommit(System.currentTimeMillis()).raw); + + }, false); + } + + private static void testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(Consumer<LogTransaction> modifier, boolean shouldCommit) throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable1 = sstable(cfs, 0, 128); - File dataFolder = sstable1.descriptor.directory; + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + // fake some error condition on the txn logs + modifier.accept(log); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + + LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)); + LogTransaction.waitForDeletions(); + + if (shouldCommit) + { + // only new sstables should still be there + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[3].getAllFilePaths())); + } + else + { + // all files should still be there + assertFiles(dataFolder1.getPath(), Sets.newHashSet(Iterables.concat(sstables[0].getAllFilePaths(), + sstables[1].getAllFilePaths(), + Collections.singleton(log.logFilePaths().get(0))))); + assertFiles(dataFolder2.getPath(), Sets.newHashSet(Iterables.concat(sstables[2].getAllFilePaths(), + sstables[3].getAllFilePaths(), + Collections.singleton(log.logFilePaths().get(1))))); + } + + + // complete the transaction to avoid LEAK errors + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run); + log.txnFile().commit(); // just anything to make sure transaction tidier will finish + assertNull(log.complete(null)); + } + + @Test + public void testGetTemporaryFiles() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable1 = sstable(dataFolder, cfs, 0, 128); Set<File> tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); assertNotNull(tmpFiles); assertEquals(0, tmpFiles.size()); - try(LogTransaction log = new LogTransaction(OperationType.WRITE, cfs.metadata)) + try(LogTransaction log = new LogTransaction(OperationType.WRITE)) { Directories directories = new Directories(cfs.metadata); File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); - SSTableReader sstable2 = sstable(cfs, 1, 128); + SSTableReader sstable2 = sstable(dataFolder, cfs, 1, 128); log.trackNew(sstable2); Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); @@ -470,10 +834,8 @@ public class LogTransactionTest extends AbstractTransactionalTest { testCorruptRecord((t, s) -> { // Fake a commit with invalid checksum - FileUtils.append(t.getLogFile().file, - String.format("commit:[%d,0,0][%d]", - System.currentTimeMillis(), - 12345678L)); + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0][%d]", now, 12345678L))); }, true); } @@ -483,15 +845,9 @@ public class LogTransactionTest extends AbstractTransactionalTest { testCorruptRecord((t, s) -> { // Fake two lines with invalid checksum - FileUtils.append(t.getLogFile().file, - String.format("add:[ma-3-big,%d,4][%d]", - System.currentTimeMillis(), - 12345678L)); - - FileUtils.append(t.getLogFile().file, - String.format("commit:[%d,0,0][%d]", - System.currentTimeMillis(), - 12345678L)); + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("add:[ma-3-big,%d,4][%d]", now, 12345678L))); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0][%d]", now, 12345678L))); }, false); } @@ -506,15 +862,13 @@ public class LogTransactionTest extends AbstractTransactionalTest if (filePath.endsWith("Data.db")) { assertTrue(FileUtils.delete(filePath)); - t.getLogFile().sync(); + assertNull(t.txnFile().syncFolder(null)); break; } } - FileUtils.append(t.getLogFile().file, - String.format("commit:[%d,0,0][%d]", - System.currentTimeMillis(), - 12345678L)); + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0][%d]", now, 12345678L))); }, false); } @@ -524,10 +878,8 @@ public class LogTransactionTest extends AbstractTransactionalTest { testCorruptRecord((t, s) -> { // Fake a commit with invalid checksum and a wrong record format (extra spaces) - FileUtils.append(t.getLogFile().file, - String.format("commit:[%d ,0 ,0 ][%d]", - System.currentTimeMillis(), - 12345678L)); + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d ,0 ,0 ][%d]", now, 12345678L))); }, true); } @@ -538,9 +890,8 @@ public class LogTransactionTest extends AbstractTransactionalTest testCorruptRecord((t, s) -> { // Fake a commit without a checksum - FileUtils.append(t.getLogFile().file, - String.format("commit:[%d,0,0]", - System.currentTimeMillis())); + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0]", now))); }, true); } @@ -550,27 +901,38 @@ public class LogTransactionTest extends AbstractTransactionalTest { testCorruptRecord((t, s) -> { // Fake two lines without a checksum - FileUtils.append(t.getLogFile().file, - String.format("add:[ma-3-big,%d,4]", - System.currentTimeMillis())); - - FileUtils.append(t.getLogFile().file, - String.format("commit:[%d,0,0]", - System.currentTimeMillis())); + long now = System.currentTimeMillis(); + t.logFiles().forEach( f -> FileUtils.append(f, String.format("add:[ma-3-big,%d,4]", now))); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0]", now))); }, false); } + @Test + public void testUnparsableLastRecord() throws IOException + { + testCorruptRecord((t, s) -> t.logFiles().forEach(f -> FileUtils.append(f, "commit:[a,b,c][12345678]")), true); + } + + @Test + public void testUnparsableFirstRecord() throws IOException + { + testCorruptRecord((t, s) -> t.logFiles().forEach(f -> { + List<String> lines = FileUtils.readLines(f); + lines.add(0, "add:[a,b,c][12345678]"); + FileUtils.replace(f, lines.toArray(new String[lines.size()])); + }), false); + } + private static void testCorruptRecord(BiConsumer<LogTransaction, SSTableReader> modifier, boolean isRecoverable) throws IOException { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); - - File dataFolder = sstableOld.descriptor.directory; + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); // simulate tracking sstables with a committed transaction except the checksum will be wrong - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstableNew); @@ -579,8 +941,6 @@ public class LogTransactionTest extends AbstractTransactionalTest // Modify the transaction log or disk state for sstableOld modifier.accept(log, sstableOld); - String txnFilePath = log.getLogFile().file.getPath(); - assertNull(log.complete(null)); sstableOld.selfRef().release(); @@ -611,7 +971,7 @@ public class LogTransactionTest extends AbstractTransactionalTest assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(newFiles, oldFiles, - Collections.singleton(txnFilePath)))); + log.logFilePaths()))); } } @@ -632,11 +992,12 @@ public class LogTransactionTest extends AbstractTransactionalTest private static void testObsoletedFilesChanged(Consumer<SSTableReader> modifier) throws IOException { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstableOld = sstable(cfs, 0, 128); - SSTableReader sstableNew = sstable(cfs, 1, 128); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); // simulate tracking sstables with a committed transaction except the checksum will be wrong - LogTransaction log = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction log = new LogTransaction(OperationType.COMPACTION); assertNotNull(log); log.trackNew(sstableNew); @@ -646,15 +1007,15 @@ public class LogTransactionTest extends AbstractTransactionalTest modifier.accept(sstableOld); //Fake a commit - log.getLogFile().commit(); + log.txnFile().commit(); //This should not remove the old files LogTransaction.removeUnfinishedLeftovers(cfs.metadata); - assertFiles(log.getDataFolder(), Sets.newHashSet(Iterables.concat( - sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(log.getLogFile().file.getPath())))); + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat( + sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + log.logFilePaths()))); sstableOld.selfRef().release(); sstableNew.selfRef().release(); @@ -662,20 +1023,19 @@ public class LogTransactionTest extends AbstractTransactionalTest // complete the transaction to avoid LEAK errors assertNull(log.complete(null)); - assertFiles(log.getDataFolder(), Sets.newHashSet(Iterables.concat( - sstableNew.getAllFilePaths(), - sstableOld.getAllFilePaths(), - Collections.singleton(log.getLogFile().file.getPath())))); + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + log.logFilePaths()))); } @Test public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - File dataFolder = sstable.descriptor.directory; + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); - LogTransaction logs = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction logs = new LogTransaction(OperationType.COMPACTION); assertNotNull(logs); LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable); @@ -696,10 +1056,10 @@ public class LogTransactionTest extends AbstractTransactionalTest public void testGetTemporaryFilesThrowsIfCompletingAfterObsoletion() throws Throwable { ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); - SSTableReader sstable = sstable(cfs, 0, 128); - File dataFolder = sstable.descriptor.directory; + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); - LogTransaction logs = new LogTransaction(OperationType.COMPACTION, cfs.metadata); + LogTransaction logs = new LogTransaction(OperationType.COMPACTION); assertNotNull(logs); LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable); @@ -725,10 +1085,9 @@ public class LogTransactionTest extends AbstractTransactionalTest logs.finish(); } - private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException + private static SSTableReader sstable(File dataFolder, ColumnFamilyStore cfs, int generation, int size) throws IOException { - Directories dir = new Directories(cfs.metadata); - Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getTableName(), generation); + Descriptor descriptor = new Descriptor(dataFolder, cfs.keyspace.getName(), cfs.getTableName(), generation); Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); for (Component component : components) { @@ -782,7 +1141,7 @@ public class LogTransactionTest extends AbstractTransactionalTest continue; String filePath = file.getPath(); - assertTrue(filePath, expectedFiles.contains(filePath)); + assertTrue(String.format("%s not in [%s]", filePath, expectedFiles), expectedFiles.contains(filePath)); expectedFiles.remove(filePath); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index b6cd9a4..4fbbb36 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -86,7 +86,9 @@ public class RealTransactionsTest extends SchemaLoader SSTableReader newSSTable = replaceSSTable(cfs, txn, false); LogTransaction.waitForDeletions(); - assertFiles(txn.log().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths())); + // both sstables are in the same folder + assertFiles(oldSSTable.descriptor.directory.getPath(), new HashSet<>(newSSTable.getAllFilePaths())); + assertFiles(newSSTable.descriptor.directory.getPath(), new HashSet<>(newSSTable.getAllFilePaths())); } @Test @@ -101,7 +103,7 @@ public class RealTransactionsTest extends SchemaLoader replaceSSTable(cfs, txn, true); LogTransaction.waitForDeletions(); - assertFiles(txn.log().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths())); + assertFiles(oldSSTable.descriptor.directory.getPath(), new HashSet<>(oldSSTable.getAllFilePaths())); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/73781a9a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index bd286e4..bfe7b08 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -246,7 +246,7 @@ public class SSTableRewriterTest extends SchemaLoader truncate(cfs); File dir = cfs.getDirectories().getDirectoryForNewSSTables(); - LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); try (SSTableWriter writer = getWriter(cfs, dir, txn)) { for (int i = 0; i < 10000; i++)