Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 e726cf6d6 -> b70f7ea0c refs/heads/cassandra-2.2 4601abb91 -> fb4656f61 refs/heads/trunk ece5cfc66 -> 9ae7126d3
Remove repair snapshot leftover on startup patch by Paulo Motta; reviewed by yukim for CASSANDRA-7357 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b70f7ea0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b70f7ea0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b70f7ea0 Branch: refs/heads/cassandra-2.1 Commit: b70f7ea0ce27b5defa0a7773d448732364e7aee0 Parents: e726cf6 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Tue Jul 21 22:11:37 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Jul 21 22:11:37 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 54 +++++++++++++++-- .../org/apache/cassandra/db/Directories.java | 63 +++++++++++++++----- .../repair/RepairMessageVerbHandler.java | 2 +- .../cassandra/service/CassandraDaemon.java | 1 - .../cassandra/db/ColumnFamilyStoreTest.java | 42 +++++++++++++ 6 files changed, 139 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c6774c2..26ee348 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ * Fix clientutil jar and tests (CASSANDRA-9760) * (cqlsh) Allow the SSL protocol version to be specified through the config file or environment variables (CASSANDRA-9544) + * Remove repair snapshot leftover on startup (CASSANDRA-7357) Merged from 2.0: * Complete CASSANDRA-8448 fix (CASSANDRA-9519) * Don't include auth credentials in debug log (CASSANDRA-9682) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index aec5f35..20e74dc 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.io.*; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; +import java.nio.file.Files; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -35,7 +36,6 @@ import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.utils.memory.MemtablePool; import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -501,6 +501,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { Directories directories = new Directories(metadata); + // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) + clearEphemeralSnapshots(directories); + // remove any left-behind SSTables from failed/stalled streaming FileFilter filter = new FileFilter() { @@ -2249,10 +2252,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void snapshotWithoutFlush(String snapshotName) { - snapshotWithoutFlush(snapshotName, null); + snapshotWithoutFlush(snapshotName, null, false); } - public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate) + /** + * @param ephemeral If this flag is set to true, the snapshot will be cleaned during next startup + */ + public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) { for (ColumnFamilyStore cfs : concatWithIndexes()) { @@ -2267,6 +2273,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName); ssTable.createLinks(snapshotDirectory.getPath()); // hard links filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA)); + if (logger.isDebugEnabled()) logger.debug("Snapshot for {} keyspace data file {} created in {}", keyspace, ssTable.getFilename(), snapshotDirectory); } @@ -2274,6 +2281,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean writeSnapshotManifest(filesJSONArr, snapshotName); } } + if (ephemeral) + createEphemeralSnapshotMarkerFile(snapshotName); } private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName) @@ -2296,6 +2305,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + private void createEphemeralSnapshotMarkerFile(final String snapshot) + { + final File ephemeralSnapshotMarker = directories.getNewEphemeralSnapshotMarkerFile(snapshot); + + try + { + if (!ephemeralSnapshotMarker.getParentFile().exists()) + ephemeralSnapshotMarker.getParentFile().mkdirs(); + + Files.createFile(ephemeralSnapshotMarker.toPath()); + logger.debug("Created ephemeral snapshot marker file on {}.", ephemeralSnapshotMarker.getAbsolutePath()); + } + catch (IOException e) + { + logger.warn(String.format("Could not create marker file %s for ephemeral snapshot %s. " + + "In case there is a failure in the operation that created " + + "this snapshot, you may need to clean it manually afterwards.", + ephemeralSnapshotMarker.getAbsolutePath(), snapshot), e); + } + } + + protected static void clearEphemeralSnapshots(Directories directories) + { + for (String ephemeralSnapshot : directories.listEphemeralSnapshots()) + { + logger.debug("Clearing ephemeral snapshot {} leftover from previous session.", ephemeralSnapshot); + Directories.clearSnapshot(ephemeralSnapshot, directories.getCFDirectories()); + } + } + public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException { Map<Integer, SSTableReader> active = new HashMap<>(); @@ -2341,13 +2380,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public void snapshot(String snapshotName) { - snapshot(snapshotName, null); + snapshot(snapshotName, null, false); } - public void snapshot(String snapshotName, Predicate<SSTableReader> predicate) + /** + * @param ephemeral If this flag is set to true, the snapshot will be cleaned up during next startup + */ + public void snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) { forceBlockingFlush(); - snapshotWithoutFlush(snapshotName, predicate); + snapshotWithoutFlush(snapshotName, predicate, ephemeral); } public boolean snapshotExists(String snapshotName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 2e0b60c..810c336 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -376,6 +376,17 @@ public class Directories return new File(getDirectoryForNewSSTables(), join(SNAPSHOT_SUBDIR, snapshotName, "manifest.json")); } + public File getNewEphemeralSnapshotMarkerFile(String snapshotName) + { + File snapshotDir = new File(getWriteableLocationAsFile(1L), join(SNAPSHOT_SUBDIR, snapshotName)); + return getEphemeralSnapshotMarkerFile(snapshotDir); + } + + private static File getEphemeralSnapshotMarkerFile(File snapshotDirectory) + { + return new File(snapshotDirectory, "ephemeral.snapshot"); + } + public static File getBackupsDirectory(Descriptor desc) { return getOrCreate(desc.directory, BACKUPS_SUBDIR); @@ -563,34 +574,55 @@ public class Directories public Map<String, Pair<Long, Long>> getSnapshotDetails() { final Map<String, Pair<Long, Long>> snapshotSpaceMap = new HashMap<>(); + for (File snapshot : listSnapshots()) + { + final long sizeOnDisk = FileUtils.folderSize(snapshot); + final long trueSize = getTrueAllocatedSizeIn(snapshot); + Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName()); + if (spaceUsed == null) + spaceUsed = Pair.create(sizeOnDisk,trueSize); + else + spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize); + snapshotSpaceMap.put(snapshot.getName(), spaceUsed); + } + return snapshotSpaceMap; + } + + + public List<String> listEphemeralSnapshots() + { + final List<String> ephemeralSnapshots = new LinkedList<>(); + for (File snapshot : listSnapshots()) + { + if (getEphemeralSnapshotMarkerFile(snapshot).exists()) + ephemeralSnapshots.add(snapshot.getName()); + } + return ephemeralSnapshots; + } + + private List<File> listSnapshots() + { + final List<File> snapshots = new LinkedList<>(); for (final File dir : dataPaths) { final File snapshotDir = new File(dir,SNAPSHOT_SUBDIR); if (snapshotDir.exists() && snapshotDir.isDirectory()) { - final File[] snapshots = snapshotDir.listFiles(); - if (snapshots != null) + final File[] snapshotDirs = snapshotDir.listFiles(); + if (snapshotDirs != null) { - for (final File snapshot : snapshots) + for (final File snapshot : snapshotDirs) { if (snapshot.isDirectory()) - { - final long sizeOnDisk = FileUtils.folderSize(snapshot); - final long trueSize = getTrueAllocatedSizeIn(snapshot); - Pair<Long,Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName()); - if (spaceUsed == null) - spaceUsed = Pair.create(sizeOnDisk,trueSize); - else - spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize); - snapshotSpaceMap.put(snapshot.getName(), spaceUsed); - } + snapshots.add(snapshot); } } } } - return snapshotSpaceMap; + return snapshots; } + public boolean snapshotExists(String snapshotName) { for (File dir : dataPaths) @@ -611,8 +643,7 @@ public class Directories File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, tag)); if (snapshotDir.exists()) { - if (logger.isDebugEnabled()) - logger.debug("Removing snapshot directory {}", snapshotDir); + logger.debug("Removing snapshot directory {}", snapshotDir); FileUtils.deleteRecursive(snapshotDir); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 872978e..fd4ac28 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -87,7 +87,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); } - }); + }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 949ea4c..2c141a6 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -49,7 +49,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 5faab78..35814f0 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -1441,6 +1441,48 @@ public class ColumnFamilyStoreTest extends SchemaLoader findRowGetSlicesAndAssertColsFound(cfs, multiRangeReverseWithCounting, "a", "colI", "colD", "colC"); } + @Test + public void testClearEphemeralSnapshots() throws Throwable + { + Mutation rm; + ColumnFamilyStore cfs = Keyspace.open("Keyspace3").getColumnFamilyStore("Indexed1"); + for (int i = 0; i < 100; i++) + { + rm = new Mutation("Keyspace3", ByteBufferUtil.bytes("key" + i)); + rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(34L), 0); + rm.add("Indexed1", cellname("notbirthdate"), ByteBufferUtil.bytes((long) (i % 2)), 0); + rm.applyUnsafe(); + } + + //cleanup any previous test gargbage + cfs.clearSnapshot(""); + + Cell[] cols = new Cell[5]; + for (int i = 0; i < 5; i++) + cols[i] = column("c" + i, "value", 1); + + putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], cols[4]); + putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]); + putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]); + + cfs.snapshot("nonEphemeralSnapshot", null, false); + cfs.snapshot("ephemeralSnapshot", null, true); + + Map<String, Pair<Long, Long>> snapshotDetails = cfs.getSnapshotDetails(); + assertEquals(2, snapshotDetails.size()); + assertTrue(snapshotDetails.containsKey("ephemeralSnapshot")); + assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot")); + + ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories); + + snapshotDetails = cfs.getSnapshotDetails(); + assertEquals(1, snapshotDetails.size()); + assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot")); + + //test cleanup + cfs.clearSnapshot(""); + } + @SuppressWarnings("unchecked") @Test public void testMultiRangeSomeEmptyIndexed() throws Throwable