Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 d2bc37f43 -> 14e46e462 refs/heads/trunk 2d2879db7 -> 41904684b
Cache disk boundaries Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-13215 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14e46e46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14e46e46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14e46e46 Branch: refs/heads/cassandra-3.11 Commit: 14e46e462cfee15cd06419ee81eb6d9571b6805e Parents: d2bc37f Author: Marcus Eriksson <marc...@apache.org> Authored: Fri Oct 27 14:27:36 2017 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri Nov 24 14:18:04 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/BlacklistedDirectories.java | 10 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 13 ++ .../org/apache/cassandra/db/DiskBoundaries.java | 71 +++++++++ .../cassandra/db/DiskBoundaryManager.java | 153 +++++++++++++++++++ src/java/org/apache/cassandra/db/Keyspace.java | 2 + src/java/org/apache/cassandra/db/Memtable.java | 21 +-- .../db/compaction/CompactionManager.java | 8 +- .../compaction/CompactionStrategyManager.java | 42 ++--- .../cassandra/db/compaction/Scrubber.java | 2 +- .../writers/CompactionAwareWriter.java | 13 +- .../sstable/format/RangeAwareSSTableWriter.java | 13 +- .../cassandra/service/StorageService.java | 58 ------- .../cassandra/db/DiskBoundaryManagerTest.java | 124 +++++++++++++++ 14 files changed, 421 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3fc934e..fc18dc3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.2 + * Cache disk boundaries (CASSANDRA-13215) * Add asm jar to build.xml for maven builds (CASSANDRA-11193) * Round buffer size to powers of 2 for the 
chunk cache (CASSANDRA-13897) * Update jackson JSON jars (CASSANDRA-13949) http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/BlacklistedDirectories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java index b8a5914..f090013 100644 --- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java +++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java @@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory; import java.util.Collections; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -42,6 +43,8 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>(); private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>(); + private static final AtomicInteger directoriesVersion = new AtomicInteger(); + private BlacklistedDirectories() { // Register this instance with JMX @@ -89,6 +92,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean File directory = getDirectory(path); if (instance.unreadableDirectories.add(directory)) { + directoriesVersion.incrementAndGet(); logger.warn("Blacklisting {} for reads", directory); return directory; } @@ -106,12 +110,18 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean File directory = getDirectory(path); if (instance.unwritableDirectories.add(directory)) { + directoriesVersion.incrementAndGet(); logger.warn("Blacklisting {} for writes", directory); return directory; } return null; } + public static int getDirectoriesVersion() + { + return directoriesVersion.get(); + } + /** * Testing only! 
* Clear the set of unwritable directories. http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index c9514ca..6077b8d 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -250,6 +250,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private volatile boolean compactionSpaceCheck = true; + @VisibleForTesting + final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager(); + public static void shutdownPostFlushExecutor() throws InterruptedException { postFlushExecutor.shutdown(); @@ -2649,4 +2652,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { return getIfExists(tableId).metric; } + + public DiskBoundaries getDiskBoundaries() + { + return diskBoundaryManager.getDiskBoundaries(this); + } + + public void invalidateDiskBoundaries() + { + diskBoundaryManager.invalidate(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaries.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java new file mode 100644 index 0000000..ba5a093 --- /dev/null +++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +public class DiskBoundaries +{ + public final List<Directories.DataDirectory> directories; + public final ImmutableList<PartitionPosition> positions; + final long ringVersion; + final int directoriesVersion; + + DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion) + { + this.directories = directories == null ? null : ImmutableList.copyOf(directories); + this.positions = positions == null ? null : ImmutableList.copyOf(positions); + this.ringVersion = ringVersion; + this.directoriesVersion = diskVersion; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DiskBoundaries that = (DiskBoundaries) o; + + if (ringVersion != that.ringVersion) return false; + if (directoriesVersion != that.directoriesVersion) return false; + if (!directories.equals(that.directories)) return false; + return positions != null ? positions.equals(that.positions) : that.positions == null; + } + + public int hashCode() + { + int result = directories != null ? directories.hashCode() : 0; + result = 31 * result + (positions != null ? 
positions.hashCode() : 0); + result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32)); + result = 31 * result + directoriesVersion; + return result; + } + + public String toString() + { + return "DiskBoundaries{" + + "directories=" + directories + + ", positions=" + positions + + ", ringVersion=" + ringVersion + + ", directoriesVersion=" + directoriesVersion + + '}'; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java new file mode 100644 index 0000000..7872554 --- /dev/null +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.db; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.List; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.dht.IPartitioner; + import org.apache.cassandra.dht.Range; + import org.apache.cassandra.dht.Splitter; + import org.apache.cassandra.dht.Token; + import org.apache.cassandra.locator.TokenMetadata; + import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.utils.FBUtilities; + + public class DiskBoundaryManager + { + private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class); + private volatile DiskBoundaries diskBoundaries; + + public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs) + { + if (!cfs.getPartitioner().splitter().isPresent()) + return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1); + // copy the reference to avoid getting nulled out by invalidate() below + // - it is ok to race, compaction will move any incorrect tokens to their correct places, but + // returning null would be bad + DiskBoundaries db = diskBoundaries; + if (isOutOfDate(diskBoundaries)) + { + synchronized (this) + { + db = diskBoundaries; + if (isOutOfDate(diskBoundaries)) + { + logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName()); + DiskBoundaries oldBoundaries = diskBoundaries; + db = diskBoundaries = getDiskBoundaryValue(cfs); + logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName()); + } + } + } + return db; + } + + /** + * check if the given disk boundaries are out of date due to not being set or having too old diskVersion/ringVersion + */ + private boolean isOutOfDate(DiskBoundaries db) + { + if (db == null) + return true; + long currentRingVersion = 
StorageService.instance.getTokenMetadata().getRingVersion(); + int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion(); + return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion; + } + + private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs) + { + Collection<Range<Token>> localRanges; + + long ringVersion; + TokenMetadata tmd; + do + { + tmd = StorageService.instance.getTokenMetadata(); + ringVersion = tmd.getRingVersion(); + if (StorageService.instance.isBootstrapMode()) + { + localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress()); + } + else + { + // Reason we use the future settled TMD is that if we decommission a node, we want to stream + // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. + // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled + localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress()); + } + logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion); + } + while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that + // it might have changed before we calculated localRanges - recalculate + + int directoriesVersion; + Directories.DataDirectory[] dirs; + do + { + directoriesVersion = BlacklistedDirectories.getDirectoriesVersion(); + dirs = cfs.getDirectories().getWriteableLocations(); + } + while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate + + if (localRanges == null || localRanges.isEmpty()) + return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion); + + List<Range<Token>> sortedLocalRanges = Range.sort(localRanges); + + List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, 
cfs.getPartitioner(), dirs); + return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion); + } + + /** + * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not. + * + * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to + * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 and 1 should be on the second disk + * etc. + * + * The final entry in the returned list will always be the partitioner maximum tokens upper key bound + */ + private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) + { + assert partitioner.splitter().isPresent(); + Splitter splitter = partitioner.splitter().get(); + boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1; + List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges); + // If we can't split by ranges, split evenly to ensure utilisation of all disks + if (dontSplitRanges && boundaries.size() < dataDirectories.length) + boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false); + + List<PartitionPosition> diskBoundaries = new ArrayList<>(); + for (int i = 0; i < boundaries.size() - 1; i++) + diskBoundaries.add(boundaries.get(i).maxKeyBound()); + diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound()); + return diskBoundaries; + } + + public void invalidate() + { + diskBoundaries = null; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index cffdb80..01fd451 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ 
b/src/java/org/apache/cassandra/db/Keyspace.java @@ -344,6 +344,8 @@ public class Keyspace StorageService.instance.getTokenMetadata(), DatabaseDescriptor.getEndpointSnitch(), ksm.params.replication.options); + logger.debug("New replication strategy instance - invalidating disk boundary cache"); + columnFamilyStores.values().forEach(ColumnFamilyStore::invalidateDiskBoundaries); } // best invoked on the compaction mananger. http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index e0b27fa..cf04016 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -36,7 +36,6 @@ import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -54,7 +53,6 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.memory.HeapPool; import org.apache.cassandra.utils.memory.MemtableAllocator; @@ -297,20 +295,17 @@ public class Memtable implements Comparable<Memtable> public List<FlushRunnable> flushRunnables(LifecycleTransaction txn) { - List<Range<Token>> localRanges = 
Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName())); - - if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty()) - return Collections.singletonList(new FlushRunnable(txn)); - - return createFlushRunnables(localRanges, txn); + return createFlushRunnables(txn); } - private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn) + private List<FlushRunnable> createFlushRunnables(LifecycleTransaction txn) { - assert cfs.getPartitioner().splitter().isPresent(); + DiskBoundaries diskBoundaries = cfs.getDiskBoundaries(); + List<PartitionPosition> boundaries = diskBoundaries.positions; + List<Directories.DataDirectory> locations = diskBoundaries.directories; + if (boundaries == null) + return Collections.singletonList(new FlushRunnable(txn)); - Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); - List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations); List<FlushRunnable> runnables = new ArrayList<>(boundaries.size()); PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound(); try @@ -318,7 +313,7 @@ public class Memtable implements Comparable<Memtable> for (int i = 0; i < boundaries.size(); i++) { PartitionPosition t = boundaries.get(i); - runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn)); + runnables.add(new FlushRunnable(rangeStart, t, locations.get(i), txn)); rangeStart = t; } return runnables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 518e3b5..0a2b461 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -517,9 +517,7 @@ public class CompactionManager implements CompactionManagerMBean return AllSSTableOpStatus.ABORTED; } - final List<Range<Token>> localRanges = Range.sort(r); - final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); - final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations); + final List<PartitionPosition> diskBoundaries = cfs.getDiskBoundaries().positions; return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @@ -531,7 +529,7 @@ public class CompactionManager implements CompactionManagerMBean transaction.cancel(Sets.difference(originals, needsRelocation)); Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) -> - CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s))); + CompactionStrategyManager.getCompactionStrategyIndex(cfs, s))); int maxSize = 0; for (List<SSTableReader> diskSSTables : groupedByDisk.values()) @@ -551,7 +549,7 @@ public class CompactionManager implements CompactionManagerMBean { if (!cfs.getPartitioner().splitter().isPresent()) return true; - int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable); + int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable); Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); Directories.DataDirectory location = locations[directoryIndex]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
index 94def2a..6305096 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -25,6 +25,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.DiskBoundaries; import org.apache.cassandra.index.Index; import com.google.common.primitives.Ints; @@ -36,7 +38,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.dht.Range; @@ -49,7 +50,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.notifications.*; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.service.StorageService; /** * Manages the compaction strategies. @@ -207,7 +207,7 @@ public class CompactionStrategyManager implements INotificationConsumer */ public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) { - int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int index = getCompactionStrategyIndex(cfs, sstable); readLock.lock(); try { @@ -231,30 +231,30 @@ public class CompactionStrategyManager implements INotificationConsumer * sstables in the correct locations and give them to the correct compaction strategy instance. 
* * @param cfs - * @param locations * @param sstable * @return */ - public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable) + public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable) { if (!cfs.getPartitioner().splitter().isPresent()) return 0; - Directories.DataDirectory[] directories = locations.getWriteableLocations(); - List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories); - if (boundaries == null) + DiskBoundaries boundaries = cfs.getDiskBoundaries(); + List<Directories.DataDirectory> directories = boundaries.directories; + + if (boundaries.positions == null) { // try to figure out location based on sstable directory: - for (int i = 0; i < directories.length; i++) + for (int i = 0; i < directories.size(); i++) { - Directories.DataDirectory directory = directories[i]; + Directories.DataDirectory directory = directories.get(i); if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath())) return i; } return 0; } - int pos = Collections.binarySearch(boundaries, sstable.first); + int pos = Collections.binarySearch(boundaries.positions, sstable.first); assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal return -pos - 1; } @@ -449,7 +449,7 @@ public class CompactionStrategyManager implements INotificationConsumer for (SSTableReader sstable : removed) { - int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int i = getCompactionStrategyIndex(cfs, sstable); if (sstable.isRepaired()) repairedRemoved.get(i).add(sstable); else @@ -457,7 +457,7 @@ public class CompactionStrategyManager implements INotificationConsumer } for (SSTableReader sstable : added) { - int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int i = getCompactionStrategyIndex(cfs, sstable); if (sstable.isRepaired()) repairedAdded.get(i).add(sstable); else @@ -494,7 
+494,7 @@ public class CompactionStrategyManager implements INotificationConsumer { for (SSTableReader sstable : sstables) { - int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + int index = getCompactionStrategyIndex(cfs, sstable); if (sstable.isRepaired()) { unrepaired.get(index).removeSSTable(sstable); @@ -608,9 +608,9 @@ public class CompactionStrategyManager implements INotificationConsumer for (SSTableReader sstable : sstables) { if (sstable.isRepaired()) - repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable); + repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable); else - unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable); + unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable); } List<ISSTableScanner> scanners = new ArrayList<>(sstables.size()); @@ -647,7 +647,7 @@ public class CompactionStrategyManager implements INotificationConsumer readLock.lock(); try { - Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s))); Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>(); for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet()) @@ -685,12 +685,12 @@ public class CompactionStrategyManager implements INotificationConsumer SSTableReader firstSSTable = Iterables.getFirst(input, null); assert firstSSTable != null; boolean repaired = firstSSTable.isRepaired(); - int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable); + int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable); for (SSTableReader sstable : input) { if (sstable.isRepaired() != repaired) throw new UnsupportedOperationException("You can't mix 
repaired and unrepaired data in a compaction"); - if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable)) + if (firstIndex != getCompactionStrategyIndex(cfs, sstable)) throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction"); } } @@ -752,11 +752,11 @@ public class CompactionStrategyManager implements INotificationConsumer { Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream() .filter(s -> !s.isMarkedSuspect() && s.isRepaired()) - .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s))); Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream() .filter(s -> !s.isMarkedSuspect() && !s.isRepaired()) - .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s))); for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 0007e30..b1f2e9f 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -98,7 +98,7 @@ public class Scrubber implements Closeable List<SSTableReader> toScrub = Collections.singletonList(sstable); - int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable); + int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable); this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]); 
this.isCommutative = cfs.metadata.isCounter(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 205aebe..d2f816b 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.DiskBoundaries; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.CompactionTask; @@ -38,7 +39,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Transactional; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.service.StorageService; /** @@ -58,7 +58,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final SSTableRewriter sstableWriter; protected final LifecycleTransaction txn; - private final Directories.DataDirectory[] locations; + private final List<Directories.DataDirectory> locations; private final List<PartitionPosition> diskBoundaries; private int locationIndex; @@ -88,8 +88,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge); minRepairedAt = 
CompactionTask.getMinRepairedAt(nonExpiredSSTables); - locations = cfs.getDirectories().getWriteableLocations(); - diskBoundaries = StorageService.getDiskBoundaries(cfs); + DiskBoundaries db = cfs.getDiskBoundaries(); + diskBoundaries = db.positions; + locations = db.directories; locationIndex = -1; } @@ -174,8 +175,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0) locationIndex++; if (prevIdx >= 0) - logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]); - switchCompactionLocation(locations[locationIndex]); + logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex)); + switchCompactionLocation(locations.get(locationIndex)); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java index 3665da7..353aacb 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java @@ -26,19 +26,19 @@ import java.util.UUID; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.DiskBoundaries; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; 
-import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; public class RangeAwareSSTableWriter implements SSTableMultiWriter { private final List<PartitionPosition> boundaries; - private final Directories.DataDirectory[] directories; + private final List<Directories.DataDirectory> directories; private final int sstableLevel; private final long estimatedKeys; private final long repairedAt; @@ -53,15 +53,16 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException { - directories = cfs.getDirectories().getWriteableLocations(); + DiskBoundaries db = cfs.getDiskBoundaries(); + directories = db.directories; this.sstableLevel = sstableLevel; this.cfs = cfs; - this.estimatedKeys = estimatedKeys / directories.length; + this.estimatedKeys = estimatedKeys / directories.size(); this.repairedAt = repairedAt; this.format = format; this.txn = txn; this.header = header; - boundaries = StorageService.getDiskBoundaries(cfs, directories); + boundaries = db.positions; if (boundaries == null) { Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); @@ -90,7 +91,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter if (currentWriter != null) finishedWriters.add(currentWriter); - Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format); + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format); currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5b4e552..e93430b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -5205,62 +5205,4 @@ public class StorageService extends NotificationBroadcasterSupport implements IE DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB); } - - public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories) - { - if (!cfs.getPartitioner().splitter().isPresent()) - return null; - - Collection<Range<Token>> lr; - - if (StorageService.instance.isBootstrapMode()) - { - lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress()); - } - else - { - // Reason we use use the future settled TMD is that if we decommission a node, we want to stream - // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. 
- // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled - TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled(); - lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress()); - } - - if (lr == null || lr.isEmpty()) - return null; - List<Range<Token>> localRanges = Range.sort(lr); - - return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories); - } - - public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs) - { - return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations()); - } - - /** - * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not. - * - * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to - * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk - * etc. 
- * - * The final entry in the returned list will always be the partitioner maximum tokens upper key bound - */ - public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) - { - assert partitioner.splitter().isPresent(); - Splitter splitter = partitioner.splitter().get(); - boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1; - List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, dontSplitRanges); - // If we can't split by ranges, split evenly to ensure utilisation of all disks - if (dontSplitRanges && boundaries.size() < dataDirectories.length) - boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, false); - - List<PartitionPosition> diskBoundaries = new ArrayList<>(); - for (int i = 0; i < boundaries.size() - 1; i++) - diskBoundaries.add(boundaries.get(i).maxKeyBound()); - diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound()); - return diskBoundaries; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java new file mode 100644 index 0000000..de79959 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.dht.BootStrapper; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class DiskBoundaryManagerTest extends CQLTester +{ + private DiskBoundaryManager dbm; + private MockCFS mock; + private Directories dirs; + + @Before + public void setup() + { + BlacklistedDirectories.clearUnwritableUnsafe(); + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress()); + createTable("create table %s (id int primary key, x text)"); + dbm = getCurrentColumnFamilyStore().diskBoundaryManager; + dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), + new Directories.DataDirectory(new File("/tmp/2")), + new Directories.DataDirectory(new File("/tmp/3")))); + mock = new MockCFS(getCurrentColumnFamilyStore(), dirs); + } + + @Test + public void getBoundariesTest() + { + DiskBoundaries dbv = 
dbm.getDiskBoundaries(mock); + Assert.assertEquals(3, dbv.positions.size()); + assertEquals(dbv.directories, dirs.getWriteableLocations()); + } + + @Test + public void blackListTest() + { + DiskBoundaries dbv = dbm.getDiskBoundaries(mock); + Assert.assertEquals(3, dbv.positions.size()); + assertEquals(dbv.directories, dirs.getWriteableLocations()); + BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3")); + dbv = dbm.getDiskBoundaries(mock); + Assert.assertEquals(2, dbv.positions.size()); + Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), + new Directories.DataDirectory(new File("/tmp/2"))), + dbv.directories); + } + + @Test + public void updateTokensTest() throws UnknownHostException + { + DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock); + StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10")); + DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock); + assertFalse(dbv1.equals(dbv2)); + } + + @Test + public void alterKeyspaceTest() throws Throwable + { + DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock); + execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }"); + DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock); + // == on purpose - we just want to make sure that there is a new instance cached + assertFalse(dbv1 == dbv2); + DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock); + assertTrue(dbv2 == dbv3); + + } + + private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2) + { + if (dir1.size() != dir2.length) + fail(); + for (int i = 0; i < dir2.length; i++) + { + if (!dir1.get(i).equals(dir2[i])) + fail(); + } + } + + // just to be able to override the data directories + private static class MockCFS extends ColumnFamilyStore + { + MockCFS(ColumnFamilyStore cfs, Directories dirs) + { + 
super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org