Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/compaction/CompactionManager.java
        src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
        test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/180130a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/180130a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/180130a8

Branch: refs/heads/cassandra-2.2
Commit: 180130a8e59b2848ad843d74c09fabfa5e82eab1
Parents: 7d06762 eaeabff
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri May 29 09:57:54 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Fri May 29 09:57:54 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  4 +-
 .../cassandra/io/sstable/SSTableRewriter.java   | 41 +++++-----
 .../unit/org/apache/cassandra/db/ScrubTest.java |  6 +-
 .../io/sstable/SSTableRewriterTest.java         | 82 ++++++++++++++------
 5 files changed, 88 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 618f063,0e759b7..262f8c8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
 -2.1.6
 +2.2
 + * Disallow frozen<> types in function arguments and return types for
 +   clarity (CASSANDRA-9411)
 + * Static Analysis to warn on unsafe use of Autocloseable instances (CASSANDRA-9431)
 + * Update commitlog archiving examples now that commitlog segments are
 +   not recycled (CASSANDRA-9350)
 + * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)
 + * (cqlsh) Add support for native protocol 4 (CASSANDRA-9399)
 + * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
 + * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
 + * Add ability to stop compaction by ID (CASSANDRA-7207)
 + * Let CassandraVersion handle SNAPSHOT version (CASSANDRA-9438)
 +Merged from 2.1:
+  * Avoid getting unreadable keys during anticompaction (CASSANDRA-9508)
   * (cqlsh) Better float precision by default (CASSANDRA-9224)
   * Improve estimated row count (CASSANDRA-9107)
   * Optimize range tombstone memory footprint (CASSANDRA-8603)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 26dab7c,2f3f7df..a2783da
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1187,90 -1060,68 +1187,90 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                 logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        if (anticompactionGroup.originals().size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            File destination = 
cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
 OperationType.ANTICOMPACTION));
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, 
sstableAsSet, sstable.maxDataAge, false, false);
 -            SSTableRewriter unRepairedSSTableWriter = new 
SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
 +
 +        File destination = 
cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
 OperationType.ANTICOMPACTION));
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
 +        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-         try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, 
anticompactionGroup, groupMaxDataAge, false);
-              SSTableRewriter unRepairedSSTableWriter = new 
SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
++        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, 
anticompactionGroup, groupMaxDataAge, false, false);
++             SSTableRewriter unRepairedSSTableWriter = new 
SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
 +             AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(anticompactionGroup.originals());
 +             CompactionController controller = new CompactionController(cfs, 
sstableAsSet, getDefaultGcBefore(cfs)))
 +        {
 +            int expectedBloomFilterSize = 
Math.max(cfs.metadata.getMinIndexInterval(), 
(int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
  
 -            try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(new 
HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new 
CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
 -            {
 -                int expectedBloomFilterSize = 
Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys());
 -                
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, 
destination, expectedBloomFilterSize, repairedAt, sstable));
 -                
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, 
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
sstable));
 +            
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
sstableAsSet));
  
 -                CompactionIterable ci = new 
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +            CompactionIterable ci = new 
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, 
DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
 +            metrics.beginCompaction(ci);
 +            try
 +            {
 +                @SuppressWarnings("resource")
 +                CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
 +                while (iter.hasNext())
                  {
 -                    while (iter.hasNext())
 +                    @SuppressWarnings("resource")
 +                    AbstractCompactedRow row = iter.next();
 +                    // if current range from sstable is repaired, save it 
into the new repaired sstable
 +                    if (Range.isInRanges(row.key.getToken(), ranges))
                      {
 -                        AbstractCompactedRow row = iter.next();
 -                        // if current range from sstable is repaired, save it 
into the new repaired sstable
 -                        if (Range.isInRanges(row.key.getToken(), ranges))
 -                        {
 -                            repairedSSTableWriter.append(row);
 -                            repairedKeyCount++;
 -                        }
 -                        // otherwise save into the new 'non-repaired' table
 -                        else
 -                        {
 -                            unRepairedSSTableWriter.append(row);
 -                            unrepairedKeyCount++;
 -                        }
 +                        repairedSSTableWriter.append(row);
 +                        repairedKeyCount++;
 +                    }
 +                    // otherwise save into the new 'non-repaired' table
 +                    else
 +                    {
 +                        unRepairedSSTableWriter.append(row);
 +                        unrepairedKeyCount++;
                      }
                  }
 -                finally
 -                {
 -                    metrics.finishCompaction(ci);
 -                }
 -                
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 -                
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 -                
cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, 
anticompactedSSTables, OperationType.ANTICOMPACTION);
              }
 -            catch (Throwable e)
 +            finally
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
 +                metrics.finishCompaction(ci);
              }
 -        }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + 
unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, 
anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), 
anticompactedSSTables.size());
  
 -        return anticompactedSSTables;
 +            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
 +            // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
 +            // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
 +            // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
 +            anticompactionGroup.permitRedundantTransitions();
 +            repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
 +            unRepairedSSTableWriter.prepareToCommit();
 +            anticompactedSSTables.addAll(repairedSSTableWriter.finished());
 +            anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
 +            repairedSSTableWriter.commit();
 +            unRepairedSSTableWriter.commit();
 +
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", 
repairedKeyCount,
 +                                                                       
repairedKeyCount + unrepairedKeyCount,
 +                                                                       
cfs.keyspace.getName(),
 +                                                                       
cfs.getColumnFamilyName(),
 +                                                                       
anticompactionGroup);
 +            return anticompactedSSTables.size();
 +        }
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +        }
 +        return 0;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8029075,824e58b..011c7d9
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -47,56 -49,68 +47,59 @@@ import org.apache.cassandra.utils.concu
   * but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
   * successfully.
   */
 -public class SSTableRewriter
 +public class SSTableRewriter extends Transactional.AbstractTransactional 
implements Transactional
  {
-     private static long preemptiveOpenInterval;
-     static
-     {
-         long interval = 
DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
-         if (interval < 0)
-             interval = Long.MAX_VALUE;
-         preemptiveOpenInterval = interval;
-     }
- 
-     @VisibleForTesting
-     public static void overrideOpenInterval(long size)
-     {
-         preemptiveOpenInterval = size;
-     }
 -    private final DataTracker dataTracker;
 +    @VisibleForTesting
-     public static long getOpenInterval()
-     {
-         return preemptiveOpenInterval;
-     }
++    public static boolean disableEarlyOpeningForTests = false;
 +
      private final ColumnFamilyStore cfs;
- 
+     private final long preemptiveOpenInterval;
      private final long maxAge;
 -    private final List<SSTableReader> finished = new ArrayList<>();
 -    private final Set<SSTableReader> rewriting; // the readers we are 
rewriting (updated as they are replaced)
 -    private final Map<Descriptor, DecoratedKey> originalStarts = new 
HashMap<>(); // the start key for each reader we are rewriting
 +    private long repairedAt = -1;
 +    // the set of final readers we will expose on commit
 +    private final LifecycleTransaction transaction; // the readers we are 
rewriting (updated as they are replaced)
 +    private final List<SSTableReader> preparedForCommit = new ArrayList<>();
      private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); 
// the file descriptors for each reader descriptor we are rewriting
  
 -    private SSTableReader currentlyOpenedEarly; // the reader for the most 
recent (re)opening of the target file
      private long currentlyOpenedEarlyAt; // the position (in MB) in the 
target file we last (re)opened at
  
 -    private final List<SSTableReader> finishedReaders = new ArrayList<>();
 -    private final Queue<Finished> finishedEarly = new ArrayDeque<>();
 -    // as writers are closed from finishedEarly, their last readers are moved
 -    // into discard, so that abort can cleanup after us safely
 -    private final List<SSTableReader> discard = new ArrayList<>();
 -    private final boolean isOffline; // true for operations that are 
