Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/15131391
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/15131391
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/15131391

Branch: refs/heads/trunk
Commit: 15131391d833c1a0ba0e26903e27867fd101fe72
Parents: 98086b6 c662259
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Apr 1 12:44:52 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 1 12:44:52 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/lifecycle/LogAwareFileLister.java        | 23 -------
 .../apache/cassandra/io/sstable/Descriptor.java | 20 +++++--
 .../db/lifecycle/LogTransactionTest.java        | 63 +++++++++++++++-----
 .../cassandra/io/sstable/DescriptorTest.java    | 40 +++++++++----
 5 files changed, 90 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 482c41a,c8a4f21..d8d4834
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,9 +1,22 @@@
 -2.2.6
 +3.0.5
 + * Improve backoff policy for cqlsh COPY FROM (CASSANDRA-11320)
 + * Improve IF NOT EXISTS check in CREATE INDEX (CASSANDRA-11131)
 + * Upgrade ohc to 0.4.3
 + * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093)
 + * Allocate merkletrees with the correct size (CASSANDRA-11390)
 + * Support streaming pre-3.0 sstables (CASSANDRA-10990)
 + * Add backpressure to compressed commit log (CASSANDRA-10971)
 + * SSTableExport supports secondary index tables (CASSANDRA-11330)
 + * Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
 + * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
 + * Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208)
 + * Remove recursive call from CompositesSearcher (CASSANDRA-11304)
 + * Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377)
 + * Fix sstableloader fail when using materialized view (CASSANDRA-11275)
 +Merged from 2.2:
+  * Use canonical path for directory in SSTable descriptor (CASSANDRA-10587)
   * Add cassandra-stress keystore option (CASSANDRA-9325)
 - * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448).
   * Dont mark sstables as repairing with sub range repairs (CASSANDRA-11451)
 - * Fix use of NullUpdater for 2i during compaction (CASSANDRA-11450)
   * Notify when sstables change after cancelling compaction (CASSANDRA-11373)
   * cqlsh: COPY FROM should check that explicit column names are valid (CASSANDRA-11333)
   * Add -Dcassandra.start_gossip startup option (CASSANDRA-10809)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
index 01bcb8a,0000000..3393b5c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
@@@ -1,195 -1,0 +1,172 @@@
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.DirectoryStream;
 +import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.util.*;
 +import java.util.function.BiFunction;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +import java.util.stream.StreamSupport;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import org.apache.cassandra.db.Directories;
 +
 +import static org.apache.cassandra.db.Directories.*;
 +
 +/**
 + * A class for listing files in a folder.
 + */
 +final class LogAwareFileLister
 +{
 +    // The folder to scan
 +    private final Path folder;
 +
 +    // The filter determines which files the client wants returned
 +    private final BiFunction<File, FileType, Boolean> filter; //file, file 
type
 +
 +    // The behavior when we fail to list files
 +    private final OnTxnErr onTxnErr;
 +
 +    // The unfiltered result
 +    NavigableMap<File, Directories.FileType> files = new TreeMap<>();
 +
 +    @VisibleForTesting
 +    LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> 
filter, OnTxnErr onTxnErr)
 +    {
 +        this.folder = folder;
 +        this.filter = filter;
 +        this.onTxnErr = onTxnErr;
 +    }
 +
 +    public List<File> list()
 +    {
 +        try
 +        {
 +            return innerList();
 +        }
 +        catch (Throwable t)
 +        {
 +            throw new RuntimeException(String.format("Failed to list files in 
%s", folder), t);
 +        }
 +    }
 +
 +    List<File> innerList() throws Throwable
 +    {
 +        list(Files.newDirectoryStream(folder))
 +        .stream()
 +        .filter((f) -> !LogFile.isLogFile(f))
 +        .forEach((f) -> files.put(f, FileType.FINAL));
 +
 +        // Since many file systems are not atomic, we cannot be sure we have 
listed a consistent disk state
 +        // (Linux would permit this, but for simplicity we keep our behaviour 
the same across platforms)
 +        // so we must be careful to list txn log files AFTER every other file 
since these files are deleted last,
 +        // after all other files are removed
 +        list(Files.newDirectoryStream(folder, '*' + LogFile.EXT))
 +        .stream()
 +        .filter(LogFile::isLogFile)
 +        .forEach(this::classifyFiles);
 +
 +        // Finally we apply the user filter before returning our result
 +        return files.entrySet().stream()
 +                    .filter((e) -> filter.apply(e.getKey(), e.getValue()))
 +                    .map(Map.Entry::getKey)
 +                    .collect(Collectors.toList());
 +    }
 +
 +    static List<File> list(DirectoryStream<Path> stream) throws IOException
 +    {
 +        try
 +        {
 +            return StreamSupport.stream(stream.spliterator(), false)
 +                                .map(Path::toFile)
 +                                .filter((f) -> !f.isDirectory())
 +                                .collect(Collectors.toList());
 +        }
 +        finally
 +        {
 +            stream.close();
 +        }
 +    }
 +
 +    /**
 +     * We read txn log files; if verification fails, we throw only if the user has specified
 +     * OnTxnErr.THROW. Otherwise we log an error and apply the txn log anyway.
 +     */
 +    void classifyFiles(File txnFile)
 +    {
 +        LogFile txn = LogFile.make(txnFile);
 +        readTxnLog(txn);
 +        classifyFiles(txn);
 +        files.put(txnFile, FileType.TXN_LOG);
 +    }
 +
 +    void readTxnLog(LogFile txn)
 +    {
 +        if (!txn.verify() && onTxnErr == OnTxnErr.THROW)
 +            throw new LogTransaction.CorruptTransactionLogException("Some 
records failed verification. See earlier in log for details.", txn);
 +    }
 +
 +    void classifyFiles(LogFile txnFile)
 +    {
 +        Map<LogRecord, Set<File>> oldFiles = 
txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.REMOVE);
 +        Map<LogRecord, Set<File>> newFiles = 
txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.ADD);
 +
 +        if (txnFile.completed())
 +        { // last record present, filter regardless of disk status
 +            setTemporary(txnFile, oldFiles.values(), newFiles.values());
 +            return;
 +        }
 +
 +        if (allFilesPresent(txnFile, oldFiles, newFiles))
 +        {  // all files present, transaction is in progress, this will filter 
as aborted
 +            setTemporary(txnFile, oldFiles.values(), newFiles.values());
 +            return;
 +        }
 +
 +        // some files are missing, we expect the txn file to either also be 
missing or completed, so check
 +        // disk state again to resolve any previous races on non-atomic 
directory listing platforms
 +
 +        // if txn file also gone, then do nothing (all temporary should be 
gone, we could remove them if any)
 +        if (!txnFile.exists())
 +            return;
 +
 +        // otherwise read the file again to see if it is completed now
 +        readTxnLog(txnFile);
 +
 +        if (txnFile.completed())
 +        { // if after re-reading the txn is completed then filter accordingly
 +            setTemporary(txnFile, oldFiles.values(), newFiles.values());
 +            return;
 +        }
 +
 +        // some files are missing and yet the txn is still there and not 
