Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb4656f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb4656f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb4656f6

Branch: refs/heads/trunk
Commit: fb4656f6155113839ef8612aca578c3bdec96958
Parents: 4601abb b70f7ea
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 21 22:56:28 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 21 22:57:35 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 55 ++++++++++++++++--
 .../org/apache/cassandra/db/Directories.java    | 61 +++++++++++++++-----
 .../repair/RepairMessageVerbHandler.java        |  3 +-
 .../cassandra/service/CassandraDaemon.java      |  1 -
 .../cassandra/db/ColumnFamilyStoreTest.java     | 35 +++++++++++
 6 files changed, 133 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bd70d19,26ee348..e1d1fba
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,4 +1,13 @@@
 -2.1.9
 +2.2.1
 + * UDF / UDA execution time in trace (CASSANDRA-9723)
++ * Remove repair snapshot leftover on startup (CASSANDRA-7357)
 +
 +2.2.0
 + * Fix cqlsh copy methods and other windows specific issues (CASSANDRA-9795) 
 + * Don't wrap byte arrays in SequentialWriter (CASSANDRA-9797)
 + * sum() and avg() functions missing for smallint and tinyint types (CASSANDRA-9671)
 + * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
 +Merged from 2.1:
   * Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837)
   * Handle corrupt files on startup (CASSANDRA-9686)
   * Fix clientutil jar and tests (CASSANDRA-9760)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1166266,20e74dc..7d52a94
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -34,12 -35,7 +35,11 @@@ import com.google.common.base.Throwable
  import com.google.common.collect.*;
  import com.google.common.util.concurrent.*;
  
 +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.lifecycle.Tracker;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.io.FSWriteError;
- import org.apache.cassandra.utils.memory.MemtablePool;
  import org.json.simple.*;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -2253,12 -2252,14 +2256,15 @@@ public class ColumnFamilyStore implemen
  
      public void snapshotWithoutFlush(String snapshotName)
      {
-         snapshotWithoutFlush(snapshotName, null);
+         snapshotWithoutFlush(snapshotName, null, false);
      }
  
-     public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate)
+     /**
+      * @param ephemeral If this flag is set to true, the snapshot will be cleaned during next startup
+      */
 -    public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral)
++    public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral)
      {
 +        Set<SSTableReader> snapshottedSSTables = new HashSet<>();
          for (ColumnFamilyStore cfs : concatWithIndexes())
          {
              final JSONArray filesJSONArr = new JSONArray();
@@@ -2272,15 -2273,16 +2278,18 @@@
                      File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName);
                      ssTable.createLinks(snapshotDirectory.getPath()); // hard links
                      filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA));
+ 
                      if (logger.isDebugEnabled())
                          logger.debug("Snapshot for {} keyspace data file {} created in {}", keyspace, ssTable.getFilename(), snapshotDirectory);
 +                    snapshottedSSTables.add(ssTable);
                  }
  
                  writeSnapshotManifest(filesJSONArr, snapshotName);
              }
          }
+         if (ephemeral)
+             createEphemeralSnapshotMarkerFile(snapshotName);
 +        return snapshottedSSTables;
      }
  
      private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName)
@@@ -2348,15 -2378,18 +2387,19 @@@
       *
       * @param snapshotName the name of the associated with the snapshot
       */
 -    public void snapshot(String snapshotName)
 +    public Set<SSTableReader> snapshot(String snapshotName)
      {
-         return snapshot(snapshotName, null);
 -        snapshot(snapshotName, null, false);
++        return snapshot(snapshotName, null, false);
      }
  
-     public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate)
++
+     /**
+      * @param ephemeral If this flag is set to true, the snapshot will be cleaned up during next startup
+      */
 -    public void snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral)
