Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d7ff7f3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d7ff7f3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d7ff7f3

Branch: refs/heads/cassandra-3.0
Commit: 7d7ff7f3cd317531a096e9f06c0bac9f5e139496
Parents: d719506 9654994
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Feb 11 08:25:14 2016 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Feb 11 08:25:14 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  12 +++---------
 .../cassandra/db/compaction/Scrubber.java       |  19 ++++++++++---------
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../Keyspace1-Standard3-jb-1-Summary.db         | Bin 63 -> 75 bytes
 .../unit/org/apache/cassandra/db/ScrubTest.java |  14 +++++++-------
 6 files changed, 22 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d7ff7f3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 318672f,9f51291..7565386
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,9 +1,12 @@@
 -2.1.14
 +2.2.6
 + * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
 + * Fix paging on DISTINCT queries repeats result when first row in partition 
changes (CASSANDRA-10010)
 +Merged from 2.1:
+  * Properly release sstable ref when doing offline scrub (CASSANDRA-10697)
   * Improve nodetool status performance for large cluster (CASSANDRA-7238)
 - * Make it clear what DTCS timestamp_resolution is used for (CASSANDRA-11041)
   * Gossiper#isEnabled is not thread safe (CASSANDRA-11116)
   * Avoid major compaction mixing repaired and unrepaired sstables in DTCS 
(CASSANDRA-11113)
 + * Make it clear what DTCS timestamp_resolution is used for (CASSANDRA-11041)
   * test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938)
  
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d7ff7f3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c51ed7d,55b873a..8ca9852
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -327,15 -340,9 +327,9 @@@ public class CompactionManager implemen
          }
      }
  
 -    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final 
boolean skipCorrupted, final boolean checkData) throws InterruptedException, 
ExecutionException
 +    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final 
boolean skipCorrupted, final boolean checkData)
 +    throws InterruptedException, ExecutionException
      {
-         return performScrub(cfs, skipCorrupted, checkData, false);
-     }
- 
-     public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final 
boolean skipCorrupted, final boolean checkData, final boolean offline)
-     throws InterruptedException, ExecutionException
-     {
 -        assert !cfs.isIndex();
          return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
          {
              @Override
@@@ -345,30 -352,11 +339,30 @@@
              }
  
              @Override
 -            public void execute(SSTableReader input) throws IOException
 +            public void execute(LifecycleTransaction input) throws IOException
              {
-                 scrubOne(cfs, input, skipCorrupted, checkData, offline);
+                 scrubOne(cfs, input, skipCorrupted, checkData);
              }
 -        });
 +        }, OperationType.SCRUB);
 +    }
 +
 +    public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, 
final boolean extendedVerify) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfs.isIndex();
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction input)
 +            {
 +                return input.originals();
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction input) throws IOException
 +            {
 +                verifyOne(cfs, input.onlyOne(), extendedVerify);
 +            }
 +        }, OperationType.VERIFY);
      }
  
      public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore 
cfs, final boolean excludeCurrentVersion) throws InterruptedException, 
ExecutionException
@@@ -705,14 -683,14 +699,14 @@@
          }
      }
  
-     private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction 
modifier, boolean skipCorrupted, boolean checkData, boolean offline) throws 
IOException
 -    private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, 
boolean skipCorrupted, boolean checkData) throws IOException
++    private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction 
modifier, boolean skipCorrupted, boolean checkData) throws IOException
      {
 -        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, 
checkData);
 +        CompactionInfo.Holder scrubInfo = null;
  
-         try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, 
offline, checkData))
 -        CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
 -        metrics.beginCompaction(scrubInfo);
 -        try
++        try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, 
checkData))
          {
 +            scrubInfo = scrubber.getScrubInfo();
 +            metrics.beginCompaction(scrubInfo);
              scrubber.scrub();
          }
          finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d7ff7f3/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 9fd8560,8bfd37b..e9137e2
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -54,10 -49,9 +54,8 @@@ public class Scrubber implements Closea
      private final RandomAccessReader dataFile;
      private final RandomAccessReader indexFile;
      private final ScrubInfo scrubInfo;
 -
 -    private final boolean isOffline;
 +    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
  
-     private final boolean isOffline;
- 
      private SSTableReader newSstable;
      private SSTableReader newInOrderSstable;
  
@@@ -81,21 -75,19 +79,20 @@@
      };
      private final SortedSet<Row> outOfOrderRows = new 
TreeSet<>(rowComparator);
  
-     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, 
boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
 -    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean 