completed
 +        // something must be wrong (see comment at the top of this file 
requiring txn to be
 +        // completed before obsoleting or aborting sstables)
 +        throw new RuntimeException(String.format("Failed to list directory 
files in %s, inconsistent disk state for transaction %s",
 +                                                 folder,
 +                                                 txnFile));
 +    }
 +
 +    /** See if all files are present, or if only the files of the last record are missing and it is an ADD record */
 +    private static boolean allFilesPresent(LogFile txnFile, Map<LogRecord, 
Set<File>> oldFiles, Map<LogRecord, Set<File>> newFiles)
 +    {
 +        LogRecord lastRecord = txnFile.getLastRecord();
 +        return !Stream.concat(oldFiles.entrySet().stream(),
 +                              newFiles.entrySet().stream()
 +                                      .filter((e) -> e.getKey() != 
lastRecord))
 +                      .filter((e) -> e.getKey().numFiles > 
e.getValue().size())
 +                      .findFirst().isPresent();
 +    }
 +
 +    private void setTemporary(LogFile txnFile, Collection<Set<File>> 
oldFiles, Collection<Set<File>> newFiles)
 +    {
 +        Collection<Set<File>> temporary = txnFile.committed() ? oldFiles : 
newFiles;
 +        temporary.stream()
 +                 .flatMap(Set::stream)
 +                 .forEach((f) -> this.files.put(f, FileType.TEMPORARY));
 +    }
- 
-     @VisibleForTesting
-     static Set<File> getTemporaryFiles(File folder)
-     {
-         return listFiles(folder, FileType.TEMPORARY);
-     }
- 
-     @VisibleForTesting
-     static Set<File> getFinalFiles(File folder)
-     {
-         return listFiles(folder, FileType.FINAL);
-     }
- 
-     @VisibleForTesting
-     static Set<File> listFiles(File folder, FileType ... types)
-     {
-         Collection<FileType> match = Arrays.asList(types);
-         return new LogAwareFileLister(folder.toPath(),
-                                       (file, type) -> match.contains(type),
-                                       OnTxnErr.IGNORE).list()
-                                                       .stream()
-                                                       
.collect(Collectors.toSet());
-     }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/Descriptor.java
index dbce56c,ed81616..ff4abfc
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@@ -18,10 -18,12 +18,12 @@@
  package org.apache.cassandra.io.sstable;
  
  import java.io.File;
+ import java.io.IOError;
+ import java.io.IOException;
 -import java.util.ArrayDeque;
 -import java.util.Deque;
 -import java.util.StringTokenizer;
 +import java.util.*;
 +import java.util.regex.Pattern;
  
 +import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.CharMatcher;
  import com.google.common.base.Objects;
  
@@@ -46,7 -47,20 +48,9 @@@ import static org.apache.cassandra.io.s
   */
  public class Descriptor
  {
 -
 -    public static enum Type
 -    {
 -        TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
 -        public final boolean isTemporary;
 -        public final String marker;
 -        Type(String marker, boolean isTemporary)
 -        {
 -            this.isTemporary = isTemporary;
 -            this.marker = marker;
 -        }
 -    }
 +    public static String TMP_EXT = ".tmp";
+ 
+     /** canonicalized path to the directory where SSTable resides */
      public final File directory;
      /** version has the following format: <code>[a-z]+</code> */
      public final Version version;
@@@ -89,10 -104,10 +100,10 @@@
          this.ksname = ksname;
          this.cfname = cfname;
          this.generation = generation;
 -        this.type = temp;
          this.formatType = formatType;
 +        this.digestComponent = digestComponent;
  
-         hashCode = Objects.hashCode(version, directory, generation, ksname, 
cfname, formatType);
 -        hashCode = Objects.hashCode(version, this.directory, generation, 
ksname, cfname, temp, formatType);
++        hashCode = Objects.hashCode(version, this.directory, generation, 
ksname, cfname, formatType);
      }
  
      public Descriptor withGeneration(int newGeneration)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 4f2fc73,0000000..45b5844
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@@ -1,1183 -1,0 +1,1214 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
++import java.io.IOError;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +import java.nio.file.Files;
 +import java.util.*;
 +import java.util.function.BiConsumer;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import static junit.framework.Assert.assertNotNull;
 +import static junit.framework.Assert.assertNull;
 +import static junit.framework.Assert.fail;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +import junit.framework.Assert;
 +import org.apache.cassandra.MockSchema;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.compaction.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.BufferedSegmentedFile;
 +import org.apache.cassandra.io.util.ChannelProxy;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.utils.AlwaysPresentFilter;
 +import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +public class LogTransactionTest extends AbstractTransactionalTest
 +{
 +    private static final String KEYSPACE = "TransactionLogsTest";
 +
 +    @BeforeClass
 +    public static void setUp()
 +    {
 +        MockSchema.cleanup();
 +    }
 +
 +    protected AbstractTransactionalTest.TestableTransaction newTest() throws 
Exception
 +    {
 +        LogTransaction.waitForDeletions();
 +        SSTableReader.resetTidying();
 +        return new TxnTest();
 +    }
 +
 +    private static final class TxnTest extends TestableTransaction
 +    {
 +        private final static class Transaction extends 
Transactional.AbstractTransactional implements Transactional
 +        {
 +            final ColumnFamilyStore cfs;
 +            final LogTransaction txnLogs;
 +            final File dataFolder;
 +            final SSTableReader sstableOld;
 +            final SSTableReader sstableNew;
 +            final LogTransaction.SSTableTidier tidier;
 +
 +            Transaction(ColumnFamilyStore cfs, LogTransaction txnLogs) throws 
IOException
 +            {
 +                this.cfs = cfs;
 +                this.txnLogs = txnLogs;
 +                this.dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +                this.sstableOld = sstable(dataFolder, cfs, 0, 128);
 +                this.sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +                assertNotNull(txnLogs);
 +                assertNotNull(txnLogs.id());
 +                Assert.assertEquals(OperationType.COMPACTION, txnLogs.type());
 +
 +                txnLogs.trackNew(sstableNew);
 +                tidier = txnLogs.obsoleted(sstableOld);
 +                assertNotNull(tidier);
 +            }
 +
 +            protected Throwable doCommit(Throwable accumulate)
 +            {
 +                sstableOld.markObsolete(tidier);
 +                sstableOld.selfRef().release();
 +                LogTransaction.waitForDeletions();
 +
 +                Throwable ret = txnLogs.commit(accumulate);
 +
 +                sstableNew.selfRef().release();
 +                return ret;
 +            }
 +
 +            protected Throwable doAbort(Throwable accumulate)
 +            {
 +                tidier.abort();
 +                LogTransaction.waitForDeletions();
 +
 +                Throwable ret = txnLogs.abort(accumulate);
 +
 +                sstableNew.selfRef().release();
 +                sstableOld.selfRef().release();
 +                return ret;
 +            }
 +
 +            protected void doPrepare()
 +            {
 +                txnLogs.prepareToCommit();
 +            }
 +
 +            void assertInProgress() throws Exception
 +            {
 +                assertFiles(dataFolder.getPath(), 
Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
 +                                                                              
     sstableOld.getAllFilePaths(),
 +                                                                              
     txnLogs.logFilePaths())));
 +            }
 +
 +            void assertPrepared() throws Exception
 +            {
 +            }
 +
 +            void assertAborted() throws Exception
 +            {
 +                assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableOld.getAllFilePaths()));
 +            }
 +
 +            void assertCommitted() throws Exception
 +            {
 +                assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableNew.getAllFilePaths()));
 +            }
 +        }
 +
 +        final Transaction txn;
 +
 +        private TxnTest() throws IOException
 +        {
 +            this(MockSchema.newCFS(KEYSPACE));
 +        }
 +
 +        private TxnTest(ColumnFamilyStore cfs) throws IOException
 +        {
 +            this(cfs, new LogTransaction(OperationType.COMPACTION));
 +        }
 +
 +        private TxnTest(ColumnFamilyStore cfs, LogTransaction txnLogs) throws 