++    public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral)
      {
          forceBlockingFlush();
-         return snapshotWithoutFlush(snapshotName, predicate);
 -        snapshotWithoutFlush(snapshotName, predicate, ephemeral);
++        return snapshotWithoutFlush(snapshotName, predicate, ephemeral);
      }
  
      public boolean snapshotExists(String snapshotName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 4982407,810c336..8b61c68
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -434,25 -373,23 +434,36 @@@ public class Directorie
  
      public File getSnapshotManifestFile(String snapshotName)
      {
 -         return new File(getDirectoryForNewSSTables(), join(SNAPSHOT_SUBDIR, snapshotName, "manifest.json"));
 +        File snapshotDir = getSnapshotDirectory(getDirectoryForNewSSTables(), snapshotName);
 +        return new File(snapshotDir, "manifest.json");
      }
  
+     public File getNewEphemeralSnapshotMarkerFile(String snapshotName)
+     {
+         File snapshotDir = new File(getWriteableLocationAsFile(1L), join(SNAPSHOT_SUBDIR, snapshotName));
+         return getEphemeralSnapshotMarkerFile(snapshotDir);
+     }
+ 
+     private static File getEphemeralSnapshotMarkerFile(File snapshotDirectory)
+     {
+         return new File(snapshotDirectory, "ephemeral.snapshot");
+     }
+ 
      public static File getBackupsDirectory(Descriptor desc)
      {
 -        return getOrCreate(desc.directory, BACKUPS_SUBDIR);
 +        return getBackupsDirectory(desc.directory);
 +    }
 +
 +    public static File getBackupsDirectory(File location)
 +    {
 +        if (location.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            return getOrCreate(location.getParentFile(), BACKUPS_SUBDIR, location.getName());
 +        }
 +        else
 +        {
 +            return getOrCreate(location, BACKUPS_SUBDIR);
 +        }
      }
  
      public SSTableLister sstableLister()
@@@ -638,29 -574,47 +649,49 @@@
      public Map<String, Pair<Long, Long>> getSnapshotDetails()
      {
          final Map<String, Pair<Long, Long>> snapshotSpaceMap = new HashMap<>();
-         for (File dir : dataPaths)
+         for (File snapshot : listSnapshots())
+         {
+             final long sizeOnDisk = FileUtils.folderSize(snapshot);
+             final long trueSize = getTrueAllocatedSizeIn(snapshot);
+             Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName());
+             if (spaceUsed == null)
+                 spaceUsed =  Pair.create(sizeOnDisk,trueSize);
+             else
+                 spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize);
+             snapshotSpaceMap.put(snapshot.getName(), spaceUsed);
+         }
+         return snapshotSpaceMap;
+     }
+ 
+ 
+     public List<String> listEphemeralSnapshots()
+     {
+         final List<String> ephemeralSnapshots = new LinkedList<>();
+         for (File snapshot : listSnapshots())
+         {
+             if (getEphemeralSnapshotMarkerFile(snapshot).exists())
+                 ephemeralSnapshots.add(snapshot.getName());
+         }
+         return ephemeralSnapshots;
+     }
+ 
+     private List<File> listSnapshots()
+     {
+         final List<File> snapshots = new LinkedList<>();
+         for (final File dir : dataPaths)
          {
 -            final File snapshotDir = new File(dir,SNAPSHOT_SUBDIR);
 +            File snapshotDir = dir.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR) ?
 +                                       new File(dir.getParent(), SNAPSHOT_SUBDIR) :
 +                                       new File(dir, SNAPSHOT_SUBDIR);
              if (snapshotDir.exists() && snapshotDir.isDirectory())
              {
-                 final File[] snapshots  = snapshotDir.listFiles();
-                 if (snapshots != null)
+                 final File[] snapshotDirs  = snapshotDir.listFiles();
+                 if (snapshotDirs != null)
                  {
-                     for (final File snapshot : snapshots)
+                     for (final File snapshot : snapshotDirs)
                      {
                          if (snapshot.isDirectory())
-                         {
-                             final long sizeOnDisk = FileUtils.folderSize(snapshot);
-                             final long trueSize = getTrueAllocatedSizeIn(snapshot);
-                             Pair<Long, Long> spaceUsed = snapshotSpaceMap.get(snapshot.getName());
-                             if (spaceUsed == null)
-                                 spaceUsed = Pair.create(sizeOnDisk, trueSize);
-                             else
-                                 spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, spaceUsed.right + trueSize);
-                             snapshotSpaceMap.put(snapshot.getName(), spaceUsed);
-                         }
+                             snapshots.add(snapshot);
                      }
                  }
              }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index bdc6c35,fd4ac28..c0855c4
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -88,14 -87,8 +88,15 @@@ public class RepairMessageVerbHandler i
                                      !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
                                      new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
                          }
-                     });
+                     }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
+ 
 +                    Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId);
 +                    if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty())
 +                    {
 +                        logger.error("Cannot start multiple repair sessions over the same sstables");
 +                        throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
 +                    }
 +                    ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables);
                      logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
                      MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                      break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 8388138,2c141a6..10aa4b2
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -52,9 -49,7 +52,8 @@@ import org.apache.cassandra.config.Data
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.CommitLog;
- import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.StartupException;
  import org.apache.cassandra.io.FSError;
  import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.util.FileUtils;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb4656f6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index c85b2e0,35814f0..b5e62b3
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -1527,6 -1441,48 +1527,41 @@@ public class ColumnFamilyStoreTes
          findRowGetSlicesAndAssertColsFound(cfs, multiRangeReverseWithCounting, "a", "colI", "colD", "colC");
      }
  
+     @Test
+     public void testClearEphemeralSnapshots() throws Throwable
+     {
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1);
++
++        //cleanup any previous test gargbage
++        cfs.clearSnapshot("");
++
+         Mutation rm;
 -        ColumnFamilyStore cfs = Keyspace.open("Keyspace3").getColumnFamilyStore("Indexed1");
+         for (int i = 0; i < 100; i++)
+         {
 -            rm = new Mutation("Keyspace3", ByteBufferUtil.bytes("key" + i));
 -            rm.add("Indexed1", cellname("birthdate"), ByteBufferUtil.bytes(34L), 0);
 -            rm.add("Indexed1", cellname("notbirthdate"), ByteBufferUtil.bytes((long) (i % 2)), 0);
++            rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("key" + i));
++            rm.add(CF_INDEX1, cellname("birthdate"), ByteBufferUtil.bytes(34L), 0);
++            rm.add(CF_INDEX1, cellname("notbirthdate"), ByteBufferUtil.bytes((long) (i % 2)), 0);
+             rm.applyUnsafe();
+         }
+ 
 -        //cleanup any previous test gargbage
 -        cfs.clearSnapshot("");
 -
 -        Cell[] cols = new Cell[5];
 -        for (int i = 0; i < 5; i++)
 -            cols[i] = column("c" + i, "value", 1);
 -
 -        putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], cols[4]);
 -        putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]);
 -        putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]);
 -
+         cfs.snapshot("nonEphemeralSnapshot", null, false);
+         cfs.snapshot("ephemeralSnapshot", null, true);
+ 
+         Map<String, Pair<Long, Long>> snapshotDetails = cfs.getSnapshotDetails();
+         assertEquals(2, snapshotDetails.size());
+         assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
+         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
+ 
+         ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories);
+ 
+         snapshotDetails = cfs.getSnapshotDetails();
+         assertEquals(1, snapshotDetails.size());
+         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
+ 
+         //test cleanup
+         cfs.clearSnapshot("");
+     }
+ 
      @SuppressWarnings("unchecked")
      @Test
      public void testMultiRangeSomeEmptyIndexed() throws Throwable

Reply via email to