performed without Cassandra running (prevents updates of DataTracker)
 +    private final List<SSTableWriter> writers = new ArrayList<>();
 +    private final boolean isOffline; // true for operations that are 
performed without Cassandra running (prevents updates of Tracker)
  
      private SSTableWriter writer;
      private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
 -    private State state = State.WORKING;
  
 -    private static enum State
 -    {
 -        WORKING, FINISHED, ABORTED
 -    }
 +    // for testing (TODO: remove when have byteman setup)
 +    private boolean throwEarly, throwLate;
  
 -    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> 
rewriting, long maxAge, boolean isOffline)
 +    public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction 
transaction, long maxAge, boolean isOffline)
      {
 -        this(cfs, rewriting, maxAge, isOffline, true);
++        this(cfs, transaction, maxAge, isOffline, true);
+     }
+ 
 -    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> 
rewriting, long maxAge, boolean isOffline, boolean shouldOpenEarly)
++    public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction 
transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+     {
 -        this(cfs, rewriting, maxAge, isOffline, 
calculateOpenInterval(shouldOpenEarly));
++        this(cfs, transaction, maxAge, isOffline, 
calculateOpenInterval(shouldOpenEarly));
+     }
+ 
+     @VisibleForTesting
 -    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> 
rewriting, long maxAge, boolean isOffline, long preemptiveOpenInterval)
++    public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction 
transaction, long maxAge, boolean isOffline, long preemptiveOpenInterval)
+     {
 -        this.rewriting = rewriting;
 -        for (SSTableReader sstable : rewriting)
 -        {
 -            originalStarts.put(sstable.descriptor, sstable.first);
 +        this.transaction = transaction;
 +        for (SSTableReader sstable : this.transaction.originals())
              fileDescriptors.put(sstable.descriptor, 
CLibrary.getfd(sstable.getFilename()));
 -        }
 -        this.dataTracker = cfs.getDataTracker();
          this.cfs = cfs;
          this.maxAge = maxAge;
          this.isOffline = isOffline;
+         this.preemptiveOpenInterval = preemptiveOpenInterval;
+     }
+ 
+     private static long calculateOpenInterval(boolean shouldOpenEarly)
+     {
+         long interval = 
DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
 -        if (!shouldOpenEarly || interval < 0)
++        if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0)
+             interval = Long.MAX_VALUE;
+         return interval;
      }
  
      public SSTableWriter currentWriter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index dbbce9e,028cf6c..d4579af
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -273,21 -234,6 +273,21 @@@ public class ScrubTes
      }
  
      @Test
 +    public void testScrubCorruptedCounterRowNoEarlyOpen() throws IOException, 
WriteTimeoutException
 +    {
-         long oldOpenVal = SSTableRewriter.getOpenInterval();
++        boolean oldDisabledVal = SSTableRewriter.disableEarlyOpeningForTests;
 +        try
 +        {
-             SSTableRewriter.overrideOpenInterval(Long.MAX_VALUE);
++            SSTableRewriter.disableEarlyOpeningForTests = true;
 +            testScrubCorruptedCounterRow();
 +        }
 +        finally
 +        {
-             SSTableRewriter.overrideOpenInterval(oldOpenVal);
++            SSTableRewriter.disableEarlyOpeningForTests = oldDisabledVal;
 +        }
 +    }
 +
 +    @Test
      public void testScrubDeletedRow() throws ExecutionException, 
InterruptedException
      {
          CompactionManager.instance.disableAutoCompaction();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 265bb6a,e1b001e..9e1cb91
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -130,10 -105,8 +131,10 @@@ public class SSTableRewriterTest extend
          cfs.addSSTable(s);
          Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
          assertEquals(1, sstables.size());
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, 
false, 10000000);
 -        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);)