IOException
 +        {
 +            this(new Transaction(cfs, txnLogs));
 +        }
 +
 +        private TxnTest(Transaction txn)
 +        {
 +            super(txn);
 +            this.txn = txn;
 +        }
 +
 +        protected void assertInProgress() throws Exception
 +        {
 +            txn.assertInProgress();
 +        }
 +
 +        protected void assertPrepared() throws Exception
 +        {
 +            txn.assertPrepared();
 +        }
 +
 +        protected void assertAborted() throws Exception
 +        {
 +            txn.assertAborted();
 +        }
 +
 +        protected void assertCommitted() throws Exception
 +        {
 +            txn.assertCommitted();
 +        }
 +    }
 +
 +    @Test
 +    public void testUntrack() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +        // complete a transaction without keeping the new files since they were untracked
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstableNew);
 +        log.untrackNew(sstableNew);
 +
 +        log.finish();
 +
 +        sstableNew.selfRef().release();
 +        Thread.sleep(1);
 +        LogTransaction.waitForDeletions();
 +
 +        assertFiles(dataFolder.getPath(), Collections.<String>emptySet());
 +    }
 +
 +    @Test
 +    public void testCommitSameDesc() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstableOld1 = sstable(dataFolder, cfs, 0, 128);
 +        SSTableReader sstableOld2 = sstable(dataFolder, cfs, 0, 256);
 +        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstableNew);
 +
 +        sstableOld1.setReplaced();
 +
 +        LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld2);
 +        assertNotNull(tidier);
 +
 +        log.finish();
 +
 +        sstableOld2.markObsolete(tidier);
 +
 +        sstableOld1.selfRef().release();
 +        sstableOld2.selfRef().release();
 +
 +        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableNew.getAllFilePaths()));
 +
 +        sstableNew.selfRef().release();
 +    }
 +
 +    @Test
 +    public void testCommitOnlyNew() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstable);
 +        log.finish();
 +
 +        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstable.getAllFilePaths()));
 +
 +        sstable.selfRef().release();
 +    }
 +
 +    @Test
 +    public void testCommitOnlyOld() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier tidier = log.obsoleted(sstable);
 +        assertNotNull(tidier);
 +
 +        log.finish();
 +        sstable.markObsolete(tidier);
 +        sstable.selfRef().release();
 +
 +        assertFiles(dataFolder.getPath(), new HashSet<>());
 +    }
 +
 +    @Test
 +    public void testCommitMultipleFolders() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +
 +        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        File dataFolder1 = new File(origiFolder, "1");
 +        File dataFolder2 = new File(origiFolder, "2");
 +        Files.createDirectories(dataFolder1.toPath());
 +        Files.createDirectories(dataFolder2.toPath());
 +
 +        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
 +                                     sstable(dataFolder1, cfs, 1, 128),
 +                                     sstable(dataFolder2, cfs, 2, 128),
 +                                     sstable(dataFolder2, cfs, 3, 128)
 +        };
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier[] tidiers = { 
log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) };
 +
 +        log.trackNew(sstables[1]);
 +        log.trackNew(sstables[3]);
 +
 +        log.finish();
 +
 +        sstables[0].markObsolete(tidiers[0]);
 +        sstables[2].markObsolete(tidiers[1]);
 +
 +        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
 +        LogTransaction.waitForDeletions();
 +
 +        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[1].getAllFilePaths()));
 +        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[3].getAllFilePaths()));
 +    }
 +
 +    @Test
 +    public void testAbortOnlyNew() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstable);
 +        log.abort();
 +
 +        sstable.selfRef().release();
 +
 +        assertFiles(dataFolder.getPath(), new HashSet<>());
 +    }
 +
 +    @Test
 +    public void testAbortOnlyOld() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier tidier = log.obsoleted(sstable);
 +        assertNotNull(tidier);
 +
 +        tidier.abort();
 +        log.abort();
 +
 +        sstable.selfRef().release();
 +
 +        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstable.getAllFilePaths()));
 +    }
 +
 +    @Test
 +    public void testAbortMultipleFolders() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +
 +        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        File dataFolder1 = new File(origiFolder, "1");
 +        File dataFolder2 = new File(origiFolder, "2");
 +        Files.createDirectories(dataFolder1.toPath());
 +        Files.createDirectories(dataFolder2.toPath());
 +
 +        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
 +                                     sstable(dataFolder1, cfs, 1, 128),
 +                                     sstable(dataFolder2, cfs, 2, 128),
 +                                     sstable(dataFolder2, cfs, 3, 128)
 +        };
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier[] tidiers = { 
log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) };
 +
 +        log.trackNew(sstables[1]);
 +        log.trackNew(sstables[3]);
 +
 +        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::abort);
 +        log.abort();
 +
 +        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
 +        LogTransaction.waitForDeletions();
 +
 +        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[0].getAllFilePaths()));
 +        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[2].getAllFilePaths()));
 +    }
 +
 +
 +    @Test
 +    public void testRemoveUnfinishedLeftovers_abort() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
 +        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +        // simulate tracking sstables with a failed transaction (new log file 
