Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb4656f6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb4656f6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb4656f6 Branch: refs/heads/trunk Commit: fb4656f6155113839ef8612aca578c3bdec96958 Parents: 4601abb b70f7ea Author: Yuki Morishita <yu...@apache.org> Authored: Tue Jul 21 22:56:28 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Jul 21 22:57:35 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 55 ++++++++++++++++-- .../org/apache/cassandra/db/Directories.java | 61 +++++++++++++++----- .../repair/RepairMessageVerbHandler.java | 3 +- .../cassandra/service/CassandraDaemon.java | 1 - .../cassandra/db/ColumnFamilyStoreTest.java | 35 +++++++++++ 6 files changed, 133 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index bd70d19,26ee348..e1d1fba --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,12 -1,4 +1,13 @@@ -2.1.9 +2.2.1 + * UDF / UDA execution time in trace (CASSANDRA-9723) ++ * Remove repair snapshot leftover on startup (CASSANDRA-7357) + +2.2.0 + * Fix cqlsh copy methods and other windows specific issues (CASSANDRA-9795) + * Don't wrap byte arrays in SequentialWriter (CASSANDRA-9797) + * sum() and avg() functions missing for smallint and tinyint types (CASSANDRA-9671) + * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771) +Merged from 2.1: * Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837) * Handle corrupt files on startup (CASSANDRA-9686) * Fix clientutil jar and tests (CASSANDRA-9760) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 1166266,20e74dc..7d52a94 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -34,12 -35,7 +35,11 @@@ import com.google.common.base.Throwable import com.google.common.collect.*; import com.google.common.util.concurrent.*; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.lifecycle.Tracker; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.FSWriteError; - import org.apache.cassandra.utils.memory.MemtablePool; import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -2253,12 -2252,14 +2256,15 @@@ public class ColumnFamilyStore implemen public void snapshotWithoutFlush(String snapshotName) { - snapshotWithoutFlush(snapshotName, null); + snapshotWithoutFlush(snapshotName, null, false); } - public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate) + /** + * @param ephemeral If this flag is set to true, the snapshot will be cleaned during next startup + */ - public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) ++ public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) { + Set<SSTableReader> snapshottedSSTables = new HashSet<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { final JSONArray filesJSONArr = new JSONArray(); @@@ -2272,15 -2273,16 +2278,18 @@@ File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName); ssTable.createLinks(snapshotDirectory.getPath()); // hard links filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA)); + if (logger.isDebugEnabled()) logger.debug("Snapshot for {} keyspace data file {} created in {}", keyspace, ssTable.getFilename(), snapshotDirectory); + snapshottedSSTables.add(ssTable); } writeSnapshotManifest(filesJSONArr, snapshotName); } } + if (ephemeral) + createEphemeralSnapshotMarkerFile(snapshotName); + return snapshottedSSTables; } private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName) @@@ -2348,15 -2378,18 +2387,19 @@@ * * @param snapshotName the name of the associated with the snapshot */ - public void snapshot(String snapshotName) + public Set<SSTableReader> snapshot(String snapshotName) { - return snapshot(snapshotName, null); - snapshot(snapshotName, null, false); ++ return snapshot(snapshotName, null, false); } - public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate) ++ + /** + * @param ephemeral If this flag is set to true, the snapshot will be cleaned up during next startup + */ - public void snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) ++ public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) { forceBlockingFlush(); - return snapshotWithoutFlush(snapshotName, predicate); - snapshotWithoutFlush(snapshotName, predicate, ephemeral); ++ return snapshotWithoutFlush(snapshotName, predicate, ephemeral); } public boolean snapshotExists(String snapshotName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Directories.java index 4982407,810c336..8b61c68 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@@ -434,25 -373,23 +434,36 @@@ public class Directorie public File getSnapshotManifestFile(String snapshotName) { - return new File(getDirectoryForNewSSTables(), join(SNAPSHOT_SUBDIR, snapshotName, "manifest.json")); + File snapshotDir = getSnapshotDirectory(getDirectoryForNewSSTables(), snapshotName); + return new File(snapshotDir, "manifest.json"); } + public File getNewEphemeralSnapshotMarkerFile(String snapshotName) + { + File snapshotDir = new File(getWriteableLocationAsFile(1L), join(SNAPSHOT_SUBDIR, snapshotName)); + return getEphemeralSnapshotMarkerFile(snapshotDir); + } + + private static File getEphemeralSnapshotMarkerFile(File snapshotDirectory) + { + return new File(snapshotDirectory, "ephemeral.snapshot"); + } + public static File getBackupsDirectory(Descriptor desc) { - return getOrCreate(desc.directory, BACKUPS_SUBDIR); + return getBackupsDirectory(desc.directory); + } + + public static File getBackupsDirectory(File location) + { + if (location.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR)) + { + return getOrCreate(location.getParentFile(), BACKUPS_SUBDIR, location.getName()); + } + else + { + return getOrCreate(location, BACKUPS_SUBDIR); + } } public SSTableLister sstableLister() @@@ -638,29 -574,47 +649,49 @@@ public Map<String, Pair<Long, Long>> getSnapshotDetails() { final Map<String, Pair<Long, Long>> snapshotSpaceMap = new HashMap<>(); - for (File dir : dataPaths) + for (File snapshot : listSnapshots()) + { + final long sizeOnDisk = FileUtils.folderSize(snapshot); + final long trueSize = getTrueAllocatedSizeIn(snapshot); + Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName()); + if (spaceUsed == null) + spaceUsed = Pair.create(sizeOnDisk,trueSize); + else + spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize); + snapshotSpaceMap.put(snapshot.getName(), spaceUsed); + } + return snapshotSpaceMap; + } + + + public List<String> listEphemeralSnapshots() + { + final List<String> ephemeralSnapshots = new LinkedList<>(); + for (File snapshot : listSnapshots()) + { + if (getEphemeralSnapshotMarkerFile(snapshot).exists()) + ephemeralSnapshots.add(snapshot.getName()); + } + return ephemeralSnapshots; + } + + private List<File> listSnapshots() + { + final List<File> snapshots = new LinkedList<>(); + for (final File dir : dataPaths) { - final File snapshotDir = new File(dir,SNAPSHOT_SUBDIR); + File snapshotDir = dir.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR) ? + new File(dir.getParent(), SNAPSHOT_SUBDIR) : + new File(dir, SNAPSHOT_SUBDIR); if (snapshotDir.exists() && snapshotDir.isDirectory()) { - final File[] snapshots = snapshotDir.listFiles(); - if (snapshots != null) + final File[] snapshotDirs = snapshotDir.listFiles(); + if (snapshotDirs != null) { - for (final File snapshot : snapshots) + for (final File snapshot : snapshotDirs) { if (snapshot.isDirectory()) - { - final long sizeOnDisk = FileUtils.folderSize(snapshot); - final long trueSize = getTrueAllocatedSizeIn(snapshot); - Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName()); - if (spaceUsed == null) - spaceUsed = Pair.create(sizeOnDisk, trueSize); - else - spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize); - snapshotSpaceMap.put(snapshot.getName(), spaceUsed); - } + snapshots.add(snapshot); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index bdc6c35,fd4ac28..c0855c4 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -88,14 -87,8 +88,15 @@@ public class RepairMessageVerbHandler i !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); } - }); + }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup + + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId); + if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty()) + { + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java index 8388138,2c141a6..10aa4b2 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@@ -52,9 -49,7 +52,8 @@@ import org.apache.cassandra.config.Data import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; - import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.util.FileUtils; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index c85b2e0,35814f0..b5e62b3 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@@ -1527,6 -1441,48 +1527,41 @@@ public class ColumnFamilyStoreTes findRowGetSlicesAndAssertColsFound(cfs, multiRangeReverseWithCounting, "a", "colI", "colD", "colC"); } + @Test + public void testClearEphemeralSnapshots() throws Throwable + { ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1); ++ ++ //cleanup any previous test gargbage ++ cfs.clearSnapshot(""); ++ + Mutation rm; - ColumnFamilyStore cfs = Keyspace.open("Keyspace3").getColumnFamilyStore("Indexed1"); + for (int i = 0; i < 100; i++) + { - rm = new Mutation("Keyspace3", ByteBufferUtil.bytes("key" + i)); - rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(34L), 0); - rm.add("Indexed1", cellname("notbirthdate"), ByteBufferUtil.bytes((long) (i % 2)), 0); ++ rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("key" + i)); ++ rm.add(CF_INDEX1, cellname("birthdate"), ByteBufferUtil.bytes(34L), 0); ++ rm.add(CF_INDEX1, cellname("notbirthdate"), ByteBufferUtil.bytes((long) (i % 2)), 0); + rm.applyUnsafe(); + } + - //cleanup any previous test gargbage - cfs.clearSnapshot(""); - - Cell[] cols = new Cell[5]; - for (int i = 0; i < 5; i++) - cols[i] = column("c" + i, "value", 1); - - putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], cols[4]); - putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]); - putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]); - + cfs.snapshot("nonEphemeralSnapshot", null, false); + cfs.snapshot("ephemeralSnapshot", null, true); + + Map<String, Pair<Long, Long>> snapshotDetails = cfs.getSnapshotDetails(); + assertEquals(2, snapshotDetails.size()); + assertTrue(snapshotDetails.containsKey("ephemeralSnapshot")); + assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot")); + + ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories); + + snapshotDetails = cfs.getSnapshotDetails(); + assertEquals(1, snapshotDetails.size()); + assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot")); + + //test cleanup + cfs.clearSnapshot(""); + } + @SuppressWarnings("unchecked") @Test public void testMultiRangeSomeEmptyIndexed() throws Throwable