Open one sstableScanner per level for leveled compaction; patch by slebresne and jbellis for CASSANDRA-4142
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/46e422a9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/46e422a9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/46e422a9 Branch: refs/heads/cassandra-1.1 Commit: 46e422a9417b5b513ceae4c9652ba413e2ede474 Parents: 1686a36 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Apr 27 16:19:24 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed May 2 12:48:23 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/AbstractCompactionIterable.java | 17 +-- .../db/compaction/AbstractCompactionStrategy.java | 23 +++- .../db/compaction/CompactionIterable.java | 20 +--- .../cassandra/db/compaction/CompactionManager.java | 16 +-- .../cassandra/db/compaction/CompactionTask.java | 7 +- .../db/compaction/ICompactionScanner.java | 34 +++++ .../db/compaction/LeveledCompactionStrategy.java | 103 ++++++++++++++- .../cassandra/db/compaction/LeveledManifest.java | 3 +- .../db/compaction/ParallelCompactionIterable.java | 24 ++-- .../apache/cassandra/io/sstable/SSTableReader.java | 2 + .../cassandra/io/sstable/SSTableScanner.java | 13 ++- .../cassandra/service/AntiEntropyService.java | 4 +- test/conf/cassandra.yaml | 1 + test/unit/org/apache/cassandra/SchemaLoader.java | 9 +- .../compaction/LeveledCompactionStrategyTest.java | 87 ++++++++++++ .../cassandra/io/LazilyCompactedRowTest.java | 14 +- 17 files changed, 300 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 62234b2..83c171b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.1.1-dev + * Open 1 sstableScanner per level for leveled compaction 
(CASSANDRA-4142) * Optimize reads when row deletion timestamps allow us to restrict the set of sstables we check (CASSANDRA-4116) * incremental repair by token range (CASSANDRA-3912) http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java index 95e6590..8976f4e 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.utils.CloseableIterator; public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow> @@ -40,9 +39,9 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i protected final CompactionController controller; protected final long totalBytes; protected volatile long bytesRead; - protected final List<SSTableScanner> scanners; + protected final List<ICompactionScanner> scanners; - public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners) + public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ICompactionScanner> scanners) { this.controller = controller; this.type = type; @@ -50,19 +49,11 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i this.bytesRead = 0; long bytes = 0; - for (SSTableScanner scanner : scanners) - bytes += scanner.getFileLength(); + for (ICompactionScanner scanner : scanners) + bytes += 
scanner.getLengthInBytes(); this.totalBytes = bytes; } - protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException - { - ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>(); - for (SSTableReader sstable : sstables) - scanners.add(sstable.getDirectScanner()); - return scanners; - } - public CompactionInfo getCompactionInfo() { return new CompactionInfo(this.hashCode(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 2e70c46..3f51b88 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -18,15 +18,17 @@ package org.apache.cassandra.db.compaction; +import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.StorageService; - /** * Pluggable compaction strategy determines how SSTables get merged. * @@ -126,4 +128,23 @@ public abstract class AbstractCompactionStrategy return filteredCandidates; } + + /** + * Returns a list of ICompactionScanners given sstables and a range on which to scan. + * The default implementation simply grabs one SSTableScanner per sstable, but overriding this method + * allows for a more memory-efficient solution if we know the sstables don't overlap (see + * LeveledCompactionStrategy for instance). 
+ */ + public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException + { + ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(); + for (SSTableReader sstable : sstables) + scanners.add(sstable.getDirectScanner(range)); + return scanners; + } + + public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact) throws IOException + { + return getScanners(toCompact, null); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java index 8a4aa0e..0ead533 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.MergeIterator; @@ -50,25 +49,12 @@ public class CompactionIterable extends AbstractCompactionIterable } }; - public CompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException - { - this(type, getScanners(sstables), controller); - } - - protected CompactionIterable(OperationType type, List<SSTableScanner> scanners, CompactionController controller) + public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) { super(controller, type, scanners); row = 0; } - protected static List<SSTableScanner> 
getScanners(Iterable<SSTableReader> sstables) throws IOException - { - ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>(); - for (SSTableReader sstable : sstables) - scanners.add(sstable.getDirectScanner()); - return scanners; - } - public CloseableIterator<AbstractCompactedRow> iterator() { return MergeIterator.get(scanners, comparator, new Reducer()); @@ -116,8 +102,8 @@ public class CompactionIterable extends AbstractCompactionIterable if ((row++ % 1000) == 0) { long n = 0; - for (SSTableScanner scanner : scanners) - n += scanner.getFilePointer(); + for (ICompactionScanner scanner : scanners) + n += scanner.getCurrentPosition(); bytesRead = n; controller.mayThrottle(bytesRead); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 0ecac5c..fd14593 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -760,7 +760,7 @@ public class CompactionManager implements CompactionManagerMBean } } if ((rowsRead++ % 1000) == 0) - controller.mayThrottle(scanner.getFilePointer()); + controller.mayThrottle(scanner.getCurrentPosition()); } if (writer != null) newSstable = writer.closeAndOpenReader(sstable.maxDataAge); @@ -986,17 +986,9 @@ public class CompactionManager implements CompactionManagerMBean public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range) throws IOException { super(OperationType.VALIDATION, - getScanners(sstables, range), + cfs.getCompactionStrategy().getScanners(sstables, range), new CompactionController(cfs, sstables, getDefaultGcBefore(cfs), true)); } - - protected static 
List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range<Token> range) throws IOException - { - ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>(); - for (SSTableReader sstable : sstables) - scanners.add(sstable.getDirectScanner(range)); - return scanners; - } } public int getActiveCompactions() @@ -1196,8 +1188,8 @@ public class CompactionManager implements CompactionManagerMBean sstable.descriptor.ksname, sstable.descriptor.cfname, OperationType.CLEANUP, - scanner.getFilePointer(), - scanner.getFileLength()); + scanner.getCurrentPosition(), + scanner.getLengthInBytes()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index e93725c..b66f20b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -116,15 +116,16 @@ public class CompactionTask extends AbstractCompactionTask long startTime = System.currentTimeMillis(); long totalkeysWritten = 0; + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact)); - long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize()); + long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / strategy.getMaxSSTableSize()); long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : " + keysPerSSTable); AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction() - ? 
new ParallelCompactionIterable(compactionType, toCompact, controller) - : new CompactionIterable(compactionType, toCompact, controller); + ? new ParallelCompactionIterable(compactionType, strategy.getScanners(toCompact), controller) + : new CompactionIterable(compactionType, strategy.getScanners(toCompact), controller); CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java new file mode 100644 index 0000000..0b1a263 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import org.apache.cassandra.db.columniterator.IColumnIterator; +import org.apache.cassandra.utils.CloseableIterator; + +/** + * An ICompactionScanner is an abstraction allowing multiple SSTableScanners to be + * chained together under the hood. See LeveledCompactionStrategy.getScanners. + */ +public interface ICompactionScanner extends CloseableIterator<IColumnIterator> +{ + public long getLengthInBytes(); + public long getCurrentPosition(); + public String getBackingFiles(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index d224e5e..1bc40fd 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -21,17 +21,22 @@ package org.apache.cassandra.db.compaction; */ +import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; +import com.google.common.base.Joiner; +import com.google.common.collect.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.columniterator.IColumnIterator; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import 
org.apache.cassandra.notifications.SSTableAddedNotification; @@ -162,6 +167,100 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem return Sets.difference(L0, sstablesToIgnore).size() + manifest.getLevelCount() > 20; } + public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException + { + Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create(); + for (SSTableReader sstable : sstables) + byLevel.get(manifest.levelOf(sstable)).add(sstable); + + List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size()); + for (Integer level : ImmutableSortedSet.copyOf(byLevel.keySet())) + scanners.add(new LeveledScanner(new ArrayList<SSTableReader>(byLevel.get(level)), range)); + + return scanners; + } + + // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the + // same level (e.g. non overlapping) - see #4142 + private static class LeveledScanner extends AbstractIterator<IColumnIterator> implements ICompactionScanner + { + private final Range<Token> range; + private final List<SSTableReader> sstables; + private final Iterator<SSTableReader> sstableIterator; + private final long totalLength; + + private SSTableScanner currentScanner; + private long positionOffset; + + public LeveledScanner(List<SSTableReader> sstables, Range<Token> range) + { + this.range = range; + this.sstables = sstables; + + // Sorting a list we got in argument is bad but it's all private to this class so let's not bother + Collections.sort(sstables, SSTable.sstableComparator); + this.sstableIterator = sstables.iterator(); + + long length = 0; + for (SSTableReader sstable : sstables) + length += sstable.uncompressedLength(); + totalLength = length; + } + + protected IColumnIterator computeNext() + { + try + { + if (currentScanner != null) + { + if (currentScanner.hasNext()) + { + return currentScanner.next(); + } + else + { + positionOffset += 
currentScanner.getLengthInBytes(); + currentScanner.close(); + currentScanner = null; + return computeNext(); + } + } + + if (!sstableIterator.hasNext()) + return endOfData(); + + SSTableReader reader = sstableIterator.next(); + currentScanner = reader.getDirectScanner(range); + return computeNext(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public void close() throws IOException + { + if (currentScanner != null) + currentScanner.close(); + } + + public long getLengthInBytes() + { + return totalLength; + } + + public long getCurrentPosition() + { + return positionOffset + (currentScanner == null ? 0L : currentScanner.getCurrentPosition()); + } + + public String getBackingFiles() + { + return Joiner.on(", ").join(sstables); + } + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 592f0e9..69ab492 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -303,7 +303,6 @@ public class LeveledManifest public int getLevelSize(int i) { - return generations.length > i ? 
generations[i].size() : 0; } @@ -322,7 +321,7 @@ public class LeveledManifest } } - private int levelOf(SSTableReader sstable) + int levelOf(SSTableReader sstable) { Integer level = sstableGenerations.get(sstable); if (level == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 03a29cd..79b3396 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -40,7 +40,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.ICountableColumnIterator; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.utils.*; /** @@ -61,17 +60,12 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private final int maxInMemorySize; - public ParallelCompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException + public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) throws IOException { - this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Iterables.size(sstables)); + this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size()); } - public ParallelCompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller, int maxInMemorySize) throws IOException - { - this(type, getScanners(sstables), 
controller, maxInMemorySize); - } - - protected ParallelCompactionIterable(OperationType type, List<SSTableScanner> scanners, CompactionController controller, int maxInMemorySize) + public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, int maxInMemorySize) { super(controller, type, scanners); this.maxInMemorySize = maxInMemorySize; @@ -80,7 +74,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable public CloseableIterator<AbstractCompactedRow> iterator() { List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>(scanners.size()); - for (SSTableScanner scanner : scanners) + for (ICompactionScanner scanner : scanners) sources.add(new Deserializer(scanner, maxInMemorySize)); return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()), controller); } @@ -164,8 +158,8 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable if ((row++ % 1000) == 0) { long n = 0; - for (SSTableScanner scanner : scanners) - n += scanner.getFilePointer(); + for (ICompactionScanner scanner : scanners) + n += scanner.getCurrentPosition(); bytesRead = n; controller.mayThrottle(bytesRead); } @@ -283,9 +277,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1); private static final RowContainer finished = new RowContainer((Row) null); private Condition condition; - private final SSTableScanner scanner; + private final ICompactionScanner scanner; - public Deserializer(SSTableScanner ssts, final int maxInMemorySize) + public Deserializer(ICompactionScanner ssts, final int maxInMemorySize) { this.scanner = ssts; Runnable runnable = new WrappedRunnable() @@ -318,7 +312,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable } } }; - new Thread(runnable, "Deserialize " + 
scanner.sstable).start(); + new Thread(runnable, "Deserialize " + scanner.getBackingFiles()).start(); } protected RowContainer computeNext() http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index c332ae6..2482188 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -847,6 +847,8 @@ public class SSTableReader extends SSTable */ public SSTableScanner getDirectScanner(Range<Token> range) { + if (range == null) + return getDirectScanner(); return new SSTableBoundedScanner(this, true, range); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 5e4f269..26ed908 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -27,15 +27,15 @@ import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.CloseableIterator; -public class SSTableScanner implements CloseableIterator<IColumnIterator> +public class SSTableScanner 
implements ICompactionScanner { private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class); @@ -107,7 +107,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> } } - public long getFileLength() + public long getLengthInBytes() { try { @@ -119,11 +119,16 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> } } - public long getFilePointer() + public long getCurrentPosition() { return file.getFilePointer(); } + public String getBackingFiles() + { + return sstable.toString(); + } + public boolean hasNext() { if (iterator == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/service/AntiEntropyService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java index 0c11947..d76d26f 100644 --- a/src/java/org/apache/cassandra/service/AntiEntropyService.java +++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java @@ -266,7 +266,7 @@ public class AntiEntropyService public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]); - Validator(TreeRequest request) + public Validator(TreeRequest request) { this(request, // TODO: memory usage (maxsize) should either be tunable per @@ -546,7 +546,7 @@ public class AntiEntropyService /** * A tuple of table and cf. 
*/ - static class CFPair extends Pair<String,String> + public static class CFPair extends Pair<String,String> { public CFPair(String table, String cf) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 315a75a..3578493 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -33,3 +33,4 @@ encryption_options: truststore_password: cassandra incremental_backups: true flush_largest_memtables_at: 1.0 +compaction_throughput_mb_per_sec: 0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 8087ea6..1d3bd83 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -28,6 +28,7 @@ import com.google.common.base.Charsets; import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.gms.Gossiper; @@ -140,6 +141,10 @@ public class SchemaLoader null, null)); + // Make it easy to test leveled compaction + Map<String, String> leveledOptions = new HashMap<String, String>(); + leveledOptions.put("sstable_size_in_mb", "1"); + // Keyspace 1 schema.add(KSMetaData.testMetadata(ks1, simple, @@ -198,7 +203,9 @@ public class SchemaLoader "StandardDynamicComposite", st, dynamicComposite, - null))); + null), + standardCFMD(ks1, "StandardLeveled").compactionStrategyClass(LeveledCompactionStrategy.class) + 
.compactionStrategyOptions(leveledOptions))); // Keyspace 2 schema.add(KSMetaData.testMetadata(ks2, http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java new file mode 100644 index 0000000..56f12de --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.compaction; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Future; + +import org.junit.Test; +import static org.junit.Assert.*; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.service.AntiEntropyService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +public class LeveledCompactionStrategyTest extends SchemaLoader +{ + /* + * This exercises in particular the code of #4142 + */ + @Test + public void testValidationMultipleSSTablePerLevel() throws Exception + { + String ksname = "Keyspace1"; + String cfname = "StandardLeveled"; + Table table = Table.open(ksname); + ColumnFamilyStore store = table.getColumnFamilyStore(cfname); + + ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files + + // Enough data to have a level 1 and 2 + int rows = 20; + int columns = 10; + + // Adds enough data to trigger multiple sstables per level + for (int r = 0; r < rows; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + RowMutation rm = new RowMutation(ksname, key.key); + for (int c = 0; c < columns; c++) + { + rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("column" + c)), value, 0); + } + rm.apply(); + store.forceFlush(); + } + + LeveledCompactionStrategy strat = (LeveledCompactionStrategy)store.getCompactionStrategy(); + + while (strat.getLevelSize(0) > 0) + { + store.forceMajorCompaction(); + Thread.sleep(200); + } + // Checking we're not completely bad at math + assert strat.getLevelSize(1) > 0; + assert strat.getLevelSize(2) > 0; + + AntiEntropyService.CFPair p = new AntiEntropyService.CFPair(ksname, cfname); + 
Range<Token> range = new Range<Token>(Util.token(""), Util.token("")); + AntiEntropyService.TreeRequest req = new AntiEntropyService.TreeRequest("1", FBUtilities.getLocalAddress(), range, p); + AntiEntropyService.Validator validator = new AntiEntropyService.Validator(req); + CompactionManager.instance.submitValidation(store, validator).get(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java index 22ffe97..0f48b3d 100644 --- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java +++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java @@ -57,23 +57,24 @@ public class LazilyCompactedRowTest extends SchemaLoader { private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException { + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); Collection<SSTableReader> sstables = cfs.getSSTables(); // compare eager and lazy compactions AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN, - sstables, + strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false)); AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN, - sstables, + strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false)); assertBytes(cfs, sstables, eager, lazy); // compare eager and parallel-lazy compactions eager = new CompactionIterable(OperationType.UNKNOWN, - sstables, + strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false)); AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN, - sstables, + strategy.getScanners(sstables), new CompactionController(cfs, sstables, 
gcBefore, false), 0); assertBytes(cfs, sstables, eager, parallel); @@ -155,9 +156,10 @@ public class LazilyCompactedRowTest extends SchemaLoader private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException { + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); Collection<SSTableReader> sstables = cfs.getSSTables(); - AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)); - AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)); + AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false)); + AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false)); CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator(); CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();