NOT deleted)
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstableNew);
 +        LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld);
 +
 +        Set<File> tmpFiles = 
sstableNew.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet());
 +
 +        sstableNew.selfRef().release();
 +        sstableOld.selfRef().release();
 +
-         Assert.assertEquals(tmpFiles, 
LogAwareFileLister.getTemporaryFiles(sstableNew.descriptor.directory));
++        Assert.assertEquals(tmpFiles, 
getTemporaryFiles(sstableNew.descriptor.directory));
 +
 +        // normally called at startup
 +        LogTransaction.removeUnfinishedLeftovers(cfs.metadata);
 +
 +        // sstableOld should be only table left
 +        Directories directories = new Directories(cfs.metadata);
 +        Map<Descriptor, Set<Component>> sstables = 
directories.sstableLister(Directories.OnTxnErr.THROW).list();
 +        assertEquals(1, sstables.size());
 +
 +        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableOld.getAllFilePaths()));
 +
 +        // complete the transaction before releasing files
 +        tidier.run();
 +        log.close();
 +    }
 +
 +    @Test
 +    public void testRemoveUnfinishedLeftovers_commit() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
 +        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +        // simulate tracking sstables with a committed transaction (new log 
file deleted)
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstableNew);
 +        LogTransaction.SSTableTidier tidier = log.obsoleted(sstableOld);
 +
 +        //Fake a commit
 +        log.txnFile().commit();
 +
 +        Set<File> tmpFiles = 
sstableOld.getAllFilePaths().stream().map(File::new).collect(Collectors.toSet());
 +
 +        sstableNew.selfRef().release();
 +        sstableOld.selfRef().release();
 +
-         Assert.assertEquals(tmpFiles, 
LogAwareFileLister.getTemporaryFiles(sstableOld.descriptor.directory));
++        Assert.assertEquals(tmpFiles, 
getTemporaryFiles(sstableOld.descriptor.directory));
 +
 +        // normally called at startup
 +        LogTransaction.removeUnfinishedLeftovers(cfs.metadata);
 +
 +        // sstableNew should be only table left
 +        Directories directories = new Directories(cfs.metadata);
 +        Map<Descriptor, Set<Component>> sstables = 
directories.sstableLister(Directories.OnTxnErr.THROW).list();
 +        assertEquals(1, sstables.size());
 +
 +        assertFiles(dataFolder.getPath(), new 
HashSet<>(sstableNew.getAllFilePaths()));
 +
 +        // complete the transaction to avoid LEAK errors
 +        tidier.run();
 +        assertNull(log.complete(null));
 +    }
 +
 +    @Test
 +    public void testRemoveUnfinishedLeftovers_commit_multipleFolders() throws 
Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +
 +        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        File dataFolder1 = new File(origiFolder, "1");
 +        File dataFolder2 = new File(origiFolder, "2");
 +        Files.createDirectories(dataFolder1.toPath());
 +        Files.createDirectories(dataFolder2.toPath());
 +
 +        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
 +                                     sstable(dataFolder1, cfs, 1, 128),
 +                                     sstable(dataFolder2, cfs, 2, 128),
 +                                     sstable(dataFolder2, cfs, 3, 128)
 +        };
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier[] tidiers = { 
log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) };
 +
 +        log.trackNew(sstables[1]);
 +        log.trackNew(sstables[3]);
 +
 +        Collection<File> logFiles = log.logFiles();
 +        Assert.assertEquals(2, logFiles.size());
 +
 +        // fake a commit
 +        log.txnFile().commit();
 +
 +        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
 +
 +        // test listing
 +        
Assert.assertEquals(sstables[0].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
-                             
LogAwareFileLister.getTemporaryFiles(dataFolder1));
++                            getTemporaryFiles(dataFolder1));
 +        
Assert.assertEquals(sstables[2].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
-                             
LogAwareFileLister.getTemporaryFiles(dataFolder2));
++                            getTemporaryFiles(dataFolder2));
 +
 +        // normally called at startup
 +        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, 
dataFolder2));
 +
 +        // the new sstables should be the only ones left
 +        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[1].getAllFilePaths()));
 +        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[3].getAllFilePaths()));
 +
 +        // complete the transaction to avoid LEAK errors
 +        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run);
 +        assertNull(log.complete(null));
 +    }
 +
 +    @Test
 +    public void testRemoveUnfinishedLeftovers_abort_multipleFolders() throws 
Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +
 +        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        File dataFolder1 = new File(origiFolder, "1");
 +        File dataFolder2 = new File(origiFolder, "2");
 +        Files.createDirectories(dataFolder1.toPath());
 +        Files.createDirectories(dataFolder2.toPath());
 +
 +        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
 +                                     sstable(dataFolder1, cfs, 1, 128),
 +                                     sstable(dataFolder2, cfs, 2, 128),
 +                                     sstable(dataFolder2, cfs, 3, 128)
 +        };
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier[] tidiers = { 
log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) };
 +
 +        log.trackNew(sstables[1]);
 +        log.trackNew(sstables[3]);
 +
 +        Collection<File> logFiles = log.logFiles();
 +        Assert.assertEquals(2, logFiles.size());
 +
 +        // fake an abort
 +        log.txnFile().abort();
 +
 +        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
 +
 +        // test listing
 +        
Assert.assertEquals(sstables[1].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
-                             
LogAwareFileLister.getTemporaryFiles(dataFolder1));
++                            getTemporaryFiles(dataFolder1));
 +        
Assert.assertEquals(sstables[3].getAllFilePaths().stream().map(File::new).collect(Collectors.toSet()),
-                             
LogAwareFileLister.getTemporaryFiles(dataFolder2));
++                            getTemporaryFiles(dataFolder2));
 +
 +        // normally called at startup
 +        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, 
dataFolder2));
 +
 +        // the old sstables should be the only ones left
 +        assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[0].getAllFilePaths()));
 +        assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[2].getAllFilePaths()));
 +
 +        // complete the transaction to avoid LEAK errors
 +        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run);
 +        assertNull(log.complete(null));
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_mismatchedFinalRecords() throws 
Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert mismatched records
 +            FileUtils.append(logFiles.get(0), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
 +            FileUtils.append(logFiles.get(1), 
LogRecord.makeAbort(System.currentTimeMillis()).raw);
 +
 +        }, false);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_first() 