++
 +        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);
 +             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN);
-              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000);)
          {
              ISSTableScanner scanner = scanners.scanners.get(0);
              CompactionController controller = new CompactionController(cfs, 
sstables, cfs.gcBefore(System.currentTimeMillis()));
@@@ -164,11 -136,9 +165,11 @@@
          cfs.addSSTable(s);
          Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
          assertEquals(1, sstables.size());
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, 
false, 10000000);
++
          boolean checked = false;
 -        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);)
 +        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);
 +             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN);
-              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000))
          {
              ISSTableScanner scanner = scanners.scanners.get(0);
              CompactionController controller = new CompactionController(cfs, 
sstables, cfs.gcBefore(System.currentTimeMillis()));
@@@ -261,19 -230,15 +262,18 @@@
  
          SSTableReader s = writeFile(cfs, 1000);
          cfs.addSSTable(s);
 -        long startStorageMetricsLoad = StorageMetrics.load.count();
 +        long startStorageMetricsLoad = StorageMetrics.load.getCount();
 +        long sBytesOnDisk = s.bytesOnDisk();
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 10000000);
 -        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
  
 +        List<SSTableReader> sstables;
          int files = 1;
          try (ISSTableScanner scanner = s.getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000))
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while(scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -315,16 -280,13 +315,15 @@@
          cfs.addSSTable(s);
  
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 10000000);
 -        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
  
 +        List<SSTableReader> sstables;
          int files = 1;
          try (ISSTableScanner scanner = s.getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000))
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while(scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -437,22 -407,24 +436,21 @@@
  
          DecoratedKey origFirst = s.first;
          DecoratedKey origLast = s.last;
 -        long startSize = cfs.metric.liveDiskSpaceUsed.count();
 +        long startSize = cfs.metric.liveDiskSpaceUsed.getCount();
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 10000000);
 -        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
  
          try (ISSTableScanner scanner = s.getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000);)
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              test.run(scanner, controller, s, cfs, rewriter);
          }
 -        catch (Throwable t)
 -        {
 -            rewriter.abort();
 -            throw t;
 -        }
  
 -        Thread.sleep(1000);
 -        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
 +        SSTableDeletingTask.waitForDeletions();
 +
 +        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
          assertEquals(1, cfs.getSSTables().size());
          assertFileCounts(s.descriptor.directory.list(), 0, 0);
          assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@@ -471,15 -443,13 +469,14 @@@
          cfs.addSSTable(s);
  
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 10000000);
 -        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
  
          int files = 1;
          try (ISSTableScanner scanner = s.getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000))
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while(scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -516,16 -491,13 +513,15 @@@
          SSTableReader s = writeFile(cfs, 1000);
          cfs.addSSTable(s);
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 10000000);
 -        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
  
 +        List<SSTableReader> sstables;
          int files = 1;
          try (ISSTableScanner scanner = s.getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000))
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while(scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -558,16 -535,13 +554,15 @@@
          SSTableReader s = writeFile(cfs, 400);
          cfs.addSSTable(s);
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(1000000);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 1000000);
 -        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
  
 +        List<SSTableReader> sstables;
          int files = 1;
          try (ISSTableScanner scanner = s.getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);)
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 1000000);)
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while(scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -648,15 -619,13 +643,14 @@@
          if (!offline)
              cfs.addSSTable(s);
          Set<SSTableReader> compacting = Sets.newHashSet(s);
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        cfs.getDataTracker().markCompacting(compacting);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
offline, 10000000);
 -        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
 -        rewriter.switchWriter(w);
          try (ISSTableScanner scanner = 
compacting.iterator().next().getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = offline ? 
LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
 +                                       : 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
offline);
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
offline, 10000000);
 +        )
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while (scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -729,16 -695,15 +723,15 @@@
          SSTableReader s = cfs.getSSTables().iterator().next();
          Set<SSTableReader> compacting = new HashSet<>();
          compacting.add(s);
 -        cfs.getDataTracker().markCompacting(compacting);
  
-         SSTableRewriter.overrideOpenInterval(1);
 -        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false, 1);
 -        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
 -        rewriter.switchWriter(w);
          int keyCount = 0;
          try (ISSTableScanner scanner = 
compacting.iterator().next().getScanner();
 -             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
 +             CompactionController controller = new CompactionController(cfs, 
compacting, 0);
 +             LifecycleTransaction txn = 
cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false);
++             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, 
false, 1);
 +        )
          {
 +            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
              while (scanner.hasNext())
              {
                  rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
@@@ -765,16 -738,14 +758,15 @@@
  
          SSTableReader s = writeFile(cfs, 1000);
          cfs.addSSTable(s);
 -        Set<SSTableReader> sstables = 
Sets.newHashSet(cfs.markAllCompacting());
 +        Set<SSTableReader> sstables = Sets.newHashSet(s);
          assertEquals(1, sstables.size());
-         SSTableRewriter.overrideOpenInterval(10000000);
 -        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, 
false, 10000000);
          boolean checked = false;
 -        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables))
 +        try (ISSTableScanner scanner = 
sstables.iterator().next().getScanner();
 +             CompactionController controller = new CompactionController(cfs, 
sstables, 0);
 +             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN);