skipCorrupted, boolean isOffline, boolean checkData) throws IOException
++    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, 
boolean skipCorrupted, boolean checkData) throws IOException
      {
-         this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), 
isOffline, checkData);
 -        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), 
isOffline, checkData);
++        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), 
checkData);
      }
  
 -    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean 
skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean 
checkData) throws IOException
 +    @SuppressWarnings("resource")
-     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, 
boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean 
checkData) throws IOException
++    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, 
boolean skipCorrupted, OutputHandler outputHandler, boolean checkData) throws 
IOException
      {
          this.cfs = cfs;
 -        this.sstable = sstable;
 +        this.transaction = transaction;
 +        this.sstable = transaction.onlyOne();
          this.outputHandler = outputHandler;
          this.skipCorrupted = skipCorrupted;
--        this.isOffline = isOffline;
 -        this.validateColumns = checkData;
 +        this.rowIndexEntrySerializer = 
sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
  
          List<SSTableReader> toScrub = Collections.singletonList(sstable);
  
@@@ -105,7 -97,7 +102,7 @@@
              throw new IOException("disk full");
  
          // If we run scrub offline, we should never purge tombstone, as we 
cannot know if other sstable have data that the tombstone deletes.
--        this.controller = isOffline
++        this.controller = transaction.isOffline()
                          ? new ScrubController(cfs)
                          : new CompactionController(cfs, 
Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
          this.isCommutative = cfs.metadata.isCounter();
@@@ -126,7 -117,7 +123,7 @@@
          // we'll also loop through the index at the same time, using the 
position from the index to recover if the
          // row header (key or data size) is corrupt. (This means our position 
in the index file will be one row
          // "ahead" of the data file.)
--        this.dataFile = isOffline
++        this.dataFile = transaction.isOffline()
                          ? sstable.openDataReader()
                          : 
sstable.openDataReader(CompactionManager.instance.getRateLimiter());
  
@@@ -143,7 -134,9 +140,7 @@@
      public void scrub()
      {
          outputHandler.output(String.format("Scrubbing %s (%s bytes)", 
sstable, dataFile.length()));
-         try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, 
sstable.maxDataAge, isOffline))
 -        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
 -        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, 
sstable.maxDataAge, isOffline);
 -        try
++        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, 
sstable.maxDataAge, transaction.isOffline()))
          {
              nextIndexKey = indexAvailable() ? 
ByteBufferUtil.readWithShortLength(indexFile) : null;
              if (indexAvailable())
@@@ -299,13 -294,14 +296,15 @@@
              {
                  // out of order rows, but no bad rows found - we can keep our 
repairedAt time
                  long repairedAt = badRows > 0 ? 
ActiveRepairService.UNREPAIRED_SSTABLE : 
sstable.getSSTableMetadata().repairedAt;
 -                SSTableWriter inOrderWriter = 
CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, 
repairedAt, sstable);
 -                for (Row row : outOfOrderRows)
 -                    inOrderWriter.append(row.key, row.cf);
 -                newInOrderSstable = 
inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
 -                if (!isOffline)
 -                    
cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
 -                else if (newInOrderSstable != null)
 +                try (SSTableWriter inOrderWriter = 
CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, 
repairedAt, sstable);)
 +                {
 +                    for (Row row : outOfOrderRows)
 +                        inOrderWriter.append(row.key, row.cf);
 +                    newInOrderSstable = inOrderWriter.finish(-1, 
sstable.maxDataAge, true);
 +                }
 +                transaction.update(newInOrderSstable, false);
++                if (transaction.isOffline() && newInOrderSstable != null)
+                     newInOrderSstable.selfRef().release();
                  outputHandler.warn(String.format("%d out of order rows found 
while scrubbing %s; Those have been written (in order) to a new sstable (%s)", 
outOfOrderRows.size(), sstable, newInOrderSstable));
              }
  