throws Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert a full record and a partial one
 +            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
 +            int toChop = finalRecord.length() / 2;
 +            FileUtils.append(logFiles.get(0), finalRecord.substring(0, 
finalRecord.length() - toChop));
 +            FileUtils.append(logFiles.get(1), finalRecord);
 +
 +        }, true);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialFinalRecords_second() 
throws Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert a full record and a partial one
 +            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
 +            int toChop = finalRecord.length() / 2;
 +            FileUtils.append(logFiles.get(0), finalRecord);
 +            FileUtils.append(logFiles.get(1), finalRecord.substring(0, 
finalRecord.length() - toChop));
 +
 +        }, true);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_first() 
throws Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert a partial sstable record and a full commit record
 +            String sstableRecord = LogRecord.make(LogRecord.Type.ADD, 
Collections.emptyList(), 0, "abc").raw;
 +            int toChop = sstableRecord.length() / 2;
 +            FileUtils.append(logFiles.get(0), sstableRecord.substring(0, 
sstableRecord.length() - toChop));
 +            FileUtils.append(logFiles.get(1), sstableRecord);
 +            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
 +            FileUtils.append(logFiles.get(0), finalRecord);
 +            FileUtils.append(logFiles.get(1), finalRecord);
 +
 +        }, false);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_partialNonFinalRecord_second() 
throws Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert a partial sstable record and a full commit record
 +            String sstableRecord = LogRecord.make(LogRecord.Type.ADD, 
Collections.emptyList(), 0, "abc").raw;
 +            int toChop = sstableRecord.length() / 2;
 +            FileUtils.append(logFiles.get(0), sstableRecord);
 +            FileUtils.append(logFiles.get(1), sstableRecord.substring(0, 
sstableRecord.length() - toChop));
 +            String finalRecord = 
LogRecord.makeCommit(System.currentTimeMillis()).raw;
 +            FileUtils.append(logFiles.get(0), finalRecord);
 +            FileUtils.append(logFiles.get(1), finalRecord);
 +
 +        }, false);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_first() 
throws Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert only one commit record
 +            FileUtils.append(logFiles.get(0), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
 +
 +        }, true);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_missingFinalRecords_second() 
throws Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert only one commit record
 +            FileUtils.append(logFiles.get(1), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
 +
 +        }, true);
 +    }
 +
 +    @Test
 +    public void 
testRemoveUnfinishedLeftovers_multipleFolders_tooManyFinalRecords() throws 
Throwable
 +    {
 +        testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(txn -> {
 +            List<File> logFiles = txn.logFiles();
 +            Assert.assertEquals(2, logFiles.size());
 +
 +            // insert too many final records: one commit in the first log file, two in the second
 +            FileUtils.append(logFiles.get(0), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
 +            FileUtils.append(logFiles.get(1), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
 +            FileUtils.append(logFiles.get(1), 
LogRecord.makeCommit(System.currentTimeMillis()).raw);
 +
 +        }, false);
 +    }
 +
 +    private static void 
testRemoveUnfinishedLeftovers_multipleFolders_errorConditions(Consumer<LogTransaction>
 modifier, boolean shouldCommit) throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +
 +        File origiFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        File dataFolder1 = new File(origiFolder, "1");
 +        File dataFolder2 = new File(origiFolder, "2");
 +        Files.createDirectories(dataFolder1.toPath());
 +        Files.createDirectories(dataFolder2.toPath());
 +
 +        SSTableReader[] sstables = { sstable(dataFolder1, cfs, 0, 128),
 +                                     sstable(dataFolder1, cfs, 1, 128),
 +                                     sstable(dataFolder2, cfs, 2, 128),
 +                                     sstable(dataFolder2, cfs, 3, 128)
 +        };
 +
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        LogTransaction.SSTableTidier[] tidiers = { 
log.obsoleted(sstables[0]), log.obsoleted(sstables[2]) };
 +
 +        log.trackNew(sstables[1]);
 +        log.trackNew(sstables[3]);
 +
 +        // fake some error condition on the txn logs
 +        modifier.accept(log);
 +
 +        Arrays.stream(sstables).forEach(s -> s.selfRef().release());
 +
 +        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, 
dataFolder2));
 +        LogTransaction.waitForDeletions();
 +
 +        if (shouldCommit)
 +        {
 +            // only new sstables should still be there
 +            assertFiles(dataFolder1.getPath(), new 
HashSet<>(sstables[1].getAllFilePaths()));
 +            assertFiles(dataFolder2.getPath(), new 
HashSet<>(sstables[3].getAllFilePaths()));
 +        }
 +        else
 +        {
 +            // all files should still be there
 +            assertFiles(dataFolder1.getPath(), 
Sets.newHashSet(Iterables.concat(sstables[0].getAllFilePaths(),
 +                                                                              
  sstables[1].getAllFilePaths(),
 +                                                                              
  Collections.singleton(log.logFilePaths().get(0)))));
 +            assertFiles(dataFolder2.getPath(), 
Sets.newHashSet(Iterables.concat(sstables[2].getAllFilePaths(),
 +                                                                              
  sstables[3].getAllFilePaths(),
 +                                                                              
  Collections.singleton(log.logFilePaths().get(1)))));
 +        }
 +
 +
 +        // complete the transaction to avoid LEAK errors
 +        Arrays.stream(tidiers).forEach(LogTransaction.SSTableTidier::run);
 +        log.txnFile().commit(); // just anything to make sure transaction 
tidier will finish
 +        assertNull(log.complete(null));
 +    }
 +
 +    @Test
 +    public void testGetTemporaryFiles() throws IOException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable1 = sstable(dataFolder, cfs, 0, 128);
 +
-         Set<File> tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder);
++        Set<File> tmpFiles = getTemporaryFiles(dataFolder);
 +        assertNotNull(tmpFiles);
 +        assertEquals(0, tmpFiles.size());
 +
 +        try(LogTransaction log = new LogTransaction(OperationType.WRITE))
 +        {
 +            Directories directories = new Directories(cfs.metadata);
 +
 +            File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> 
!pathname.isDirectory());
 +
 +            SSTableReader sstable2 = sstable(dataFolder, cfs, 1, 128);
 +            log.trackNew(sstable2);
 +
 +            Map<Descriptor, Set<Component>> sstables = 
directories.sstableLister(Directories.OnTxnErr.THROW).list();
 +            assertEquals(2, sstables.size());
 +
 +            // this should contain sstable1, sstable2 and the transaction log 
file
 +            File[] afterSecondSSTable = dataFolder.listFiles(pathname -> 
!pathname.isDirectory());
 +
 +            int numNewFiles = afterSecondSSTable.length - 
beforeSecondSSTable.length;
 +            assertEquals(numNewFiles - 1, sstable2.getAllFilePaths().size()); 