-              SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false);
++             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false, 10000000);
 +        )
          {
 -            ISSTableScanner scanner = scanners.scanners.get(0);
 -            CompactionController controller = new CompactionController(cfs, 
sstables, cfs.gcBefore(System.currentTimeMillis()));
              writer.switchWriter(getWriter(cfs, 
sstables.iterator().next().descriptor.directory));
              while (scanner.hasNext())
              {
@@@ -796,6 -770,55 +788,52 @@@
          validateCFS(cfs);
      }
  
+     /**
+      * emulates anticompaction - writing from one source sstable to two new sstables
+      *
+      * @throws IOException
+      */
+     @Test
+     public void testTwoWriters() throws IOException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
 -        Set<SSTableReader> sstables = 
Sets.newHashSet(cfs.markAllCompacting());
++        Set<SSTableReader> sstables = Sets.newHashSet(s);
+         assertEquals(1, sstables.size());
 -        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, 
false, false);
 -        SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, 
false, false);
 -        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables))
++        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);
++             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.UNKNOWN);
++             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, 
false, false);
++             SSTableRewriter writer2 = new SSTableRewriter(cfs, txn, 1000, 
false, false))
+         {
+             ISSTableScanner scanner = scanners.scanners.get(0);
+             CompactionController controller = new CompactionController(cfs, 
sstables, cfs.gcBefore(System.currentTimeMillis()));
+             writer.switchWriter(getWriter(cfs, 
sstables.iterator().next().descriptor.directory));
+             writer2.switchWriter(getWriter(cfs, 
sstables.iterator().next().descriptor.directory));
+             while (scanner.hasNext())
+             {
+                 AbstractCompactedRow row = new LazilyCompactedRow(controller, 
Collections.singletonList(scanner.next()));
+ 
+                 if (writer.currentWriter().getFilePointer() < 15000000)
+                     writer.append(row);
+                 else
+                     writer2.append(row);
+             }
+             for (int i = 0; i < 5000; i++)
+             {
+                 DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i));
+                 ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF);
+                 assertTrue(cf != null);
+             }
+         }
 -        writer.abort();
 -        writer2.abort();
 -        cfs.getDataTracker().unmarkCompacting(sstables);
 -        cfs.truncateBlocking();
 -        SSTableDeletingTask.waitForDeletions();
++        truncateCF();
+         validateCFS(cfs);
+     }
+ 
+ 
      private void validateKeys(Keyspace ks)
      {
          for (int i = 0; i < 100; i++)

Reply via email to