Repository: cassandra
Updated Branches:
  refs/heads/trunk 999ce832d -> 32b0a4e95
Handle abort() properly in SSTableRewriter

Patch by marcuse; reviewed by jmckenzie for CASSANDRA-8320

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1062929
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1062929
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1062929

Branch: refs/heads/trunk
Commit: b1062929185690567e4567e0e657b361c5105482
Parents: 3faff8b
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Nov 18 07:07:30 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Nov 26 16:00:51 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../cassandra/io/sstable/SSTableReader.java | 22 +++
 .../cassandra/io/sstable/SSTableRewriter.java | 74 ++++++--
 .../io/sstable/SSTableRewriterTest.java | 180 +++++++++++++++----
 4 files changed, 226 insertions(+), 51 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f022b19..e5f7c28 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
  * Fix high size calculations for prepared statements (CASSANDRA-8231)
  * Centralize shared executors (CASSANDRA-8055)
  * Fix filtering for CONTAINS (KEY) relations on frozen collection

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a3e3cf5..1fe4330 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -202,6 +202,7 @@ public class SSTableReader extends SSTable private Object replaceLock = new Object(); private SSTableReader replacedBy; private SSTableReader replaces; + private SSTableReader sharesBfWith; private SSTableDeletingTask deletingTask; private Runnable runOnClose; @@ -594,6 +595,14 @@ public class SSTableReader extends SSTable deleteFiles &= !dfile.path.equals(replaces.dfile.path); } + if (sharesBfWith != null) + { + closeBf &= sharesBfWith.bf != bf; + closeSummary &= sharesBfWith.indexSummary != indexSummary; + closeFiles &= sharesBfWith.dfile != dfile; + deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path); + } + boolean deleteAll = false; if (release && isCompacted.get()) { @@ -928,6 +937,19 @@ public class SSTableReader extends SSTable } } + /** + * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter + * + * note that the reason we don't use replacedBy is that we are not yet actually replaced + * + * @param newReader + */ + public void sharesBfWith(SSTableReader newReader) + { + assert openReason.equals(OpenReason.EARLY); + this.sharesBfWith = newReader; + } + public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose) { synchronized (replaceLock) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index 4d5a06f..d187e9d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; 
import java.util.List; import java.util.Map; import java.util.Set; @@ -75,6 +76,7 @@ public class SSTableRewriter private final ColumnFamilyStore cfs; private final long maxAge; + private final List<SSTableReader> finished = new ArrayList<>(); private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced) private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting @@ -180,16 +182,11 @@ public class SSTableRewriter public void abort() { - if (writer == null) - return; - switchWriter(null); moveStarts(null, Functions.forMap(originalStarts), true); List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly); - if (currentlyOpenedEarly != null) - close.add(currentlyOpenedEarly); for (Pair<SSTableWriter, SSTableReader> w : finishedWriters) { @@ -202,6 +199,12 @@ public class SSTableRewriter for (SSTableReader sstable : close) sstable.markObsolete(); + for (SSTableReader sstable : finished) + { + sstable.markObsolete(); + sstable.releaseReference(); + } + // releases reference in replaceReaders if (!isOffline) { @@ -210,6 +213,7 @@ public class SSTableRewriter } } + /** * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer * needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset @@ -327,38 +331,70 @@ public class SSTableRewriter */ public List<SSTableReader> finish(long repairedAt) { - List<SSTableReader> finished = new ArrayList<>(); - if (writer.getFilePointer() > 0) - { - SSTableReader reader = repairedAt < 0 ? 
writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt); - finished.add(reader); - replaceEarlyOpenedFile(currentlyOpenedEarly, reader); - moveStarts(reader, Functions.constant(reader.last), false); - } - else - { - writer.abort(true); - } + List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>(); + switchWriter(null); // make real sstables of the written ones: - for (Pair<SSTableWriter, SSTableReader> w : finishedWriters) + Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator(); + while(it.hasNext()) { + Pair<SSTableWriter, SSTableReader> w = it.next(); if (w.left.getFilePointer() > 0) { SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt); finished.add(newReader); + + if (w.right != null) + w.right.sharesBfWith(newReader); // w.right is the tmplink-reader we added when switching writer, replace with the real sstable. - replaceEarlyOpenedFile(w.right, newReader); + toReplace.add(Pair.create(w.right, newReader)); } else { assert w.right == null; w.left.abort(true); } + it.remove(); } + + for (Pair<SSTableReader, SSTableReader> replace : toReplace) + replaceEarlyOpenedFile(replace.left, replace.right); + if (!isOffline) { dataTracker.unmarkCompacting(finished); } return finished; } + + @VisibleForTesting + void finishAndThrow(boolean early) + { + List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>(); + switchWriter(null); + if (early) + throw new RuntimeException("exception thrown early in finish"); + // make real sstables of the written ones: + Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator(); + while(it.hasNext()) + { + Pair<SSTableWriter, SSTableReader> w = it.next(); + if (w.left.getFilePointer() > 0) + { + SSTableReader newReader = w.left.closeAndOpenReader(maxAge); + finished.add(newReader); + if (w.right != null) + w.right.sharesBfWith(newReader); + // w.right is the tmplink-reader 
we added when switching writer, replace with the real sstable. + toReplace.add(Pair.create(w.right, newReader)); + } + else + { + assert w.right == null; + w.left.abort(true); + } + it.remove(); + } + + throw new RuntimeException("exception thrown after all sstables finished"); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 8a494a6..0a76b66 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -29,9 +30,11 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.compaction.AbstractCompactedRow; @@ -80,12 +83,45 @@ public class SSTableRewriterTest extends SchemaLoader writer.append(row); } } - cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION); - + Collection<SSTableReader> newsstables = writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables , OperationType.COMPACTION); + Thread.sleep(100); validateCFS(cfs); + int filecounts = 
assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); + assertEquals(1, filecounts); } + @Test + public void basicTest2() throws InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.truncateBlocking(); + SSTableReader s = writeFile(cfs, 1000); + cfs.addSSTable(s); + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + assertEquals(1, sstables.size()); + SSTableRewriter.overrideOpenInterval(10000000); + SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) + { + ICompactionScanner scanner = scanners.scanners.get(0); + CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + while (scanner.hasNext()) + { + AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); + writer.append(row); + } + } + Collection<SSTableReader> newsstables = writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); + Thread.sleep(100); + validateCFS(cfs); + int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); + assertEquals(1, filecounts); + } @Test public void testFileRemoval() throws InterruptedException @@ -114,37 +150,11 @@ public class SSTableRewriterTest extends SchemaLoader assertFileCounts(dir.list(), 0, 3); writer.abort(false); Thread.sleep(1000); - assertFileCounts(dir.list(), 0, 0); - validateCFS(cfs); - } - - @Test - public void testFileRemovalNoAbort() throws InterruptedException - { - Keyspace keyspace = Keyspace.open(KEYSPACE); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); - ArrayBackedSortedColumns 
cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); - for (int i = 0; i < 1000; i++) - cf.addColumn(Util.column(String.valueOf(i), "a", 1)); - File dir = cfs.directories.getDirectoryForNewSSTables(); - SSTableWriter writer = getWriter(cfs, dir); - - for (int i = 0; i < 500; i++) - writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf); - SSTableReader s = writer.openEarly(1000); - //assertFileCounts(dir.list(), 2, 3); - for (int i = 500; i < 1000; i++) - writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf); - writer.closeAndOpenReader(); - s.markObsolete(); - s.releaseReference(); - Thread.sleep(1000); - assertFileCounts(dir.list(), 0, 0); + int datafiles = assertFileCounts(dir.list(), 0, 0); + assertEquals(datafiles, 0); validateCFS(cfs); } - @Test public void testNumberOfFilesAndSizes() throws Exception { @@ -446,6 +456,95 @@ public class SSTableRewriterTest extends SchemaLoader assertFileCounts(s.descriptor.directory.list(), 0, 0); validateCFS(cfs); } + @Test + public void testAbort() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.truncateBlocking(); + SSTableReader s = writeFile(cfs, 1000); + cfs.addSSTable(s); + Set<SSTableReader> compacting = Sets.newHashSet(s); + SSTableRewriter.overrideOpenInterval(10000000); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); + SSTableWriter w = getWriter(cfs, s.descriptor.directory); + rewriter.switchWriter(w); + try (ICompactionScanner scanner = compacting.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) + { + while (scanner.hasNext()) + { + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + } + } + 
try + { + rewriter.finishAndThrow(false); + } + catch (Throwable t) + { + rewriter.abort(); + } + } + Thread.sleep(1000); + int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertEquals(filecount, 1); + assertEquals(1, cfs.getSSTables().size()); + validateCFS(cfs); + cfs.truncateBlocking(); + Thread.sleep(1000); + filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertEquals(0, filecount); + + } + + @Test + public void testAbort2() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.truncateBlocking(); + SSTableReader s = writeFile(cfs, 1000); + cfs.addSSTable(s); + Set<SSTableReader> compacting = Sets.newHashSet(s); + SSTableRewriter.overrideOpenInterval(10000000); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); + SSTableWriter w = getWriter(cfs, s.descriptor.directory); + rewriter.switchWriter(w); + try (ICompactionScanner scanner = compacting.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) + { + while (scanner.hasNext()) + { + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + } + } + try + { + rewriter.finishAndThrow(true); + } + catch (Throwable t) + { + rewriter.abort(); + } + } + Thread.sleep(1000); + int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertEquals(filecount, 1); + assertEquals(1, cfs.getSSTables().size()); + validateCFS(cfs); + cfs.truncateBlocking(); + Thread.sleep(1000); + filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertEquals(0, filecount); + + } private SSTableReader writeFile(ColumnFamilyStore cfs, int count) { @@ -469,28 +568,45 @@ public class SSTableRewriterTest extends SchemaLoader private void 
validateCFS(ColumnFamilyStore cfs) { + Set<Integer> liveDescriptors = new HashSet<>(); for (SSTableReader sstable : cfs.getSSTables()) { assertFalse(sstable.isMarkedCompacted()); assertEquals(1, sstable.referenceCount()); + liveDescriptors.add(sstable.descriptor.generation); + } + for (File dir : cfs.directories.getCFDirectories()) + { + for (String f : dir.list()) + { + if (f.contains("Data")) + { + Descriptor d = Descriptor.fromFilename(f); + assertTrue(d.toString(), liveDescriptors.contains(d.generation)); + } + } } assertTrue(cfs.getDataTracker().getCompacting().isEmpty()); } - private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) + private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) { int tmplinkcount = 0; int tmpcount = 0; + int datacount = 0; for (String f : files) { if (f.contains("-tmplink-")) tmplinkcount++; - if (f.contains("-tmp-")) + else if (f.contains("-tmp-")) tmpcount++; + else if (f.contains("Data")) + datacount++; } assertEquals(expectedtmplinkCount, tmplinkcount); assertEquals(expectedtmpCount, tmpcount); + return datacount; } private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)