// new files except for transaction log file
 +
-             tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder);
++            tmpFiles = getTemporaryFiles(dataFolder);
 +            assertNotNull(tmpFiles);
 +            assertEquals(numNewFiles - 1, tmpFiles.size());
 +
 +            File ssTable2DataFile = new 
File(sstable2.descriptor.filenameFor(Component.DATA));
 +            File ssTable2IndexFile = new 
File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +
 +            assertTrue(tmpFiles.contains(ssTable2DataFile));
 +            assertTrue(tmpFiles.contains(ssTable2IndexFile));
 +
 +            List<File> files = 
directories.sstableLister(Directories.OnTxnErr.THROW).listFiles();
 +            List<File> filesNoTmp = 
directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles();
 +            assertNotNull(files);
 +            assertNotNull(filesNoTmp);
 +
 +            assertTrue(files.contains(ssTable2DataFile));
 +            assertTrue(files.contains(ssTable2IndexFile));
 +
 +            assertFalse(filesNoTmp.contains(ssTable2DataFile));
 +            assertFalse(filesNoTmp.contains(ssTable2IndexFile));
 +
 +            log.finish();
 +
 +            //Now it should be empty since the transaction has finished
-             tmpFiles = LogAwareFileLister.getTemporaryFiles(dataFolder);
++            tmpFiles = getTemporaryFiles(dataFolder);
 +            assertNotNull(tmpFiles);
 +            assertEquals(0, tmpFiles.size());
 +
 +            filesNoTmp = 
directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles();
 +            assertNotNull(filesNoTmp);
 +            assertTrue(filesNoTmp.contains(ssTable2DataFile));
 +            assertTrue(filesNoTmp.contains(ssTable2IndexFile));
 +
 +            sstable1.selfRef().release();
 +            sstable2.selfRef().release();
 +        }
 +    }
 +
 +    @Test
 +    public void testWrongChecksumLastLine() throws IOException
 +    {
 +        testCorruptRecord((t, s) ->
 +                          { // Fake a commit with invalid checksum
 +                              long now = System.currentTimeMillis();
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0][%d]", now, 12345678L)));
 +                          },
 +                          true);
 +    }
 +
 +    @Test
 +    public void testWrongChecksumSecondFromLastLine() throws IOException
 +    {
 +        testCorruptRecord((t, s) ->
 +                          { // Fake two lines with invalid checksum
 +                              long now = System.currentTimeMillis();
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("add:[ma-3-big,%d,4][%d]", now, 12345678L)));
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0][%d]", now, 12345678L)));
 +                          },
 +                          false);
 +    }
 +
 +    @Test
 +    public void testWrongChecksumLastLineMissingFile() throws IOException
 +    {
 +        testCorruptRecord((t, s) ->
 +                          { // Fake a commit with invalid checksum and also 
delete one of the old files
 +                              for (String filePath : s.getAllFilePaths())
 +                              {
 +                                  if (filePath.endsWith("Data.db"))
 +                                  {
 +                                      assertTrue(FileUtils.delete(filePath));
 +                                      
assertNull(t.txnFile().syncFolder(null));
 +                                      break;
 +                                  }
 +                              }
 +
 +                              long now = System.currentTimeMillis();
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0][%d]", now, 12345678L)));
 +                          },
 +                          false);
 +    }
 +
 +    @Test
 +    public void testWrongChecksumLastLineWrongRecordFormat() throws 
IOException
 +    {
 +        testCorruptRecord((t, s) ->
 +                          { // Fake a commit with invalid checksum and a 
wrong record format (extra spaces)
 +                              long now = System.currentTimeMillis();
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d ,0 ,0 ][%d]", now, 12345678L)));
 +                          },
 +                          true);
 +    }
 +
 +    @Test
 +    public void testMissingChecksumLastLine() throws IOException
 +    {
 +        testCorruptRecord((t, s) ->
 +                          {
 +                              // Fake a commit without a checksum
 +                              long now = System.currentTimeMillis();
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0]", now)));
 +                          },
 +                          true);
 +    }
 +
 +    @Test
 +    public void testMissingChecksumSecondFromLastLine() throws IOException
 +    {
 +        testCorruptRecord((t, s) ->
 +                          { // Fake two lines without a checksum
 +                              long now = System.currentTimeMillis();
 +                              t.logFiles().forEach( f -> FileUtils.append(f, 
String.format("add:[ma-3-big,%d,4]", now)));
 +                              t.logFiles().forEach(f -> FileUtils.append(f, 
String.format("commit:[%d,0,0]", now)));
 +                          },
 +                          false);
 +    }
 +
 +    @Test
 +    public void testUnparsableLastRecord() throws IOException
 +    {
 +        testCorruptRecord((t, s) -> t.logFiles().forEach(f -> 
FileUtils.append(f, "commit:[a,b,c][12345678]")), true);
 +    }
 +
 +    @Test
 +    public void testUnparsableFirstRecord() throws IOException
 +    {
 +        testCorruptRecord((t, s) -> t.logFiles().forEach(f -> {
 +            List<String> lines = FileUtils.readLines(f);
 +            lines.add(0, "add:[a,b,c][12345678]");
 +            FileUtils.replace(f, lines.toArray(new String[lines.size()]));
 +        }), false);
 +    }
 +
 +    private static void testCorruptRecord(BiConsumer<LogTransaction, 
SSTableReader> modifier, boolean isRecoverable) throws IOException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
 +        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +        // simulate tracking sstables with a committed transaction except the 
checksum will be wrong
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstableNew);
 +        log.obsoleted(sstableOld);
 +
 +        // Modify the transaction log or disk state for sstableOld
 +        modifier.accept(log, sstableOld);
 +
 +        assertNull(log.complete(null));
 +
 +        sstableOld.selfRef().release();
 +        sstableNew.selfRef().release();
 +
 +        // The files on disk, for old files make sure to exclude the files 
that were deleted by the modifier
 +        Set<String> newFiles = 
sstableNew.getAllFilePaths().stream().collect(Collectors.toSet());
 +        Set<String> oldFiles = sstableOld.getAllFilePaths().stream().filter(p 
-> new File(p).exists()).collect(Collectors.toSet());
 +
 +        //This should filter as in progress since the last record is corrupt
