Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65885e7f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65885e7f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65885e7f

Branch: refs/heads/trunk
Commit: 65885e7fc356c342331aec11667b5abdc28897b6
Parents: b55523e f166749
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri Dec 11 17:34:21 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Fri Dec 11 17:37:41 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  15 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 279 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 349 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  80 ++++-
 7 files changed, 462 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9c01160,5da0d42..5932dbb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,13 +1,37 @@@
 -2.2.5
 - * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
 - * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
 - * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 - * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
++3.0.2
+ Merged from 2.1:
+  * Allow cancellation of index summary redistribution (CASSANDRA-8805)
 - * Fix Stress profile parsing on Windows (CASSANDRA-10808)
 -
 -2.2.4
 +3.0.1
 + * Avoid MV race during node decommission (CASSANDRA-10674)
 + * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 + * Handle single-column deletions correctly in materialized views
 +   when the column is part of the view primary key (CASSANDRA-10796)
 + * Fix issue with datadir migration on upgrade (CASSANDRA-10788)
 + * Fix bug with range tombstones on reverse queries and test coverage for
 +   AbstractBTreePartition (CASSANDRA-10059)
 + * Remove 64k limit on collection elements (CASSANDRA-10374)
 + * Remove unclear Indexer.indexes() method (CASSANDRA-10690)
 + * Fix NPE on stream read error (CASSANDRA-10771)
 + * Normalize cqlsh DESC output (CASSANDRA-10431)
 + * Reject partition range deletions when columns are specified (CASSANDRA-10739)
 + * Fix error when saving cached key for old format sstable (CASSANDRA-10778)
 + * Invalidate prepared statements on DROP INDEX (CASSANDRA-10758)
 + * Fix SELECT statement with IN restrictions on partition key,
 +   ORDER BY and LIMIT (CASSANDRA-10729)
 + * Improve stress performance over 1k threads (CASSANDRA-7217)
 + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
 + * Unable to create a function with argument of type Inet (CASSANDRA-10741)
 + * Fix backward incompatibility in CqlInputFormat (CASSANDRA-10717)
 + * Correctly preserve deletion info on updated rows when notifying indexers
 +   of single-row deletions (CASSANDRA-10694)
 + * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
 + * Keep the file open in trySkipCache (CASSANDRA-10669)
 + * Updated trigger example (CASSANDRA-10257)
 +Merged from 2.2:
 + * Fix regression on split size in CqlInputFormat (CASSANDRA-10835)
 + * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
 + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
   * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
   * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
   * Reject index queries while the index is building (CASSANDRA-8505)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ce7d2c,ba9c25e..bd950e3
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -50,11 -47,7 +50,12 @@@ import org.apache.cassandra.db.view.Vie
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
++import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
 +import org.apache.cassandra.io.sstable.SSTableRewriter;
 +import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@@ -1322,7 -1345,21 +1323,21 @@@ public class CompactionManager implemen
          return executor.submit(runnable);
      }
  
+     public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+     {
+         metrics.beginCompaction(redistribution);
+ 
+         try
+         {
+             return redistribution.redistributeSummaries();
+         }
+         finally
+         {
+             metrics.finishCompaction(redistribution);
+         }
+     }
+ 
 -    public static int getDefaultGcBefore(ColumnFamilyStore cfs)
 +    public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec)
      {
          // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
          // add any GcGrace however since 2ndary indexes are local to a node.

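The hunk above is the new CompactionManager entry point for CASSANDRA-8805: registering the redistribution with the compaction metrics for its full duration (begin/finish in a try/finally) is what makes the task visible to compaction monitoring and reachable by stop requests. A minimal sketch of how a caller might drive it, assuming the caller prepares the compacting set, the per-table transactions, and the memory pool size the way IndexSummaryManager does (the class and variable names here are illustrative, not the exact code in this patch):

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    import org.apache.cassandra.db.compaction.CompactionManager;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    // Sketch only: hand the redistribution to the CompactionManager so it is
    // tracked like any other compaction and can be interrupted.
    public class RedistributionCaller
    {
        public static List<SSTableReader> redistribute(List<SSTableReader> compacting,
                                                       Map<UUID, LifecycleTransaction> transactions,
                                                       long memoryPoolBytes) throws IOException
        {
            IndexSummaryRedistribution task =
                new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes);
            return CompactionManager.instance.runIndexSummaryRedistribution(task);
        }
    }
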
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java
index a69622b,6b66ded..20e6df2
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@@ -33,13 -33,9 +33,14 @@@ public enum OperationTyp
      UNKNOWN("Unknown compaction type"),
      ANTICOMPACTION("Anticompaction after repair"),
      VERIFY("Verify"),
 +    FLUSH("Flush"),
 +    STREAM("Stream"),
 +    WRITE("Write"),
-     VIEW_BUILD("View build");
++    VIEW_BUILD("View build"),
+     INDEX_SUMMARY("Index summary redistribution");
  
 -    private final String type;
 +    public final String type;
 +    public final String fileName;
  
      OperationType(String type)
      {

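Adding INDEX_SUMMARY as a first-class OperationType is what makes the operation stoppable by name, as the test at the end of this patch does with CompactionManager.instance.stopCompaction("INDEX_SUMMARY"). Roughly, stop-by-type walks the holders registered with the compaction metrics and flags those whose task type matches; a simplified sketch of that matching (not the full CompactionManager method):

    import org.apache.cassandra.db.compaction.CompactionInfo;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.metrics.CompactionMetrics;

    // Sketch only: match running operations by their OperationType and ask
    // them to stop; each holder then observes the flag via isStopRequested().
    public class StopByType
    {
        public static void stop(String type)
        {
            OperationType operation = OperationType.valueOf(type); // e.g. "INDEX_SUMMARY"
            for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions())
            {
                if (holder.getCompactionInfo().getTaskType() == operation)
                    holder.stop();
            }
        }
    }
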
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index e07f297,4438dc1..aed35c9
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -26,10 -26,8 +26,7 @@@ import javax.management.MBeanServer
  import javax.management.ObjectName;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.ImmutableSet;
 -import com.google.common.collect.Sets;
 +import com.google.common.collect.*;
- 
- import org.apache.cassandra.db.lifecycle.SSTableSet;
- import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -40,6 -38,8 +37,9 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.View;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0000000,aad479b..b4eae31
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -1,0 -1,349 +1,349 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+ 
+ public class IndexSummaryRedistribution extends CompactionInfo.Holder
+ {
+     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+ 
+     // The target (or ideal) number of index summary entries must differ from the actual number of
+     // entries by this ratio in order to trigger an upsample or downsample of the summary.  Because
+     // upsampling requires reading the primary index in order to rebuild the summary, the threshold
+     // for upsampling is higher.
+     static final double UPSAMPLE_THRESHOLD = 1.5;
+     static final double DOWNSAMPLE_THESHOLD = 0.75;
+ 
+     private final List<SSTableReader> compacting;
+     private final Map<UUID, LifecycleTransaction> transactions;
+     private final long memoryPoolBytes;
+     private final UUID compactionId;
+     private volatile long remainingSpace;
+ 
+     public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+     {
+         this.compacting = compacting;
+         this.transactions = transactions;
+         this.memoryPoolBytes = memoryPoolBytes;
+         this.compactionId = UUID.randomUUID();
+     }
+ 
+     public List<SSTableReader> redistributeSummaries() throws IOException
+     {
+         logger.info("Redistributing index summaries");
+         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+         List<SSTableReader> redistribute = new ArrayList<>();
+         for (LifecycleTransaction txn : transactions.values())
+         {
+             for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
+             {
+                 // We can't change the sampling level of sstables with the old format, because the serialization format
+                 // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
+                 if (!sstable.descriptor.version.hasSamplingLevel())
+                 {
+                     logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+                     oldFormatSSTables.add(sstable);
+                     txn.cancel(sstable);
+                 }
+             }
+             redistribute.addAll(txn.originals());
+         }
+ 
+         long total = 0;
+         for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
+             total += sstable.getIndexSummaryOffHeapSize();
+ 
+         logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current space used is {} MB",
+                      redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+ 
+         final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
+         double totalReadsPerSec = 0.0;
+         for (SSTableReader sstable : redistribute)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             if (sstable.getReadMeter() != null)
+             {
+                 Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                 totalReadsPerSec += readRate;
+                 readRates.put(sstable, readRate);
+             }
+         }
+         logger.trace("Total reads/sec across all sstables in index summary 
resize process: {}", totalReadsPerSec);
+ 
+         // copy and sort by read rates (ascending)
+         List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
+         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+ 
+         long remainingBytes = memoryPoolBytes;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+ 
+         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+         List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
+ 
+         for (LifecycleTransaction txn : transactions.values())
+             txn.finish();
+ 
+         total = 0;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+             total += sstable.getIndexSummaryOffHeapSize();
+         logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
+                      total / 1024.0 / 1024.0);
+ 
+         return newSSTables;
+     }
+ 
+     private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+                                                      Map<UUID, LifecycleTransaction> transactions,
+                                                      double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+     {
+         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> forceResample = new ArrayList<>();
+         List<ResampleEntry> forceUpsample = new ArrayList<>();
+         List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+ 
+         // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+         // to the number of total reads/sec it handles.
+         remainingSpace = memoryPoolCapacity;
+         for (SSTableReader sstable : sstables)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
 -            int minIndexInterval = sstable.metadata.getMinIndexInterval();
 -            int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
++            int minIndexInterval = sstable.metadata.params.minIndexInterval;
++            int maxIndexInterval = sstable.metadata.params.maxIndexInterval;
+ 
+             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+             long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+ 
+             // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+             int currentNumEntries = sstable.getIndexSummarySize();
+             double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+             long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+             int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+             int maxSummarySize = sstable.getMaxIndexSummarySize();
+ 
+             // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+             if (sstable.getMinIndexInterval() != minIndexInterval)
+             {
+                 int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+                 maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+                 logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+                              sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+                 currentSamplingLevel = effectiveSamplingLevel;
+             }
+ 
+             int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
+                     minIndexInterval, maxIndexInterval);
+             int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+             double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+ 
+             logger.trace("{} has {} reads/sec; ideal space for index summary: 
{} bytes ({} entries); considering moving " +
+                     "from level {} ({} entries, {} bytes) to level {} ({} 
entries, {} bytes)",
+                     sstable.getFilename(), readsPerSec, idealSpace, 
targetNumEntries, currentSamplingLevel, currentNumEntries,
+                     currentNumEntries * avgEntrySize, newSamplingLevel, 
numEntriesAtNewSamplingLevel,
+                     numEntriesAtNewSamplingLevel * avgEntrySize);
+ 
+             if (effectiveIndexInterval < minIndexInterval)
+             {
+                 // The min_index_interval was changed; re-sample to match it.
+                 logger.trace("Forcing resample of {} because the current 
index interval ({}) is below min_index_interval ({})",
+                         sstable, effectiveIndexInterval, minIndexInterval);
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * 
numEntriesAtNewSamplingLevel);
+                 forceResample.add(new ResampleEntry(sstable, spaceUsed, 
newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else if (effectiveIndexInterval > maxIndexInterval)
+             {
+                 // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
+                 logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
+                         sstable, effectiveIndexInterval, maxIndexInterval);
+                 newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+                 numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+             }
+             else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+             }
+             else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else
+             {
+                 // keep the same sampling level
+                 logger.trace("SSTable {} is within thresholds of ideal 
sampling", sstable);
+                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                 newSSTables.add(sstable);
+                 transactions.get(sstable.metadata.cfId).cancel(sstable);
+             }
+             totalReadsPerSec -= readsPerSec;
+         }
+ 
+         if (remainingSpace > 0)
+         {
+             Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+             toDownsample = result.right;
+             newSSTables.addAll(result.left);
+             for (SSTableReader sstable : result.left)
+                 transactions.get(sstable.metadata.cfId).cancel(sstable);
+         }
+ 
+         // downsample first, then upsample
+         toDownsample.addAll(forceResample);
+         toDownsample.addAll(toUpsample);
+         toDownsample.addAll(forceUpsample);
+         for (ResampleEntry entry : toDownsample)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             SSTableReader sstable = entry.sstable;
+             logger.trace("Re-sampling index summary for {} from {}/{} to 
{}/{} of the original number of entries",
+                          sstable, sstable.getIndexSummarySamplingLevel(), 
Downsampling.BASE_SAMPLING_LEVEL,
+                          entry.newSamplingLevel, 
Downsampling.BASE_SAMPLING_LEVEL);
+             ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+             SSTableReader replacement = 
sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+             newSSTables.add(replacement);
+             transactions.get(sstable.metadata.cfId).update(replacement, true);
+         }
+ 
+         return newSSTables;
+     }
+ 
+     @VisibleForTesting
+     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+     {
+         // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+         // that will make little difference.
+         Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+         {
+             public int compare(ResampleEntry o1, ResampleEntry o2)
+             {
+                 return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                       o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+             }
+         });
+ 
+         int noDownsampleCutoff = 0;
+         List<SSTableReader> willNotDownsample = new ArrayList<>();
+         while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+         {
+             ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+ 
+             long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+             // see if we have enough leftover space to keep the current sampling level
+             if (extraSpaceRequired <= remainingSpace)
+             {
+                 logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+                              entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                 willNotDownsample.add(entry.sstable);
+                 remainingSpace -= extraSpaceRequired;
+             }
+             else
+             {
+                 break;
+             }
+ 
+             noDownsampleCutoff++;
+         }
+         return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+     }
+ 
+     public CompactionInfo getCompactionInfo()
+     {
+         return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId);
+     }
+ 
+     /** Utility class for sorting sstables by their read rates. */
+     private static class ReadRateComparator implements Comparator<SSTableReader>
+     {
+         private final Map<SSTableReader, Double> readRates;
+ 
+         ReadRateComparator(Map<SSTableReader, Double> readRates)
+         {
+             this.readRates = readRates;
+         }
+ 
+         @Override
+         public int compare(SSTableReader o1, SSTableReader o2)
+         {
+             Double readRate1 = readRates.get(o1);
+             Double readRate2 = readRates.get(o2);
+             if (readRate1 == null && readRate2 == null)
+                 return 0;
+             else if (readRate1 == null)
+                 return -1;
+             else if (readRate2 == null)
+                 return 1;
+             else
+                 return Double.compare(readRate1, readRate2);
+         }
+     }
+ 
+     private static class ResampleEntry
+     {
+         public final SSTableReader sstable;
+         public final long newSpaceUsed;
+         public final int newSamplingLevel;
+ 
+         ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+         {
+             this.sstable = sstable;
+             this.newSpaceUsed = newSpaceUsed;
+             this.newSamplingLevel = newSamplingLevel;
+         }
+     }
+ }

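The core of adjustSamplingLevels above is a proportional allocation: each sstable's ideal summary space is its 15-minute read-rate share of the remaining pool, and the two thresholds keep borderline summaries at their current sampling level so small wins don't trigger churn. A self-contained illustration of that arithmetic with invented inputs (only the two threshold constants mirror the patch; everything else here is hypothetical):

    // Sketch only: the ideal-space formulas from adjustSamplingLevels, applied
    // to made-up numbers to show how an sstable ends up queued for downsampling.
    public class SummaryShareExample
    {
        static final double UPSAMPLE_THRESHOLD = 1.5;
        static final double DOWNSAMPLE_THRESHOLD = 0.75;

        public static void main(String[] args)
        {
            long remainingSpace = 64 * 1024;   // hypothetical: 64 KiB of pool left
            double totalReadsPerSec = 100.0;   // summed over the sstables still to place
            double readsPerSec = 25.0;         // this sstable's 15-minute read rate
            int currentNumEntries = 1000;      // entries in its current summary
            double avgEntrySize = 32.0;        // bytes per summary entry

            long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));

            boolean upsample = targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD;
            boolean downsample = targetNumEntries < currentNumEntries * DOWNSAMPLE_THRESHOLD;

            // Prints: ideal space 16384 bytes, target 512 entries -> downsample
            // (512 < 1000 * 0.75); between the thresholds the summary is left alone.
            System.out.printf("ideal space %d bytes, target %d entries -> %s%n",
                              idealSpace, targetNumEntries,
                              upsample ? "upsample" : downsample ? "downsample" : "keep");
        }
    }
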
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 5493edb,6935680..0498c68
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -19,15 -19,12 +19,20 @@@ package org.apache.cassandra.io.sstable
  
  import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.*;
 -import java.util.concurrent.*;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Comparator;
++import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
++import java.util.Set;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
  
+ import com.google.common.base.Joiner;
+ import com.google.common.collect.Sets;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.BeforeClass;
@@@ -39,18 -36,20 +44,22 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.cache.CachingOptions;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.compaction.OperationType;
 -import org.apache.cassandra.db.filter.QueryFilter;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 -import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.schema.CachingParams;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  
  import static com.google.common.collect.ImmutableMap.of;
  import static java.util.Arrays.asList;
@@@ -587,7 -586,68 +601,67 @@@ public class IndexSummaryManagerTes
          for (Map.Entry<String, Integer> entry : intervals.entrySet())
          {
              if (entry.getKey().contains(CF_STANDARDLOWiINTERVAL))
 -                assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
 +                assertTrue(entry.getValue() >= cfs.metadata.params.minIndexInterval);
          }
      }
+ 
+     @Test
+     public void testCancelIndex() throws Exception
+     {
+         String ksname = KEYSPACE1;
+         String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+         Keyspace keyspace = Keyspace.open(ksname);
+         final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         final int numSSTables = 4;
 -        final int numTries = 4;
+         int numRows = 256;
+         createSSTables(ksname, cfname, numSSTables, numRows);
+ 
 -        final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
++        final List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+         for (SSTableReader sstable : sstables)
+             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+ 
+         final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+ 
+         // everything should get cut in half
+         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+ 
+         Thread t = new Thread(new Runnable()
+         {
+             public void run()
+             {
+                 try
+                 {
+                     // Don't leave enough space for even the minimal index summaries
+                     try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+                     {
+                         redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+                     }
+                 }
+                 catch (CompactionInterruptedException ex)
+                 {
+                     exception.set(ex);
+                 }
+                 catch (IOException ignored)
+                 {
+                 }
+             }
+         });
+         t.start();
+         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
 -            Thread.yield();
++            Thread.sleep(1);
+         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+         t.join();
+ 
+         assertNotNull("Expected compaction interrupted exception", exception.get());
+         assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+ 
+         Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
 -        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
++        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
+         Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+         assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                  Joiner.on(",").join(disjoint)),
+                    disjoint.isEmpty());
+ 
+         validateData(cfs, numRows);
+     }
  }

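The test above pins down the cooperative cancellation contract that CASSANDRA-8805 relies on: a long-running task extends CompactionInfo.Holder, registers with the compaction metrics, and polls isStopRequested() so a stop request surfaces as a CompactionInterruptedException and the lifecycle transactions can roll everything back. A minimal sketch of that shape (the class and method names here are invented, not part of the patch):

    import org.apache.cassandra.db.compaction.CompactionInfo;
    import org.apache.cassandra.db.compaction.CompactionInterruptedException;

    // Sketch only: the polling pattern IndexSummaryRedistribution uses at the
    // top of each of its loops.
    public abstract class CancellableTask extends CompactionInfo.Holder
    {
        // Call between units of work; throws once stopCompaction has flagged us.
        protected void throwIfStopped()
        {
            if (isStopRequested())
                throw new CompactionInterruptedException(getCompactionInfo());
        }
    }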