Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/15131391 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/15131391 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/15131391 Branch: refs/heads/trunk Commit: 15131391d833c1a0ba0e26903e27867fd101fe72 Parents: 98086b6 c662259 Author: Yuki Morishita <yu...@apache.org> Authored: Fri Apr 1 12:44:52 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Fri Apr 1 12:44:52 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/lifecycle/LogAwareFileLister.java | 23 ------- .../apache/cassandra/io/sstable/Descriptor.java | 20 +++++-- .../db/lifecycle/LogTransactionTest.java | 63 +++++++++++++++----- .../cassandra/io/sstable/DescriptorTest.java | 40 +++++++++---- 5 files changed, 90 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 482c41a,c8a4f21..d8d4834 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,21 -1,9 +1,22 @@@ -2.2.6 +3.0.5 + * Improve backoff policy for cqlsh COPY FROM (CASSANDRA-11320) + * Improve IF NOT EXISTS check in CREATE INDEX (CASSANDRA-11131) + * Upgrade ohc to 0.4.3 + * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093) + * Allocate merkletrees with the correct size (CASSANDRA-11390) + * Support streaming pre-3.0 sstables (CASSANDRA-10990) + * Add backpressure to compressed commit log (CASSANDRA-10971) + * SSTableExport supports secondary index tables (CASSANDRA-11330) + * Fix sstabledump to include missing info in debug output (CASSANDRA-11321) + * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331) + * Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208) + * Remove recursive call from CompositesSearcher (CASSANDRA-11304) + * Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377) + * Fix sstableloader fail when using materialized view (CASSANDRA-11275) +Merged from 2.2: + * Use canonical path for directory in SSTable descriptor (CASSANDRA-10587) * Add cassandra-stress keystore option (CASSANDRA-9325) - * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448). * Dont mark sstables as repairing with sub range repairs (CASSANDRA-11451) - * Fix use of NullUpdater for 2i during compaction (CASSANDRA-11450) * Notify when sstables change after cancelling compaction (CASSANDRA-11373) * cqlsh: COPY FROM should check that explicit column names are valid (CASSANDRA-11333) * Add -Dcassandra.start_gossip startup option (CASSANDRA-10809) http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java index 01bcb8a,0000000..3393b5c mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java @@@ -1,195 -1,0 +1,172 @@@ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.db.Directories; + +import static org.apache.cassandra.db.Directories.*; + +/** + * A class for listing files in a folder. + */ +final class LogAwareFileLister +{ + // The folder to scan + private final Path folder; + + // The filter determines which files the client wants returned + private final BiFunction<File, FileType, Boolean> filter; //file, file type + + // The behavior when we fail to list files + private final OnTxnErr onTxnErr; + + // The unfiltered result + NavigableMap<File, Directories.FileType> files = new TreeMap<>(); + + @VisibleForTesting + LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr) + { + this.folder = folder; + this.filter = filter; + this.onTxnErr = onTxnErr; + } + + public List<File> list() + { + try + { + return innerList(); + } + catch (Throwable t) + { + throw new RuntimeException(String.format("Failed to list files in %s", folder), t); + } + } + + List<File> innerList() throws Throwable + { + list(Files.newDirectoryStream(folder)) + .stream() + .filter((f) -> !LogFile.isLogFile(f)) + .forEach((f) -> files.put(f, FileType.FINAL)); + + // Since many file systems are not atomic, we cannot be sure we have listed a consistent disk state + // (Linux would permit this, but for simplicity we keep our behaviour the same across platforms) + // so we must be careful to list txn log files AFTER every other file since these files are deleted last, + // after all other files are removed + list(Files.newDirectoryStream(folder, '*' + LogFile.EXT)) + .stream() + .filter(LogFile::isLogFile) + .forEach(this::classifyFiles); + + // Finally we apply the user filter before returning our result + return files.entrySet().stream() + .filter((e) -> filter.apply(e.getKey(), e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + static List<File> list(DirectoryStream<Path> stream) throws IOException + { + try + { + return StreamSupport.stream(stream.spliterator(), false) + .map(Path::toFile) + .filter((f) -> !f.isDirectory()) + .collect(Collectors.toList()); + } + finally + { + stream.close(); + } + } + + /** + * We read txn log files, if we fail we throw only if the user has specified + * OnTxnErr.THROW, else we log an error and apply the txn log anyway + */ + void classifyFiles(File txnFile) + { + LogFile txn = LogFile.make(txnFile); + readTxnLog(txn); + classifyFiles(txn); + files.put(txnFile, FileType.TXN_LOG); + } + + void readTxnLog(LogFile txn) + { + if (!txn.verify() && onTxnErr == OnTxnErr.THROW) + throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. See earlier in log for details.", txn); + } + + void classifyFiles(LogFile txnFile) + { + Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.REMOVE); + Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.ADD); + + if (txnFile.completed()) + { // last record present, filter regardless of disk status + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + if (allFilesPresent(txnFile, oldFiles, newFiles)) + { // all files present, transaction is in progress, this will filter as aborted + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + // some files are missing, we expect the txn file to either also be missing or completed, so check + // disk state again to resolve any previous races on non-atomic directory listing platforms + + // if txn file also gone, then do nothing (all temporary should be gone, we could remove them if any) + if (!txnFile.exists()) + return; + + // otherwise read the file again to see if it is completed now + readTxnLog(txnFile); + + if (txnFile.completed()) + { // if after re-reading the txn is completed then filter accordingly + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + // some files are missing and yet the txn is still there and not completed + // something must be wrong (see comment at the top of this file requiring txn to be + // completed before obsoleting or aborting sstables) + throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s", + folder, + txnFile)); + } + + /** See if all files are present or if only the last record files are missing and it's a NEW record */ + private static boolean allFilesPresent(LogFile txnFile, Map<LogRecord, Set<File>> oldFiles, Map<LogRecord, Set<File>> newFiles) + { + LogRecord lastRecord = txnFile.getLastRecord(); + return !Stream.concat(oldFiles.entrySet().stream(), + newFiles.entrySet().stream() + .filter((e) -> e.getKey() != lastRecord)) + .filter((e) -> e.getKey().numFiles > e.getValue().size()) + .findFirst().isPresent(); + } + + private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles) + { + Collection<Set<File>> temporary = txnFile.committed() ? oldFiles : newFiles; + temporary.stream() + .flatMap(Set::stream) + .forEach((f) -> this.files.put(f, FileType.TEMPORARY)); + } - - @VisibleForTesting - static Set<File> getTemporaryFiles(File folder) - { - return listFiles(folder, FileType.TEMPORARY); - } - - @VisibleForTesting - static Set<File> getFinalFiles(File folder) - { - return listFiles(folder, FileType.FINAL); - } - - @VisibleForTesting - static Set<File> listFiles(File folder, FileType ... types) - { - Collection<FileType> match = Arrays.asList(types); - return new LogAwareFileLister(folder.toPath(), - (file, type) -> match.contains(type), - OnTxnErr.IGNORE).list() - .stream() - .collect(Collectors.toSet()); - } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/Descriptor.java index dbce56c,ed81616..ff4abfc --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@@ -18,10 -18,12 +18,12 @@@ package org.apache.cassandra.io.sstable; import java.io.File; + import java.io.IOError; + import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.StringTokenizer; +import java.util.*; +import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; import com.google.common.base.Objects; @@@ -46,7 -47,20 +48,9 @@@ import static org.apache.cassandra.io.s */ public class Descriptor { - - public static enum Type - { - TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false); - public final boolean isTemporary; - public final String marker; - Type(String marker, boolean isTemporary) - { - this.isTemporary = isTemporary; - this.marker = marker; - } - } + public static String TMP_EXT = ".tmp"; + + /** canonicalized path to the directory where SSTable resides */ public final File directory; /** version has the following format: <code>[a-z]+</code> */ public final Version version; @@@ -89,10 -104,10 +100,10 @@@ this.ksname = ksname; this.cfname = cfname; this.generation = generation; - this.type = temp; this.formatType = formatType; + this.digestComponent = digestComponent; - hashCode = Objects.hashCode(version, directory, generation, ksname, cfname, formatType); - hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, temp, formatType); ++ hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, formatType); } public Descriptor withGeneration(int newGeneration) http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java index 4f2fc73,0000000..45b5844 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@@ -1,1183 -1,0 +1,1214 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; ++import java.io.IOError; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.util.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.*; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.BufferedSegmentedFile; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; +import org.apache.cassandra.utils.concurrent.Transactional; + +public class LogTransactionTest extends AbstractTransactionalTest +{ + private static final String KEYSPACE = "TransactionLogsTest"; + + @BeforeClass + public static void setUp() + { + MockSchema.cleanup(); + } + + protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception + { + LogTransaction.waitForDeletions(); + SSTableReader.resetTidying(); + return new TxnTest(); + } + + private static final class TxnTest extends TestableTransaction + { + private final static class Transaction extends Transactional.AbstractTransactional implements Transactional + { + final ColumnFamilyStore cfs; + final LogTransaction txnLogs; + final File dataFolder; + final SSTableReader sstableOld; + final SSTableReader sstableNew; + final LogTransaction.SSTableTidier tidier; + + Transaction(ColumnFamilyStore cfs, LogTransaction txnLogs) throws IOException + { + this.cfs = cfs; + this.txnLogs = txnLogs; + this.dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + this.sstableOld = sstable(dataFolder, cfs, 0, 128); + this.sstableNew = sstable(dataFolder, cfs, 1, 128); + + assertNotNull(txnLogs); + assertNotNull(txnLogs.id()); + Assert.assertEquals(OperationType.COMPACTION, txnLogs.type()); + + txnLogs.trackNew(sstableNew); + tidier = txnLogs.obsoleted(sstableOld); + assertNotNull(tidier); + } + + protected Throwable doCommit(Throwable accumulate) + { + sstableOld.markObsolete(tidier); + sstableOld.selfRef().release(); + LogTransaction.waitForDeletions(); + + Throwable ret = txnLogs.commit(accumulate); + + sstableNew.selfRef().release(); + return ret; + } + + protected Throwable doAbort(Throwable accumulate) + { + tidier.abort(); + LogTransaction.waitForDeletions(); + + Throwable ret = txnLogs.abort(accumulate); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + return ret; + } + + protected void doPrepare() + { + txnLogs.prepareToCommit(); + } + + void assertInProgress() throws Exception + { + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + txnLogs.logFilePaths()))); + } + + void assertPrepared() throws Exception + { + } + + void assertAborted() throws Exception + { + assertFiles(dataFolder.getPath(), new HashSet<>(sstableOld.getAllFilePaths())); + } + + void assertCommitted() throws Exception + { + assertFiles(dataFolder.getPath(), new HashSet<>(sstableNew.getAllFilePaths())); + } + } + + final Transaction txn; + + private TxnTest() throws IOException + { + this(MockSchema.newCFS(KEYSPACE)); + } + + private TxnTest(ColumnFamilyStore cfs) throws IOException + { + this(cfs, new LogTransaction(OperationType.COMPACTION)); + } + + private TxnTest(ColumnFamilyStore cfs, LogTransaction txnLogs) throws IOException + { + this(new Transaction(cfs, txnLogs)); + } + + private TxnTest(Transaction txn) + { + super(txn); + this.txn = txn; + } + + protected void assertInProgress() throws Exception + { + txn.assertInProgress(); + } + + protected void assertPrepared() throws Exception + { + txn.assertPrepared(); + } + + protected void assertAborted() throws Exception + { + txn.assertAborted(); + } + + protected void assertCommitted() throws Exception + { + txn.assertCommitted(); + } + } + + @Test + public void testUntrack() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); + + // complete a transaction without keep the new files since they were untracked + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstableNew); + log.untrackNew(sstableNew); + + log.finish(); + + sstableNew.selfRef().release(); + Thread.sleep(1); + LogTransaction.waitForDeletions(); + + assertFiles(dataFolder.getPath(), Collections.<String>emptySet()); + } + + @Test + public void testCommitSameDesc() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld1 = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableOld2 = sstable(dataFolder, cfs, 0, 256); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstableNew); + + sstableOld1.setReplaced(); + + LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld2); + assertNotNull(tidier); + + log.finish(); + + sstableOld2.markObsolete(tidier); + + sstableOld1.selfRef().release(); + sstableOld2.selfRef().release(); + + assertFiles(dataFolder.getPath(), new HashSet<>(sstableNew.getAllFilePaths())); + + sstableNew.selfRef().release(); + } + + @Test + public void testCommitOnlyNew() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstable); + log.finish(); + + assertFiles(dataFolder.getPath(), new HashSet<>(sstable.getAllFilePaths())); + + sstable.selfRef().release(); + } + + @Test + public void testCommitOnlyOld() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier tidier = log.obsoleted(sstable); + assertNotNull(tidier); + + log.finish(); + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + assertFiles(dataFolder.getPath(), new HashSet<>()); + } + + @Test + public void testCommitMultipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + log.finish(); + + sstables[0].markObsolete(tidiers[0]); + sstables[2].markObsolete(tidiers[1]); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + LogTransaction.waitForDeletions(); + + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[3].getAllFilePaths())); + } + + @Test + public void testAbortOnlyNew() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstable); + log.abort(); + + sstable.selfRef().release(); + + assertFiles(dataFolder.getPath(), new HashSet<>()); + } + + @Test + public void testAbortOnlyOld() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier tidier = log.obsoleted(sstable); + assertNotNull(tidier); + + tidier.abort(); + log.abort(); + + sstable.selfRef().release(); + + assertFiles(dataFolder.getPath(), new HashSet<>(sstable.getAllFilePaths())); + } + + @Test + public void testAbortMultipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::abort); + log.abort(); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + LogTransaction.waitForDeletions(); + + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[0].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[2].getAllFilePaths())); + } + + + @Test + public void testRemoveUnfinishedLeftovers_abort() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); + + // simulate tracking sstables with a failed transaction (new log file NOT deleted) + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstableNew); + LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld); + + Set<File> tmpFiles = sstableNew.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + - Assert.assertEquals(tmpFiles, LogAwareFileLister.getTemporaryFiles(sstableNew.descriptor.directory)); ++ Assert.assertEquals(tmpFiles, getTemporaryFiles(sstableNew.descriptor.directory)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + // sstableOld should be only table left + Directories directories = new Directories(cfs.metadata); + Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); + assertEquals(1, sstables.size()); + + assertFiles(dataFolder.getPath(), new HashSet<>(sstableOld.getAllFilePaths())); + + // complete the transaction before releasing files + tidier.run(); + log.close(); + } + + @Test + public void testRemoveUnfinishedLeftovers_commit() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); + + // simulate tracking sstables with a committed transaction (new log file deleted) + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstableNew); + LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld); + + //Fake a commit + log.txnFile().commit(); + + Set<File> tmpFiles = sstableOld.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + - Assert.assertEquals(tmpFiles, LogAwareFileLister.getTemporaryFiles(sstableOld.descriptor.directory)); ++ Assert.assertEquals(tmpFiles, getTemporaryFiles(sstableOld.descriptor.directory)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + // sstableNew should be only table left + Directories directories = new Directories(cfs.metadata); + Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); + assertEquals(1, sstables.size()); + + assertFiles(dataFolder.getPath(), new HashSet<>(sstableNew.getAllFilePaths())); + + // complete the transaction to avoid LEAK errors + tidier.run(); + assertNull(log.complete(null)); + } + + @Test + public void testRemoveUnfinishedLeftovers_commit_multipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + Collection<File> logFiles = log.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // fake a commit + log.txnFile().commit(); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + + // test listing + Assert.assertEquals(sstables[0].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), - LogAwareFileLister.getTemporaryFiles(dataFolder1)); ++ getTemporaryFiles(dataFolder1)); + Assert.assertEquals(sstables[2].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), - LogAwareFileLister.getTemporaryFiles(dataFolder2)); ++ getTemporaryFiles(dataFolder2)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)); + + // new tables should be only table left + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[3].getAllFilePaths())); + + // complete the transaction to avoid LEAK errors + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run); + assertNull(log.complete(null)); + } + + @Test + public void testRemoveUnfinishedLeftovers_abort_multipleFolders() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + Collection<File> logFiles = log.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // fake an abort + log.txnFile().abort(); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + + // test listing + Assert.assertEquals(sstables[1].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), - LogAwareFileLister.getTemporaryFiles(dataFolder1)); ++ getTemporaryFiles(dataFolder1)); + Assert.assertEquals(sstables[3].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()), - LogAwareFileLister.getTemporaryFiles(dataFolder2)); ++ getTemporaryFiles(dataFolder2)); + + // normally called at startup + LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)); + + // old tables should be only table left + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[0].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[2].getAllFilePaths())); + + // complete the transaction to avoid LEAK errors + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run); + assertNull(log.complete(null)); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_mismatchedFinalRecords() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert mismatched records + FileUtils.append(logFiles.get(0), LogRecord.makeCommit(System.currentTimeMillis()).raw); + FileUtils.append(logFiles.get(1), LogRecord.makeAbort(System.currentTimeMillis()).raw); + + }, false); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_first() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a full record and a partial one + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + int toChop = finalRecord.length() / 2; + FileUtils.append(logFiles.get(0), finalRecord.substring(0, finalRecord.length() - toChop)); + FileUtils.append(logFiles.get(1), finalRecord); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_second() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a full record and a partial one + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + int toChop = finalRecord.length() / 2; + FileUtils.append(logFiles.get(0), finalRecord); + FileUtils.append(logFiles.get(1), finalRecord.substring(0, finalRecord.length() - toChop)); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_first() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a partial sstable record and a full commit record + String sstableRecord = LogRecord.make(LogRecord.Type.ADD, Collections.emptyList(), 0, "abc").raw; + int toChop = sstableRecord.length() / 2; + FileUtils.append(logFiles.get(0), sstableRecord.substring(0, sstableRecord.length() - toChop)); + FileUtils.append(logFiles.get(1), sstableRecord); + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + FileUtils.append(logFiles.get(0), finalRecord); + FileUtils.append(logFiles.get(1), finalRecord); + + }, false); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_second() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert a partial sstable record and a full commit record + String sstableRecord = LogRecord.make(LogRecord.Type.ADD, Collections.emptyList(), 0, "abc").raw; + int toChop = sstableRecord.length() / 2; + FileUtils.append(logFiles.get(0), sstableRecord); + FileUtils.append(logFiles.get(1), sstableRecord.substring(0, sstableRecord.length() - toChop)); + String finalRecord = LogRecord.makeCommit(System.currentTimeMillis()).raw; + FileUtils.append(logFiles.get(0), finalRecord); + FileUtils.append(logFiles.get(1), finalRecord); + + }, false); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_first() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert only one commit record + FileUtils.append(logFiles.get(0), LogRecord.makeCommit(System.currentTimeMillis()).raw); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_second() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert only one commit record + FileUtils.append(logFiles.get(1), LogRecord.makeCommit(System.currentTimeMillis()).raw); + + }, true); + } + + @Test + public void testRemoveUnfinishedLeftovers_multipleFolders_tooManyFinalRecords() throws Throwable + { + testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> { + List<File> logFiles = txn.logFiles(); + Assert.assertEquals(2, logFiles.size()); + + // insert mismatched records + FileUtils.append(logFiles.get(0), LogRecord.makeCommit(System.currentTimeMillis()).raw); + FileUtils.append(logFiles.get(1), LogRecord.makeCommit(System.currentTimeMillis()).raw); + FileUtils.append(logFiles.get(1), LogRecord.makeCommit(System.currentTimeMillis()).raw); + + }, false); + } + + private static void testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(Consumer<LogTransaction> modifier, boolean shouldCommit) throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + + File origiFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + File dataFolder1 = new File(origiFolder, "1"); + File dataFolder2 = new File(origiFolder, "2"); + Files.createDirectories(dataFolder1.toPath()); + Files.createDirectories(dataFolder2.toPath()); + + SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128), + sstable(dataFolder1, cfs, 1, 128), + sstable(dataFolder2, cfs, 2, 128), + sstable(dataFolder2, cfs, 3, 128) + }; + + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + LogTransaction.SSTableTidier[] tidiers = { log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) }; + + log.trackNew(sstables[1]); + log.trackNew(sstables[3]); + + // fake some error condition on the txn logs + modifier.accept(log); + + Arrays.stream(sstables).forEach(s -> s.selfRef().release()); + + LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)); + LogTransaction.waitForDeletions(); + + if (shouldCommit) + { + // only new sstables should still be there + assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths())); + assertFiles(dataFolder2.getPath(), new HashSet<>(sstables[3].getAllFilePaths())); + } + else + { + // all files should still be there + assertFiles(dataFolder1.getPath(), Sets.newHashSet(Iterables.concat(sstables[0].getAllFilePaths(), + sstables[1].getAllFilePaths(), + Collections.singleton(log.logFilePaths().get(0))))); + assertFiles(dataFolder2.getPath(), Sets.newHashSet(Iterables.concat(sstables[2].getAllFilePaths(), + sstables[3].getAllFilePaths(), + Collections.singleton(log.logFilePaths().get(1))))); + } + + + // complete the transaction to avoid LEAK errors + Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run); + log.txnFile().commit(); // just anything to make sure transaction tidier will finish + assertNull(log.complete(null)); + } + + @Test + public void testGetTemporaryFiles() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable1 = sstable(dataFolder, cfs, 0, 128); + - Set<File> tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); ++ Set<File> tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + + try(LogTransaction log = new LogTransaction(OperationType.WRITE)) + { + Directories directories = new Directories(cfs.metadata); + + File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); + + SSTableReader sstable2 = sstable(dataFolder, cfs, 1, 128); + log.trackNew(sstable2); + + Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list(); + assertEquals(2, sstables.size()); + + // this should contain sstable1, sstable2 and the transaction log file + File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); + + int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length; + assertEquals(numNewFiles - 1, sstable2.getAllFilePaths().size()); // new files except for transaction log file + - tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); ++ tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(numNewFiles - 1, tmpFiles.size()); + + File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA)); + File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX)); + + assertTrue(tmpFiles.contains(ssTable2DataFile)); + assertTrue(tmpFiles.contains(ssTable2IndexFile)); + + List<File> files = directories.sstableLister(Directories.OnTxnErr.THROW).listFiles(); + List<File> filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles(); + assertNotNull(files); + assertNotNull(filesNoTmp); + + assertTrue(files.contains(ssTable2DataFile)); + assertTrue(files.contains(ssTable2IndexFile)); + + assertFalse(filesNoTmp.contains(ssTable2DataFile)); + assertFalse(filesNoTmp.contains(ssTable2IndexFile)); + + log.finish(); + + //Now it should be empty since the transaction has finished - tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder); ++ tmpFiles = getTemporaryFiles(dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + + filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles(); + assertNotNull(filesNoTmp); + assertTrue(filesNoTmp.contains(ssTable2DataFile)); + assertTrue(filesNoTmp.contains(ssTable2IndexFile)); + + sstable1.selfRef().release(); + sstable2.selfRef().release(); + } + } + + @Test + public void testWrongChecksumLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake a commit with invalid checksum + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0][%d]", now, 12345678L))); + }, + true); + } + + @Test + public void testWrongChecksumSecondFromLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake two lines with invalid checksum + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("add:[ma-3-big,%d,4][%d]", now, 12345678L))); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0][%d]", now, 12345678L))); + }, + false); + } + + @Test + public void testWrongChecksumLastLineMissingFile() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake a commit with invalid checksum and also delete one of the old files + for (String filePath : s.getAllFilePaths()) + { + if (filePath.endsWith("Data.db")) + { + assertTrue(FileUtils.delete(filePath)); + assertNull(t.txnFile().syncFolder(null)); + break; + } + } + + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0][%d]", now, 12345678L))); + }, + false); + } + + @Test + public void testWrongChecksumLastLineWrongRecordFormat() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake a commit with invalid checksum and a wrong record format (extra spaces) + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d ,0 ,0 ][%d]", now, 12345678L))); + }, + true); + } + + @Test + public void testMissingChecksumLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { + // Fake a commit without a checksum + long now = System.currentTimeMillis(); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0]", now))); + }, + true); + } + + @Test + public void testMissingChecksumSecondFromLastLine() throws IOException + { + testCorruptRecord((t, s) -> + { // Fake two lines without a checksum + long now = System.currentTimeMillis(); + t.logFiles().forEach( f -> FileUtils.append(f, String.format("add:[ma-3-big,%d,4]", now))); + t.logFiles().forEach(f -> FileUtils.append(f, String.format("commit:[%d,0,0]", now))); + }, + false); + } + + @Test + public void testUnparsableLastRecord() throws IOException + { + testCorruptRecord((t, s) -> t.logFiles().forEach(f -> FileUtils.append(f, "commit:[a,b,c][12345678]")), true); + } + + @Test + public void testUnparsableFirstRecord() throws IOException + { + testCorruptRecord((t, s) -> t.logFiles().forEach(f -> { + List<String> lines = FileUtils.readLines(f); + lines.add(0, "add:[a,b,c][12345678]"); + FileUtils.replace(f, lines.toArray(new String[lines.size()])); + }), false); + } + + private static void testCorruptRecord(BiConsumer<LogTransaction, SSTableReader> modifier, boolean isRecoverable) throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); + + // simulate tracking sstables with a committed transaction except the checksum will be wrong + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstableNew); + log.obsoleted(sstableOld); + + // Modify the transaction log or disk state for sstableOld + modifier.accept(log, sstableOld); + + assertNull(log.complete(null)); + + sstableOld.selfRef().release(); + sstableNew.selfRef().release(); + + // The files on disk, for old files make sure to exclude the files that were deleted by the modifier + Set<String> newFiles = sstableNew.getAllFilePaths().stream().collect(Collectors.toSet()); + Set<String> oldFiles = sstableOld.getAllFilePaths().stream().filter(p -> new File(p).exists()).collect(Collectors.toSet()); + + //This should filter as in progress since the last record is corrupt - assertFiles(newFiles, LogAwareFileLister.getTemporaryFiles(dataFolder)); - assertFiles(oldFiles, LogAwareFileLister.getFinalFiles(dataFolder)); ++ assertFiles(newFiles, getTemporaryFiles(dataFolder)); ++ assertFiles(oldFiles, getFinalFiles(dataFolder)); + + if (isRecoverable) + { // the corruption is recoverable but the commit record is unreadable so the transaction is still in progress + + //This should remove new files + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + // make sure to exclude the old files that were deleted by the modifier + assertFiles(dataFolder.getPath(), oldFiles); + } + else + { // if an intermediate line was also modified, it should ignore the tx log file + + //This should not remove any files + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(newFiles, + oldFiles, + log.logFilePaths()))); + } + } + + @Test + public void testObsoletedDataFileUpdateTimeChanged() throws IOException + { + testObsoletedFilesChanged(sstable -> + { + // increase the modification time of the Data file + for (String filePath : sstable.getAllFilePaths()) + { + if (filePath.endsWith("Data.db")) + assertTrue(new File(filePath).setLastModified(System.currentTimeMillis() + 60000)); //one minute later + } + }); + } + + private static void testObsoletedFilesChanged(Consumer<SSTableReader> modifier) throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128); + SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128); + + // simulate tracking sstables with a committed transaction except the checksum will be wrong + LogTransaction log = new LogTransaction(OperationType.COMPACTION); + assertNotNull(log); + + log.trackNew(sstableNew); + /*TransactionLog.SSTableTidier tidier =*/ log.obsoleted(sstableOld); + + //modify the old sstable files + modifier.accept(sstableOld); + + //Fake a commit + log.txnFile().commit(); + + //This should not remove the old files + LogTransaction.removeUnfinishedLeftovers(cfs.metadata); + + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat( + sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + log.logFilePaths()))); + + sstableOld.selfRef().release(); + sstableNew.selfRef().release(); + + // complete the transaction to avoid LEAK errors + assertNull(log.complete(null)); + + assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths(), + log.logFilePaths()))); + } + + @Test + public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); + + LogTransaction logs = new LogTransaction(OperationType.COMPACTION); + assertNotNull(logs); + + LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable); + + logs.finish(); + + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + // This should race with the asynchronous deletion of txn log files + // It doesn't matter what it returns but it should not throw because the txn + // was completed before deleting files (i.e. releasing sstables) + for (int i = 0; i < 200; i++) - LogAwareFileLister.getTemporaryFiles(dataFolder); ++ getTemporaryFiles(dataFolder); + } + + @Test + public void testGetTemporaryFilesThrowsIfCompletingAfterObsoletion() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + File dataFolder = new Directories(cfs.metadata).getDirectoryForNewSSTables(); + SSTableReader sstable = sstable(dataFolder, cfs, 0, 128); + + LogTransaction logs = new LogTransaction(OperationType.COMPACTION); + assertNotNull(logs); + + LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable); + + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + LogTransaction.waitForDeletions(); + + try + { + // This should race with the asynchronous deletion of txn log files + // it should throw because we are violating the requirement that a transaction must + // finish before deleting files (i.e. releasing sstables) - LogAwareFileLister.getTemporaryFiles(dataFolder); ++ getTemporaryFiles(dataFolder); + fail("Expected runtime exception"); + } + catch(RuntimeException e) + { + //pass as long as the cause is not an assertion + assertFalse(e.getCause() instanceof AssertionError); + } + + logs.finish(); + } + + private static SSTableReader sstable(File dataFolder, ColumnFamilyStore cfs, int generation, int size) throws IOException + { + Descriptor descriptor = new Descriptor(dataFolder, cfs.keyspace.getName(), cfs.getTableName(), generation); + Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); + for (Component component : components) + { + File file = new File(descriptor.filenameFor(component)); + if (!file.exists()) + assertTrue(file.createNewFile()); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + + SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); + SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); + + SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList()); + StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) + .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header) + .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, + components, + cfs.metadata, + dFile, + iFile, + MockSchema.indexSummary.sharedCopy(), + new AlwaysPresentFilter(), + 1L, + metadata, + SSTableReader.OpenReason.NORMAL, + header); + reader.first = reader.last = MockSchema.readerBounds(generation); + return reader; + } + - private static void assertFiles(String dirPath, Set<String> expectedFiles) ++ private static void assertFiles(String dirPath, Set<String> expectedFiles) throws IOException + { + assertFiles(dirPath, expectedFiles, false); + } + - private static void assertFiles(String dirPath, Set<String> expectedFiles, boolean excludeNonExistingFiles) ++ private static void assertFiles(String dirPath, Set<String> expectedFiles, boolean excludeNonExistingFiles) throws IOException + { + LogTransaction.waitForDeletions(); + - File dir = new File(dirPath); ++ File dir = new File(dirPath).getCanonicalFile(); + File[] files = dir.listFiles(); + if (files != null) + { + for (File file : files) + { + if (file.isDirectory()) + continue; + + String filePath = file.getPath(); + assertTrue(String.format("%s not in [%s]", filePath, expectedFiles), expectedFiles.contains(filePath)); + expectedFiles.remove(filePath); + } + } + + if (excludeNonExistingFiles) + { + for (String filePath : expectedFiles) + { + File file = new File(filePath); + if (!file.exists()) + expectedFiles.remove(filePath); + } + } + + assertTrue(expectedFiles.toString(), expectedFiles.isEmpty()); + } + + // Check either that a temporary file is expected to exist (in the existingFiles) or that + // it does not exist any longer (on Windows we need to check File.exists() because a list + // might return a file as existing even if it does not) + private static void assertFiles(Iterable<String> existingFiles, Set<File> temporaryFiles) + { + for (String filePath : existingFiles) + { + File file = new File(filePath); + assertTrue(filePath, temporaryFiles.contains(file)); + temporaryFiles.remove(file); + } + + for (File file : temporaryFiles) + { + if (!file.exists()) + temporaryFiles.remove(file); + } + + assertTrue(temporaryFiles.toString(), temporaryFiles.isEmpty()); + } ++ ++ static Set<File> getTemporaryFiles(File folder) ++ { ++ return listFiles(folder, Directories.FileType.TEMPORARY); ++ } ++ ++ static Set<File> getFinalFiles(File folder) ++ { ++ return listFiles(folder, Directories.FileType.FINAL); ++ } ++ ++ static Set<File> listFiles(File folder, Directories.FileType... types) ++ { ++ Collection<Directories.FileType> match = Arrays.asList(types); ++ return new LogAwareFileLister(folder.toPath(), ++ (file, type) -> match.contains(type), ++ Directories.OnTxnErr.IGNORE).list() ++ .stream() ++ .map(f -> { ++ try ++ { ++ return f.getCanonicalFile(); ++ } ++ catch (IOException e) ++ { ++ throw new IOError(e); ++ } ++ }) ++ .collect(Collectors.toSet()); ++ } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java index 19eca40,6354fc2..184d637 --- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java @@@ -116,18 -127,33 +115,33 @@@ public class DescriptorTes } @Test - public void validateNames() + public void testEquality() { + // Descriptor should be equal when parent directory points to the same directory + File dir = new File("."); - Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1, Descriptor.Type.FINAL); - Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 1, Descriptor.Type.FINAL); ++ Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1); ++ Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 1); + assertEquals(desc1, desc2); + assertEquals(desc1.hashCode(), desc2.hashCode()); + } - String names[] = { - /*"system-schema_keyspaces-ka-1-CompressionInfo.db", "system-schema_keyspaces-ka-1-Summary.db", - "system-schema_keyspaces-ka-1-Data.db", "system-schema_keyspaces-ka-1-TOC.txt", - "system-schema_keyspaces-ka-1-Digest.sha1", "system-schema_keyspaces-ka-2-CompressionInfo.db", - "system-schema_keyspaces-ka-1-Filter.db", "system-schema_keyspaces-ka-2-Data.db", - "system-schema_keyspaces-ka-1-Index.db", "system-schema_keyspaces-ka-2-Digest.sha1", - "system-schema_keyspaces-ka-1-Statistics.db", - "system-schema_keyspacest-tmp-ka-1-Data.db",*/ - "system-schema_keyspace-ka-1-"+ SSTableFormat.Type.BIG.name+"-Data.db" + @Test + public void validateNames() + { - ++ // TODO tmp file name probably is not handled correctly after CASSANDRA-7066 + String[] names = { + // old formats + "system-schema_keyspaces-jb-1-Data.db", - "system-schema_keyspaces-tmp-jb-1-Data.db", ++ //"system-schema_keyspaces-tmp-jb-1-Data.db", + "system-schema_keyspaces-ka-1-big-Data.db", - "system-schema_keyspaces-tmp-ka-1-big-Data.db", ++ //"system-schema_keyspaces-tmp-ka-1-big-Data.db", + // 2ndary index + "keyspace1-standard1.idx1-ka-1-big-Data.db", + // new formats + "la-1-big-Data.db", - "tmp-la-1-big-Data.db", ++ //"tmp-la-1-big-Data.db", + // 2ndary index + ".idx1" + File.separator + "la-1-big-Data.db", }; for (String name : names)