-         assertFiles(newFiles, 
LogAwareFileLister.getTemporaryFiles(dataFolder));
-         assertFiles(oldFiles, LogAwareFileLister.getFinalFiles(dataFolder));
++        assertFiles(newFiles, getTemporaryFiles(dataFolder));
++        assertFiles(oldFiles, getFinalFiles(dataFolder));
 +
 +        if (isRecoverable)
 +        { // the corruption is recoverable but the commit record is 
unreadable so the transaction is still in progress
 +
 +            //This should remove new files
 +            LogTransaction.removeUnfinishedLeftovers(cfs.metadata);
 +
 +            // make sure to exclude the old files that were deleted by the 
modifier
 +            assertFiles(dataFolder.getPath(), oldFiles);
 +        }
 +        else
 +        { // if an intermediate line was also modified, it should ignore the 
tx log file
 +
 +            //This should not remove any files
 +            LogTransaction.removeUnfinishedLeftovers(cfs.metadata);
 +
 +            assertFiles(dataFolder.getPath(), 
Sets.newHashSet(Iterables.concat(newFiles,
 +                                                                              
 oldFiles,
 +                                                                              
 log.logFilePaths())));
 +        }
 +    }
 +
 +    @Test
 +    public void testObsoletedDataFileUpdateTimeChanged() throws IOException
 +    {
 +        testObsoletedFilesChanged(sstable ->
 +                                  {
 +                                      // increase the modification time of 
the Data file
 +                                      for (String filePath : 
sstable.getAllFilePaths())
 +                                      {
 +                                          if (filePath.endsWith("Data.db"))
 +                                              assertTrue(new 
File(filePath).setLastModified(System.currentTimeMillis() + 60000)); //one 
minute later
 +                                      }
 +                                  });
 +    }
 +
 +    private static void testObsoletedFilesChanged(Consumer<SSTableReader> 
modifier) throws IOException
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstableOld = sstable(dataFolder, cfs, 0, 128);
 +        SSTableReader sstableNew = sstable(dataFolder, cfs, 1, 128);
 +
 +        // simulate tracking sstables with a committed transaction whose old sstable files are modified before the commit
 +        LogTransaction log = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(log);
 +
 +        log.trackNew(sstableNew);
 +        /*TransactionLog.SSTableTidier tidier =*/ log.obsoleted(sstableOld);
 +
 +        //modify the old sstable files
 +        modifier.accept(sstableOld);
 +
 +        //Fake a commit
 +        log.txnFile().commit();
 +
 +        //This should not remove the old files
 +        LogTransaction.removeUnfinishedLeftovers(cfs.metadata);
 +
 +        assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(
 +                                                                          
sstableNew.getAllFilePaths(),
 +                                                                          
sstableOld.getAllFilePaths(),
 +                                                                          
log.logFilePaths())));
 +
 +        sstableOld.selfRef().release();
 +        sstableNew.selfRef().release();
 +
 +        // complete the transaction to avoid LEAK errors
 +        assertNull(log.complete(null));
 +
 +        assertFiles(dataFolder.getPath(), 
Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
 +                                                                           
sstableOld.getAllFilePaths(),
 +                                                                           
log.logFilePaths())));
 +    }
 +
 +    @Test
 +    public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 +
 +        LogTransaction logs = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(logs);
 +
 +        LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable);
 +
 +        logs.finish();
 +
 +        sstable.markObsolete(tidier);
 +        sstable.selfRef().release();
 +
 +        // This should race with the asynchronous deletion of txn log files
 +        // It doesn't matter what it returns but it should not throw because 
the txn
 +        // was completed before deleting files (i.e. releasing sstables)
 +        for (int i = 0; i < 200; i++)
-             LogAwareFileLister.getTemporaryFiles(dataFolder);
++            getTemporaryFiles(dataFolder);
 +    }
 +
 +    @Test
 +    public void testGetTemporaryFilesThrowsIfCompletingAfterObsoletion() 
throws Throwable
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
 +        File dataFolder = new 
Directories(cfs.metadata).getDirectoryForNewSSTables();
 +        SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
 +
 +        LogTransaction logs = new LogTransaction(OperationType.COMPACTION);
 +        assertNotNull(logs);
 +
 +        LogTransaction.SSTableTidier tidier = logs.obsoleted(sstable);
 +
 +        sstable.markObsolete(tidier);
 +        sstable.selfRef().release();
 +
 +        LogTransaction.waitForDeletions();
 +
 +        try
 +        {
 +            // This should race with the asynchronous deletion of txn log 
files
 +            // it should throw because we are violating the requirement that 
a transaction must
 +            // finish before deleting files (i.e. releasing sstables)
-             LogAwareFileLister.getTemporaryFiles(dataFolder);
++            getTemporaryFiles(dataFolder);
 +            fail("Expected runtime exception");
 +        }
 +        catch(RuntimeException e)
 +        {
 +            //pass as long as the cause is not an assertion
 +            assertFalse(e.getCause() instanceof AssertionError);
 +        }
 +
 +        logs.finish();
 +    }
 +
 +    private static SSTableReader sstable(File dataFolder, ColumnFamilyStore 
cfs, int generation, int size) throws IOException
 +    {
 +        Descriptor descriptor = new Descriptor(dataFolder, 
cfs.keyspace.getName(), cfs.getTableName(), generation);
 +        Set<Component> components = ImmutableSet.of(Component.DATA, 
Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
 +        for (Component component : components)
 +        {
 +            File file = new File(descriptor.filenameFor(component));
 +            if (!file.exists())
 +                assertTrue(file.createNewFile());
 +            try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +            {
 +                raf.setLength(size);
 +            }
 +        }
 +
 +        SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new 
File(descriptor.filenameFor(Component.DATA))), 
RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
 +        SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new 
File(descriptor.filenameFor(Component.PRIMARY_INDEX))), 
RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
 +
 +        SerializationHeader header = SerializationHeader.make(cfs.metadata, 
Collections.emptyList());
 +        StatsMetadata metadata = (StatsMetadata) new 
MetadataCollector(cfs.metadata.comparator)
 +                                                 
.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 
0.01f, -1, header)
 +                                                 .get(MetadataType.STATS);
 +        SSTableReader reader = SSTableReader.internalOpen(descriptor,
 +                                                          components,
 +                                                          cfs.metadata,
 +                                                          dFile,
 +                                                          iFile,
 +                                                          
MockSchema.indexSummary.sharedCopy(),
 +                                                          new 
AlwaysPresentFilter(),
 +                                                          1L,
 +                                                          metadata,
 +                                                          
SSTableReader.OpenReason.NORMAL,
 +                                                          header);
 +        reader.first = reader.last = MockSchema.readerBounds(generation);
 +        return reader;
 +    }
 +
-     private static void assertFiles(String dirPath, Set<String> expectedFiles)
++    private static void assertFiles(String dirPath, Set<String> 
expectedFiles) throws IOException
 +    {
 +        assertFiles(dirPath, expectedFiles, false);
 +    }
 +