@@@ -321,6 -320,8 +320,8 @@@
          finally
          {
              controller.close();
 -            if (isOffline && newSstable != null)
++            if (transaction.isOffline() && newSstable != null)
+                 newSstable.selfRef().release();
          }
  
          if (newSstable == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d7ff7f3/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 3551b3d,fdf6c8d..a486a13
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@@ -119,10 -108,10 +119,10 @@@ public class StandaloneScrubbe
              {
                  for (SSTableReader sstable : sstables)
                  {
 -                    try
 +                    try (LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.SCRUB, sstable))
                      {
 -                        Scrubber scrubber = new Scrubber(cfs, sstable, 
options.skipCorrupted, handler, true, !options.noValidate);
 -                        try
 +                        txn.obsoleteOriginals(); // make sure originals are 
deleted and avoid NPE if index is missing, CASSANDRA-9591
-                         try (Scrubber scrubber = new Scrubber(cfs, txn, 
options.skipCorrupted, handler, true, !options.noValidate))
++                        try (Scrubber scrubber = new Scrubber(cfs, txn, 
options.skipCorrupted, handler, !options.noValidate))
                          {
                              scrubber.scrub();
                          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d7ff7f3/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index b69a1f8,167671b..c0cde40
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -166,21 -130,19 +166,21 @@@ public class ScrubTes
          overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), 
ByteBufferUtil.bytes("1"));
  
          // with skipCorrupted == false, the scrub is expected to fail
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true);
 -        try
 +        try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
-              Scrubber scrubber = new Scrubber(cfs, txn, false, false, true);)
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true);)
          {
              scrubber.scrub();
              fail("Expected a CorruptSSTableException to be thrown");
          }
          catch (IOError err) {}
  
 -        // with skipCorrupted == true, the corrupt row will be skipped
 +        // with skipCorrupted == true, the corrupt rows will be skipped
          Scrubber.ScrubResult scrubResult;
 -        scrubber = new Scrubber(cfs, sstable, true, false, true);
 -        scrubResult = scrubber.scrubWithResult();
 -        scrubber.close();
 +        try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
-              Scrubber scrubber = new Scrubber(cfs, txn, true, false, true);)
++             Scrubber scrubber = new Scrubber(cfs, txn, true, true);)
 +        {
 +            scrubResult = scrubber.scrubWithResult();
 +        }
  
          assertNotNull(scrubResult);
  
@@@ -226,24 -188,20 +226,24 @@@
          overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), 
ByteBufferUtil.bytes("1"));
  
          // with skipCorrupted == false, the scrub is expected to fail
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true);
 -        try
 +        try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
-              Scrubber scrubber = new Scrubber(cfs, txn, false, false, true))
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true))
          {
 +            // with skipCorrupted == true, the corrupt row will be skipped
              scrubber.scrub();
              fail("Expected a CorruptSSTableException to be thrown");
          }
          catch (IOError err) {}
  
 -        // with skipCorrupted == true, the corrupt row will be skipped
 -        scrubber = new Scrubber(cfs, sstable, true, false, true);
 -        scrubber.scrub();
 -        scrubber.close();
 -        assertEquals(1, cfs.getSSTables().size());
 +        try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
-              Scrubber scrubber = new Scrubber(cfs, txn, true, false, true))
++             Scrubber scrubber = new Scrubber(cfs, txn, true, true))
 +        {
 +            // with skipCorrupted == true, the corrupt row will be skipped
 +            scrubber.scrub();
 +            scrubber.close();
 +        }
  
 +        assertEquals(1, cfs.getSSTables().size());
          // verify that we can read all of the rows, and there is now one less 
row
          rows = cfs.getRangeSlice(Util.range("", ""), null, new 
IdentityQueryFilter(), 1000);
          assertEquals(1, rows.size());
@@@ -411,16 -353,11 +411,16 @@@
          components.add(Component.STATS);
          components.add(Component.SUMMARY);
          components.add(Component.TOC);
 -        SSTableReader sstable = SSTableReader.openNoValidation(desc, 
components, metadata);
  
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
 -        scrubber.scrub();
 +        SSTableReader sstable = SSTableReader.openNoValidation(desc, 
components, cfs);
 +        if (sstable.last.compareTo(sstable.first) < 0)
 +            sstable.last = sstable.first;
  
 +        try (LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.SCRUB, sstable);
-              Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true);)
 +        {
 +            scrubber.scrub();
 +        }
          cfs.loadNewSSTables();
          List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new 
IdentityQueryFilter(), 1000);
          assert isRowOrdered(rows) : "Scrub failed: " + rows;
@@@ -451,13 -389,10 +451,13 @@@
          components.add(Component.STATS);
          components.add(Component.SUMMARY);
          components.add(Component.TOC);
 -        SSTableReader sstable = SSTableReader.openNoValidation(desc, 
components, metadata);
 +        SSTableReader sstable = SSTableReader.openNoValidation(desc, 
components, cfs);
  
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
 -        scrubber.scrub();
 +        try (LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.SCRUB, sstable);
-              Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true);)
 +        {
 +            scrubber.scrub();
 +        }
  
          cfs.loadNewSSTables();
          assertEquals(7, countCells(cfs));

Reply via email to