Repository: cassandra Updated Branches: refs/heads/trunk 2cd18ef5a -> 3f79c5baa
Allow cancellation of index summary redistribution Patch by Carl Yeksigian; reviewed by marcuse for CASSANDRA-8805 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc7075a4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc7075a4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc7075a4 Branch: refs/heads/trunk Commit: fc7075a41837301f3866333e0eb5c464715d888c Parents: 95dab27 Author: Carl Yeksigian <c...@apache.org> Authored: Tue Dec 8 12:22:25 2015 -0500 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri Dec 11 17:20:18 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionInfo.java | 14 +- .../db/compaction/CompactionManager.java | 14 + .../cassandra/db/compaction/OperationType.java | 3 +- .../io/sstable/IndexSummaryManager.java | 265 +-------------- .../io/sstable/IndexSummaryRedistribution.java | 338 +++++++++++++++++++ .../io/sstable/IndexSummaryManagerTest.java | 69 +++- 7 files changed, 435 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 46cda65..2ee8b07 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.13 + * Allow cancellation of index summary redistribution (CASSANDRA-8805) * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) * Fix Stress profile parsing on Windows (CASSANDRA-10808) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index d086eef..e88143e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -98,9 +98,17 @@ public final class CompactionInfo implements Serializable public String toString() { StringBuilder buff = new StringBuilder(); - buff.append(getTaskType()).append('@').append(getId()); - buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily()); - buff.append(", ").append(getCompleted()).append('/').append(getTotal()); + buff.append(getTaskType()); + if (cfm != null) + { + buff.append('@').append(getId()).append('('); + buff.append(getKeyspace()).append(", ").append(getColumnFamily()).append(", "); + } + else + { + buff.append('('); + } + buff.append(getCompleted()).append('/').append(getTotal()); return buff.append(')').append(unit).toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 2630ba2..9bddaf5 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1222,6 +1222,20 @@ public class CompactionManager implements CompactionManagerMBean return executor.submit(runnable); } + public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException + { + metrics.beginCompaction(redistribution); + + try + { + return redistribution.redistributeSummaries(); + } + finally + { + metrics.finishCompaction(redistribution); + } + } + static int getDefaultGcBefore(ColumnFamilyStore cfs) { // 2ndary indexes have ExpiringColumns too, so we need to 
purge tombstones deleted before now. We do not need to http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index 15d18f6..475b591 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -31,7 +31,8 @@ public enum OperationType /** Compaction for tombstone removal */ TOMBSTONE_COMPACTION("Tombstone Compaction"), UNKNOWN("Unknown compaction type"), - ANTICOMPACTION("Anticompaction after repair"); + ANTICOMPACTION("Anticompaction after repair"), + INDEX_SUMMARY("Index summary redistribution"); private final String type; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 0c196ff..be5cc3c 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -20,8 +20,6 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,7 +31,6 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import 
com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -45,11 +42,10 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; -import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; - /** * Manages the fixed-size memory pool for index summaries, periodically resizing them * in order to give more memory to hot sstables and less memory to cold sstables. @@ -255,261 +251,6 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean @VisibleForTesting public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException { - long total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting)) - total += sstable.getIndexSummaryOffHeapSize(); - - List<SSTableReader> oldFormatSSTables = new ArrayList<>(); - for (SSTableReader sstable : nonCompacting) - { - // We can't change the sampling level of sstables with the old format, because the serialization format - // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) 
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); - if (!sstable.descriptor.version.hasSamplingLevel) - oldFormatSSTables.add(sstable); - } - nonCompacting.removeAll(oldFormatSSTables); - - logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", - nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); - - final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size()); - double totalReadsPerSec = 0.0; - for (SSTableReader sstable : nonCompacting) - { - if (sstable.getReadMeter() != null) - { - Double readRate = sstable.getReadMeter().fifteenMinuteRate(); - totalReadsPerSec += readRate; - readRates.put(sstable, readRate); - } - } - logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec); - - // copy and sort by read rates (ascending) - List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting); - Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); - - long remainingBytes = memoryPoolBytes; - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) - remainingBytes -= sstable.getIndexSummaryOffHeapSize(); - - logger.trace("Index summaries for compacting SSTables are using {} MB of space", - (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); - List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes); - - total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) - total += sstable.getIndexSummaryOffHeapSize(); - logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB", - total / 1024.0 / 1024.0); - - return newSSTables; - } - - private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, - double totalReadsPerSec, long memoryPoolCapacity) throws 
IOException - { - - List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4); - List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4); - List<ResampleEntry> forceResample = new ArrayList<>(); - List<ResampleEntry> forceUpsample = new ArrayList<>(); - List<SSTableReader> newSSTables = new ArrayList<>(sstables.size()); - - // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional - // to the number of total reads/sec it handles. - long remainingSpace = memoryPoolCapacity; - for (SSTableReader sstable : sstables) - { - int minIndexInterval = sstable.metadata.getMinIndexInterval(); - int maxIndexInterval = sstable.metadata.getMaxIndexInterval(); - - double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate(); - long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); - - // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that - int currentNumEntries = sstable.getIndexSummarySize(); - double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries; - long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize)); - int currentSamplingLevel = sstable.getIndexSummarySamplingLevel(); - int maxSummarySize = sstable.getMaxIndexSummarySize(); - - // if the min_index_interval changed, calculate what our current sampling level would be under the new min - if (sstable.getMinIndexInterval() != minIndexInterval) - { - int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval())); - maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval)); - logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})", - sstable.getMinIndexInterval(), minIndexInterval, sstable, 
effectiveSamplingLevel, currentSamplingLevel); - currentSamplingLevel = effectiveSamplingLevel; - } - - int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries, - minIndexInterval, maxIndexInterval); - int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize); - double effectiveIndexInterval = sstable.getEffectiveIndexInterval(); - - logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " + - "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)", - sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, - currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel, - numEntriesAtNewSamplingLevel * avgEntrySize); - - if (effectiveIndexInterval < minIndexInterval) - { - // The min_index_interval was changed; re-sample to match it. 
- logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})", - sstable, effectiveIndexInterval, minIndexInterval); - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= spaceUsed; - } - else if (effectiveIndexInterval > maxIndexInterval) - { - // The max_index_interval was lowered; force an upsample to the effective minimum sampling level - logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})", - sstable, effectiveIndexInterval, maxIndexInterval); - newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval); - numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize()); - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; - } - else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel) - { - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; - } - else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel) - { - long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); - toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); - remainingSpace -= spaceUsed; - } - else - { - // keep the same sampling level - logger.trace("SSTable {} is within thresholds of ideal sampling", sstable); - remainingSpace -= sstable.getIndexSummaryOffHeapSize(); - 
newSSTables.add(sstable); - } - totalReadsPerSec -= readsPerSec; - } - - if (remainingSpace > 0) - { - Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace); - toDownsample = result.right; - newSSTables.addAll(result.left); - } - - // downsample first, then upsample - toDownsample.addAll(forceResample); - toDownsample.addAll(toUpsample); - toDownsample.addAll(forceUpsample); - Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create(); - Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create(); - for (ResampleEntry entry : toDownsample) - { - SSTableReader sstable = entry.sstable; - logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", - sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, - entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); - ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()); - SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); - DataTracker tracker = cfs.getDataTracker(); - - replacedByTracker.put(tracker, sstable); - replacementsByTracker.put(tracker, replacement); - } - - for (DataTracker tracker : replacedByTracker.keySet()) - { - tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker)); - newSSTables.addAll(replacementsByTracker.get(tracker)); - } - - return newSSTables; - } - - @VisibleForTesting - static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace) - { - // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations - // that will make little difference. 
- Collections.sort(toDownsample, new Comparator<ResampleEntry>() - { - public int compare(ResampleEntry o1, ResampleEntry o2) - { - return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed, - o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed); - } - }); - - int noDownsampleCutoff = 0; - List<SSTableReader> willNotDownsample = new ArrayList<>(); - while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size()) - { - ResampleEntry entry = toDownsample.get(noDownsampleCutoff); - - long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed; - // see if we have enough leftover space to keep the current sampling level - if (extraSpaceRequired <= remainingSpace) - { - logger.trace("Using leftover space to keep {} at the current sampling level ({})", - entry.sstable, entry.sstable.getIndexSummarySamplingLevel()); - willNotDownsample.add(entry.sstable); - remainingSpace -= extraSpaceRequired; - } - else - { - break; - } - - noDownsampleCutoff++; - } - return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size())); - } - - private static class ResampleEntry - { - public final SSTableReader sstable; - public final long newSpaceUsed; - public final int newSamplingLevel; - - public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel) - { - this.sstable = sstable; - this.newSpaceUsed = newSpaceUsed; - this.newSamplingLevel = newSamplingLevel; - } - } - - /** Utility class for sorting sstables by their read rates. 
*/ - private static class ReadRateComparator implements Comparator<SSTableReader> - { - private final Map<SSTableReader, Double> readRates; - - public ReadRateComparator(Map<SSTableReader, Double> readRates) - { - this.readRates = readRates; - } - - @Override - public int compare(SSTableReader o1, SSTableReader o2) - { - Double readRate1 = readRates.get(o1); - Double readRate2 = readRates.get(o2); - if (readRate1 == null && readRate2 == null) - return 0; - else if (readRate1 == null) - return -1; - else if (readRate2 == null) - return 1; - else - return Double.compare(readRate1, readRate2); - } + return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java new file mode 100644 index 0000000..adb3e4e --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataTracker; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; + +public class IndexSummaryRedistribution extends CompactionInfo.Holder +{ + private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class); + + private final List<SSTableReader> compacting; + private final List<SSTableReader> nonCompacting; + private final long memoryPoolBytes; + private volatile long remainingSpace; + + public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) + { + this.compacting = compacting; + this.nonCompacting = nonCompacting; + this.memoryPoolBytes = memoryPoolBytes; + } + + public 
List<SSTableReader> redistributeSummaries() throws IOException + { + long total = 0; + for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting)) + total += sstable.getIndexSummaryOffHeapSize(); + + List<SSTableReader> oldFormatSSTables = new ArrayList<>(); + for (SSTableReader sstable : nonCompacting) + { + // We can't change the sampling level of sstables with the old format, because the serialization format + // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) + logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); + if (!sstable.descriptor.version.hasSamplingLevel) + oldFormatSSTables.add(sstable); + } + nonCompacting.removeAll(oldFormatSSTables); + + logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", + nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); + + final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size()); + double totalReadsPerSec = 0.0; + for (SSTableReader sstable : nonCompacting) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + if (sstable.getReadMeter() != null) + { + Double readRate = sstable.getReadMeter().fifteenMinuteRate(); + totalReadsPerSec += readRate; + readRates.put(sstable, readRate); + } + } + logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec); + + // copy and sort by read rates (ascending) + List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting); + Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); + + long remainingBytes = memoryPoolBytes; + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) + remainingBytes -= sstable.getIndexSummaryOffHeapSize(); + + logger.trace("Index summaries for compacting SSTables are using {} MB of space", + 
(memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0); + List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes); + + total = 0; + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) + total += sstable.getIndexSummaryOffHeapSize(); + logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB", + total / 1024.0 / 1024.0); + + return newSSTables; + } + + private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, + double totalReadsPerSec, long memoryPoolCapacity) throws IOException + { + + List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4); + List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4); + List<ResampleEntry> forceResample = new ArrayList<>(); + List<ResampleEntry> forceUpsample = new ArrayList<>(); + List<SSTableReader> newSSTables = new ArrayList<>(sstables.size()); + + // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional + // to the number of total reads/sec it handles. + remainingSpace = memoryPoolCapacity; + for (SSTableReader sstable : sstables) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + int minIndexInterval = sstable.metadata.getMinIndexInterval(); + int maxIndexInterval = sstable.metadata.getMaxIndexInterval(); + + double readsPerSec = sstable.getReadMeter() == null ? 
0.0 : sstable.getReadMeter().fifteenMinuteRate(); + long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); + + // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that + int currentNumEntries = sstable.getIndexSummarySize(); + double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries; + long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize)); + int currentSamplingLevel = sstable.getIndexSummarySamplingLevel(); + int maxSummarySize = sstable.getMaxIndexSummarySize(); + + // if the min_index_interval changed, calculate what our current sampling level would be under the new min + if (sstable.getMinIndexInterval() != minIndexInterval) + { + int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval())); + maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval)); + logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})", + sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel); + currentSamplingLevel = effectiveSamplingLevel; + } + + int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries, + minIndexInterval, maxIndexInterval); + int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize); + double effectiveIndexInterval = sstable.getEffectiveIndexInterval(); + + logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " + + "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)", + sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, + currentNumEntries * avgEntrySize, 
newSamplingLevel, numEntriesAtNewSamplingLevel, + numEntriesAtNewSamplingLevel * avgEntrySize); + + if (effectiveIndexInterval < minIndexInterval) + { + // The min_index_interval was changed; re-sample to match it. + logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})", + sstable, effectiveIndexInterval, minIndexInterval); + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= spaceUsed; + } + else if (effectiveIndexInterval > maxIndexInterval) + { + // The max_index_interval was lowered; force an upsample to the effective minimum sampling level + logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})", + sstable, effectiveIndexInterval, maxIndexInterval); + newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval); + numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize()); + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; + } + else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel) + { + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel)); + remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; + } + else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel) + { + long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel); + toDownsample.add(new ResampleEntry(sstable, spaceUsed, 
newSamplingLevel)); + remainingSpace -= spaceUsed; + } + else + { + // keep the same sampling level + logger.trace("SSTable {} is within thresholds of ideal sampling", sstable); + remainingSpace -= sstable.getIndexSummaryOffHeapSize(); + newSSTables.add(sstable); + } + totalReadsPerSec -= readsPerSec; + } + + if (remainingSpace > 0) + { + Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace); + toDownsample = result.right; + newSSTables.addAll(result.left); + } + + // downsample first, then upsample + toDownsample.addAll(forceResample); + toDownsample.addAll(toUpsample); + toDownsample.addAll(forceUpsample); + Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create(); + Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create(); + + try + { + for (ResampleEntry entry : toDownsample) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + SSTableReader sstable = entry.sstable; + logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", + sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, + entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); + ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()); + DataTracker tracker = cfs.getDataTracker(); + SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); + newSSTables.add(replacement); + replacedByTracker.put(tracker, sstable); + replacementsByTracker.put(tracker, replacement); + } + } + finally + { + for (DataTracker tracker : replacedByTracker.keySet()) + tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker)); + } + + return newSSTables; + } + + @VisibleForTesting + static Pair<List<SSTableReader>, List<ResampleEntry>> 
distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace) + { + // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations + // that will make little difference. + Collections.sort(toDownsample, new Comparator<ResampleEntry>() + { + public int compare(ResampleEntry o1, ResampleEntry o2) + { + return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed, + o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed); + } + }); + + int noDownsampleCutoff = 0; + List<SSTableReader> willNotDownsample = new ArrayList<>(); + while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size()) + { + ResampleEntry entry = toDownsample.get(noDownsampleCutoff); + + long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed; + // see if we have enough leftover space to keep the current sampling level + if (extraSpaceRequired <= remainingSpace) + { + logger.trace("Using leftover space to keep {} at the current sampling level ({})", + entry.sstable, entry.sstable.getIndexSummarySamplingLevel()); + willNotDownsample.add(entry.sstable); + remainingSpace -= extraSpaceRequired; + } + else + { + break; + } + + noDownsampleCutoff++; + } + return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size())); + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes), memoryPoolBytes, "bytes"); + } + + /** Utility class for sorting sstables by their read rates. 
*/
+    private static class ReadRateComparator implements Comparator<SSTableReader>
+    {
+        // read rate looked up per sstable; an sstable without an entry sorts before any sstable that has one
+        private final Map<SSTableReader, Double> readRates;
+
+        ReadRateComparator(Map<SSTableReader, Double> readRates)
+        {
+            this.readRates = readRates;
+        }
+
+        @Override
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            Double left = readRates.get(o1);
+            Double right = readRates.get(o2);
+
+            // a missing rate orders first; two missing rates tie
+            if (left == null)
+                return right == null ? 0 : -1;
+            if (right == null)
+                return 1;
+
+            return Double.compare(left, right);
+        }
+    }
+
+    /**
+     * Holder for one planned resample: the sstable concerned, the sampling level it should move to and the
+     * space its summary is expected to use at that level.
+     */
+    private static class ResampleEntry
+    {
+        public final SSTableReader sstable;
+        public final long newSpaceUsed;
+        public final int newSamplingLevel;
+
+        ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+        {
+            this.sstable = sstable;
+            this.newSpaceUsed = newSpaceUsed;
+            this.newSamplingLevel = newSamplingLevel;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 64d3354..63928e2 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -22,12 +22,14 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +38,12 @@ import
org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.metrics.RestorableMeter; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD; @@ -75,6 +78,11 @@ public class IndexSummaryManagerTest extends SchemaLoader @After public void afterTest() { + for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions()) + { + holder.stop(); + } + String ksname = "Keyspace1"; String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching Keyspace keyspace = Keyspace.open(ksname); @@ -499,4 +507,59 @@ public class IndexSummaryManagerTest extends SchemaLoader assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval()); } }
+
+    /**
+     * Checks that an index summary redistribution can be cancelled and that cancelling leaves the sstables as
+     * they were.
+     *
+     * A redistribution is started on a background thread with a memory pool sized for only half of the
+     * summaries. The test thread waits until an active compaction is reported, asks the CompactionManager to
+     * stop "INDEX_SUMMARY" operations, and then asserts that the background thread saw a
+     * CompactionInterruptedException, that no compaction is left registered, that the column family still
+     * has exactly the sstables it had before, and that the data still validates.
+     *
+     * NOTE(review): the outcome depends on timing. The wait loop also ends when the worker thread has already
+     * finished, and an IOException in the worker is swallowed; in both cases no exception is recorded and the
+     * test fails on the "Expected compaction interrupted exception" assertion.
+     */
+    @Test
+    public void testCancelIndex() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        final int numSSTables = 4;
+        int numRows = 256;
+        // presumably writes numSSTables sstables of numRows rows each - see createSSTables
+        createSSTables(ksname, cfname, numSSTables, numRows);
+
+        // give every sstable the same read meter values
+        final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        for (SSTableReader sstable : sstables)
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+
+        final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+        // everything should get cut in half
+        // (the pool passed below is the first summary's size times half the number of sstables)
+        final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+        Thread t = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+                }
+                catch (CompactionInterruptedException ex)
+                {
+                    // hand the interruption over to the test thread, which asserts on it after join()
+                    exception.set(ex);
+                }
+                catch (IOException ignored)
+                {
+                    // deliberately ignored here; the assertions below will fail because no interruption was recorded
+                }
+            }
+        });
+        t.start();
+        // poll until the redistribution is reported as an active compaction, or the worker has already exited
+        while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
+            Thread.sleep(1);
+        CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+        t.join();
+
+        assertNotNull("Expected compaction interrupted exception", exception.get());
+        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+
+        // cancelling must not have swapped any sstable instance in or out of the column family
+        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
+        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
+        Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+        assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                 Joiner.on(",").join(disjoint)),
+                   disjoint.isEmpty());
+
+        validateData(cfs, numRows);
+    }
 }