-     private static void assertFiles(String dirPath, Set<String> 
expectedFiles, boolean excludeNonExistingFiles)
++    /**
++     * Asserts that the non-directory entries found directly in {@code dirPath} are exactly
++     * {@code expectedFiles}. Calls {@code LogTransaction.waitForDeletions()} first and lists the
++     * canonical form of the directory.
++     * <p>
++     * {@code expectedFiles} is consumed: every matched path is removed from it and, when
++     * {@code excludeNonExistingFiles} is set, so are the paths that do not exist on disk.
++     */
++    private static void assertFiles(String dirPath, Set<String> 
expectedFiles, boolean excludeNonExistingFiles) throws IOException
 +    {
 +        LogTransaction.waitForDeletions();
 +
-         File dir = new File(dirPath);
++        File dir = new File(dirPath).getCanonicalFile();
 +        File[] files = dir.listFiles();
 +        if (files != null)
 +        {
 +            for (File file : files)
 +            {
 +                if (file.isDirectory())
 +                    continue;
 +
 +                String filePath = file.getPath();
 +                assertTrue(String.format("%s not in [%s]", filePath, 
expectedFiles), expectedFiles.contains(filePath));
 +                expectedFiles.remove(filePath);
 +            }
 +        }
 +
++        // removeIf rather than remove() inside a for-each over the same set: the latter throws
++        // ConcurrentModificationException whenever entries remain to be visited after a removal
++        if (excludeNonExistingFiles)
++            expectedFiles.removeIf(filePath -> !new File(filePath).exists());
 +
 +        assertTrue(expectedFiles.toString(), expectedFiles.isEmpty());
 +    }
 +
 +    /**
 +     * Checks that every path in {@code existingFiles} is present in {@code temporaryFiles}, and that
 +     * any other entry of {@code temporaryFiles} does not exist any longer (on Windows we need to check
 +     * File.exists() because a list might return a file as existing even if it does not).
 +     * <p>
 +     * {@code temporaryFiles} is consumed: matched entries and entries missing on disk are removed from it.
 +     */
 +    private static void assertFiles(Iterable<String> existingFiles, Set<File> temporaryFiles)
 +    {
 +        for (String filePath : existingFiles)
 +        {
 +            File file = new File(filePath);
 +            assertTrue(filePath, temporaryFiles.contains(file));
 +            temporaryFiles.remove(file);
 +        }
 +
 +        // removeIf rather than remove() inside a for-each over the same set: the latter throws
 +        // ConcurrentModificationException whenever entries remain to be visited after a removal
 +        temporaryFiles.removeIf(file -> !file.exists());
 +
 +        assertTrue(temporaryFiles.toString(), temporaryFiles.isEmpty());
 +    }
++
++    /** Returns the files in {@code folder} that {@code listFiles} reports for {@code Directories.FileType.TEMPORARY}. */
++    static Set<File> getTemporaryFiles(File folder)
++    {
++        return listFiles(folder, Directories.FileType.TEMPORARY);
++    }
++
++    /** Returns the files in {@code folder} that {@code listFiles} reports for {@code Directories.FileType.FINAL}. */
++    static Set<File> getFinalFiles(File folder)
++    {
++        return listFiles(folder, Directories.FileType.FINAL);
++    }
++
++    /**
++     * Lists {@code folder} through a {@code LogAwareFileLister} that keeps only files whose type is
++     * one of {@code types} (transaction errors are handled with {@code Directories.OnTxnErr.IGNORE}),
++     * and returns the canonical form of each listed file.
++     *
++     * @throws IOError wrapping the IOException if a file cannot be canonicalized
++     */
++    static Set<File> listFiles(File folder, Directories.FileType... types)
++    {
++        Collection<Directories.FileType> wanted = Arrays.asList(types);
++        LogAwareFileLister lister = new LogAwareFileLister(folder.toPath(),
++                                                           (file, type) -> wanted.contains(type),
++                                                           Directories.OnTxnErr.IGNORE);
++
++        Set<File> canonical = new HashSet<>();
++        for (File listed : lister.list())
++        {
++            try
++            {
++                canonical.add(listed.getCanonicalFile());
++            }
++            catch (IOException e)
++            {
++                throw new IOError(e);
++            }
++        }
++        return canonical;
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/15131391/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index 19eca40,6354fc2..184d637
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@@ -116,18 -127,33 +115,33 @@@ public class DescriptorTes
      }
  
      @Test
-     public void validateNames()
+     public void testEquality()
      {
+         // Descriptor should be equal when parent directory points to the same directory
+         File dir = new File(".");
 -        Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1, Descriptor.Type.FINAL);
 -        Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 1, Descriptor.Type.FINAL);
++        Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1);
++        Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 1);
+         assertEquals(desc1, desc2);
+         assertEquals(desc1.hashCode(), desc2.hashCode());
+     }
  
-         String names[] = {
-             /*"system-schema_keyspaces-ka-1-CompressionInfo.db",  "system-schema_keyspaces-ka-1-Summary.db",
-             "system-schema_keyspaces-ka-1-Data.db",             "system-schema_keyspaces-ka-1-TOC.txt",
-             "system-schema_keyspaces-ka-1-Digest.sha1",         "system-schema_keyspaces-ka-2-CompressionInfo.db",
-             "system-schema_keyspaces-ka-1-Filter.db",           "system-schema_keyspaces-ka-2-Data.db",
-             "system-schema_keyspaces-ka-1-Index.db",            "system-schema_keyspaces-ka-2-Digest.sha1",
-             "system-schema_keyspaces-ka-1-Statistics.db",
-             "system-schema_keyspacest-tmp-ka-1-Data.db",*/
-             "system-schema_keyspace-ka-1-"+ SSTableFormat.Type.BIG.name+"-Data.db"
+     @Test
+     public void validateNames()
+     {
 -
++        // TODO tmp file name probably is not handled correctly after CASSANDRA-7066
+         String[] names = {
+              // old formats
+              "system-schema_keyspaces-jb-1-Data.db",
 -             "system-schema_keyspaces-tmp-jb-1-Data.db",
++             //"system-schema_keyspaces-tmp-jb-1-Data.db",
+              "system-schema_keyspaces-ka-1-big-Data.db",
 -             "system-schema_keyspaces-tmp-ka-1-big-Data.db",
++             //"system-schema_keyspaces-tmp-ka-1-big-Data.db",
+              // 2ndary index
+              "keyspace1-standard1.idx1-ka-1-big-Data.db",
+              // new formats
+              "la-1-big-Data.db",
 -             "tmp-la-1-big-Data.db",
++             //"tmp-la-1-big-Data.db",
+              // 2ndary index
+              ".idx1" + File.separator + "la-1-big-Data.db",
          };
  
          for (String name : names)

Reply via email to