Merge branch cassandra-3.11 into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/96899bbb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/96899bbb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/96899bbb Branch: refs/heads/trunk Commit: 96899bbb60ebba05408c3248e756ed33605e8075 Parents: a741efd 5c9db9a Author: Benjamin Lerer <b.le...@gmail.com> Authored: Thu Jun 1 10:20:29 2017 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Thu Jun 1 10:22:54 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + doc/source/operating/metrics.rst | 2 +- .../cassandra/db/PartitionRangeReadCommand.java | 20 ++++- .../db/SinglePartitionReadCommand.java | 83 ++++++++++++++------ .../org/apache/cassandra/db/StorageHook.java | 53 ++++++++----- .../UnfilteredRowIteratorWithLowerBound.java | 10 ++- .../io/sstable/format/SSTableReader.java | 62 +++++++++------ .../io/sstable/format/SSTableReadsListener.java | 81 +++++++++++++++++++ .../io/sstable/format/big/BigTableReader.java | 33 +++++--- .../io/sstable/format/big/BigTableScanner.java | 23 ++++-- .../miscellaneous/SSTablesIteratedTest.java | 69 +++++++++++++++- .../cassandra/db/compaction/TTLExpiryTest.java | 5 +- .../sstable/SSTableCorruptionDetectionTest.java | 7 +- .../io/sstable/SSTableScannerTest.java | 5 +- .../cassandra/io/sstable/SSTableWriterTest.java | 7 +- 16 files changed, 369 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/NEWS.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/doc/source/operating/metrics.rst ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index a47302b,aa8271d..bc80907 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@@ -206,16 -209,17 +207,17 @@@ public class PartitionRangeReadCommand for (Memtable memtable : view.memtables) { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method - Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift()); + Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange()); oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime()); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + iterators.add(iter); } + SSTableReadsListener readCountUpdater = newReadCountUpdater(); for (SSTableReader sstable : view.sstables) { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method - UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange()); - UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift(), readCountUpdater); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); ++ UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater); + iterators.add(iter); if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index d5d2901,47c426e..5ebfdcc --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -567,7 -600,7 +569,7 @@@ public class SinglePartitionReadComman @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, // or through the closing of the final merged iterator - UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable); - UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, true, metricsCollector); ++ UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector); if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); @@@ -587,7 -620,7 +589,7 @@@ @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception, // or through the closing of the final merged iterator - UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable); - UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, false, metricsCollector); ++ UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector); if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); @@@ -600,10 -633,10 +602,10 @@@ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); if (iterators.isEmpty()) - return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed()); + return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed()); - StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey()); + StorageHook.instance.reportRead(cfs.metadata().id, partitionKey()); - return withSSTablesIterated(iterators, cfs.metric); + return withSSTablesIterated(iterators, cfs.metric, metricsCollector); } catch (RuntimeException | Error e) { @@@ -630,13 -663,20 +632,16 @@@ return clusteringIndexFilter().shouldInclude(sstable); } - private UnfilteredRowIteratorWithLowerBound makeIterator(ColumnFamilyStore cfs, final SSTableReader sstable) + private UnfilteredRowIteratorWithLowerBound makeIterator(ColumnFamilyStore cfs, - final SSTableReader sstable, - boolean applyThriftTransformation, ++ SSTableReader sstable, + SSTableReadsListener listener) { return StorageHook.instance.makeRowIteratorWithLowerBound(cfs, partitionKey(), sstable, clusteringIndexFilter(), - columnFilter()); + columnFilter(), - isForThrift(), - nowInSec(), - applyThriftTransformation, + listener); } @@@ -743,10 -780,15 +745,14 @@@ continue; // no tombstone at all, we can skip that sstable // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable. - sstable.incrementReadCount(); - try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed())) + try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, + sstable, + partitionKey(), + filter.getSlices(metadata()), + columnFilter(), + filter.isReversed(), - isForThrift(), + metricsCollector)) { - sstablesIterated++; if (!iter.partitionLevelDeletion().isLive()) result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired()); else @@@ -755,17 -797,21 +761,20 @@@ continue; } - Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation); - sstable.incrementReadCount(); - try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed())) + try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, + sstable, + partitionKey(), + filter.getSlices(metadata()), + columnFilter(), + filter.isReversed(), - isForThrift(), + metricsCollector)) { if (iter.isEmpty()) continue; if (sstable.isRepaired()) onlyUnrepaired = false; - sstablesIterated++; - result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired()); + result = add(iter, result, filter, sstable.isRepaired()); } } @@@ -776,10 -822,10 +785,10 @@@ DecoratedKey key = result.partitionKey(); cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); - StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey()); + StorageHook.instance.reportRead(cfs.metadata.id, partitionKey()); // "hoist up" the requested data into a more recent sstable - if (sstablesIterated > cfs.getMinimumCompactionThreshold() + if (metricsCollector.getMergedSSTables() > cfs.getMinimumCompactionThreshold() && onlyUnrepaired && !cfs.isAutoCompactionDisabled() && cfs.getCompactionStrategyManager().shouldDefragment()) @@@ -1041,12 -1088,40 +1050,40 @@@ private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException { - DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize())); + DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, DatabaseDescriptor.getMaxValueSize())); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); - return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter); + return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter); } } + + /** + * {@code SSTableReaderListener} used to collect metrics about SSTable read access. + */ + private static final class SSTableReadMetricsCollector implements SSTableReadsListener + { + /** + * The number of SSTables that need to be merged. This counter is only updated for single partition queries + * since this has been the behavior so far. + */ + private int mergedSSTables; + + @Override + public void onSSTableSelected(SSTableReader sstable, RowIndexEntry<?> indexEntry, SelectionReason reason) + { + sstable.incrementReadCount(); + mergedSSTables++; + } + + /** + * Returns the number of SSTables that need to be merged. + * @return the number of SSTables that need to be merged. + */ + public int getMergedSSTables() + { + return mergedSSTables; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/StorageHook.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/StorageHook.java index 3df8805,48d7ede..be1d0bf --- a/src/java/org/apache/cassandra/db/StorageHook.java +++ b/src/java/org/apache/cassandra/db/StorageHook.java @@@ -24,7 -26,7 +24,8 @@@ import org.apache.cassandra.db.partitio import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound; import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.FBUtilities; public interface StorageHook @@@ -37,13 -39,20 +38,15 @@@ DecoratedKey partitionKey, SSTableReader sstable, ClusteringIndexFilter filter, - ColumnFilter selectedColumns); + ColumnFilter selectedColumns, - boolean isForThrift, - int nowInSec, - boolean applyThriftTransformation, + SSTableReadsListener listener); - public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs, SSTableReader sstable, DecoratedKey key, Slices slices, ColumnFilter selectedColumns, - boolean reversed); + boolean reversed, - boolean isForThrift, + SSTableReadsListener listener); static StorageHook createHook() { @@@ -52,27 -61,44 +55,37 @@@ { return FBUtilities.construct(className, StorageHook.class.getSimpleName()); } - else + + return new StorageHook() { - return new StorageHook() - { - public void reportWrite(TableId tableId, PartitionUpdate partitionUpdate) {} - public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate) {} ++ public void reportWrite(TableId tableId, PartitionUpdate partitionUpdate) {} - public void reportRead(TableId tableId, DecoratedKey key) {} - public void reportRead(UUID cfid, DecoratedKey key) {} ++ public void reportRead(TableId tableId, DecoratedKey key) {} - public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, DecoratedKey partitionKey, SSTableReader sstable, ClusteringIndexFilter filter, ColumnFilter selectedColumns) - { - return new UnfilteredRowIteratorWithLowerBound(partitionKey, - sstable, - filter, - selectedColumns); - } + public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, + DecoratedKey partitionKey, + SSTableReader sstable, + ClusteringIndexFilter filter, + ColumnFilter selectedColumns, - boolean isForThrift, - int nowInSec, - boolean applyThriftTransformation, + SSTableReadsListener listener) + { + return new UnfilteredRowIteratorWithLowerBound(partitionKey, + sstable, + filter, + selectedColumns, - isForThrift, - nowInSec, - applyThriftTransformation, + listener); + } - public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs, SSTableReader sstable, DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed) - { - return sstable.iterator(key, slices, selectedColumns, reversed); - } - }; - } + public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs, + SSTableReader sstable, + DecoratedKey key, + Slices slices, + ColumnFilter selectedColumns, + boolean reversed, - boolean isForThrift, + SSTableReadsListener listener) + { - return sstable.iterator(key, slices, selectedColumns, reversed, isForThrift, listener); ++ return sstable.iterator(key, slices, selectedColumns, reversed, listener); + } + }; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java index d23c37c,84a742b..71c3d7f --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java @@@ -31,7 -31,9 +31,8 @@@ import org.apache.cassandra.db.filter.C import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.io.sstable.IndexInfo; import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.io.sstable.format.SSTableReadsListener; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; -import org.apache.cassandra.thrift.ThriftResultsMerger; import org.apache.cassandra.utils.IteratorWithLowerBound; /** @@@ -47,18 -49,30 +48,21 @@@ public class UnfilteredRowIteratorWithL private final SSTableReader sstable; private final ClusteringIndexFilter filter; private final ColumnFilter selectedColumns; - private final boolean isForThrift; - private final int nowInSec; - private final boolean applyThriftTransformation; + private final SSTableReadsListener listener; private ClusteringBound lowerBound; private boolean firstItemRetrieved; public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey, SSTableReader sstable, ClusteringIndexFilter filter, - ColumnFilter selectedColumns) + ColumnFilter selectedColumns, - boolean isForThrift, - int nowInSec, - boolean applyThriftTransformation, + SSTableReadsListener listener) { super(partitionKey); this.sstable = sstable; this.filter = filter; this.selectedColumns = selectedColumns; - this.isForThrift = isForThrift; - this.nowInSec = nowInSec; - this.applyThriftTransformation = applyThriftTransformation; + this.listener = listener; this.lowerBound = null; this.firstItemRetrieved = false; } @@@ -89,11 -103,11 +93,9 @@@ @Override protected UnfilteredRowIterator initializeIterator() { - sstable.incrementReadCount(); - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator - UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed()); - UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift, listener); - return isForThrift && applyThriftTransformation - ? ThriftResultsMerger.maybeWrap(iter, nowInSec) - : iter; ++ UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), listener); + return iter; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 4495edf,e9b2491..25cfddb --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@@ -1512,35 -1523,55 +1512,62 @@@ public abstract class SSTableReader ext return null; } + private boolean keyCacheEnabled() + { + return keyCache != null && keyCache.getCapacity() > 0 && metadata().params.caching.cacheKeys(); + } + /** - * Get position updating key cache and stats. - * @see #getPosition(PartitionPosition, SSTableReader.Operator, boolean) + * Retrieves the position while updating the key cache and the stats. + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. */ - public RowIndexEntry getPosition(PartitionPosition key, Operator op) + public final RowIndexEntry getPosition(PartitionPosition key, Operator op) { - return getPosition(key, op, true, false); + return getPosition(key, op, SSTableReadsListener.NOOP_LISTENER); } - public RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats) + /** + * Retrieves the position while updating the key cache and the stats. + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. + * @param listener the {@code SSTableReaderListener} that must handle the notifications. + */ + public final RowIndexEntry getPosition(PartitionPosition key, Operator op, SSTableReadsListener listener) { - return getPosition(key, op, updateCacheAndStats, false); + return getPosition(key, op, true, false, listener); } + - public final RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats) ++ public final RowIndexEntry getPosition(PartitionPosition key, ++ Operator op, ++ boolean updateCacheAndStats) + { + return getPosition(key, op, updateCacheAndStats, false, SSTableReadsListener.NOOP_LISTENER); + } ++ /** * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to * allow key selection by token bounds but only if op != * EQ * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. * @param updateCacheAndStats true if updating stats and cache + * @param listener a listener used to handle internal events * @return The index entry corresponding to the key, or null if the key is not present */ - protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast); + protected abstract RowIndexEntry getPosition(PartitionPosition key, + Operator op, + boolean updateCacheAndStats, + boolean permitMatchPastLast, + SSTableReadsListener listener); + + public abstract UnfilteredRowIterator iterator(DecoratedKey key, + Slices slices, + ColumnFilter selectedColumns, + boolean reversed, - boolean isForThrift, + SSTableReadsListener listener); - public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed); - public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift); + public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed); public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly); @@@ -1698,9 -1753,14 +1725,10 @@@ /** * @param columns the columns to return. * @param dataRange filter to use when reading the columns + * @param listener a listener used to handle internal read events * @return A Scanner for seeking over the rows of the SSTable. */ - public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange); - public abstract ISSTableScanner getScanner(ColumnFilter columns, - DataRange dataRange, - RateLimiter limiter, - boolean isForThrift, - SSTableReadsListener listener); ++ public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener); public FileDataInput getFileDataInput(long position) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java index 0000000,8f6e3c0..6d384bf mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java @@@ -1,0 -1,82 +1,81 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.io.sstable.format; + + import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason; + + /** + * Listener for receiving notifications associated with reading SSTables. + */ + public interface SSTableReadsListener + { + /** + * The reasons for skipping an SSTable + */ + enum SkippingReason + { + BLOOM_FILTER, + MIN_MAX_KEYS, + PARTITION_INDEX_LOOKUP, + INDEX_ENTRY_NOT_FOUND; + } + + /** + * The reasons for selecting an SSTable + */ + enum SelectionReason + { + KEY_CACHE_HIT, + INDEX_ENTRY_FOUND; + } + + /** + * Listener that does nothing. + */ + static final SSTableReadsListener NOOP_LISTENER = new SSTableReadsListener() {}; + + /** + * Handles notification that the specified SSTable has been skipped during a single partition query. + * + * @param sstable the SSTable reader + * @param reason the reason for which the SSTable has been skipped + */ + default void onSSTableSkipped(SSTableReader sstable, SkippingReason reason) + { + } + + /** + * Handles notification that the specified SSTable has been selected during a single partition query. + * + * @param sstable the SSTable reader + * @param indexEntry the index entry + * @param reason the reason for which the SSTable has been selected + */ + default void onSSTableSelected(SSTableReader sstable, RowIndexEntry<?> indexEntry, SelectionReason reason) + { + } + + /** + * Handles notification that the specified SSTable is being scanned during a partition range query. + * + * @param sstable the SSTable reader of the SSTable being scanned. + */ + default void onScanningStarted(SSTableReader sstable) + { + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index c29bc5d,8551819..1b6a299 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@@ -37,11 -32,19 +37,14 @@@ import org.apache.cassandra.dht.Range import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.io.sstable.format.SSTableReadsListener; -import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason; + import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason; ++import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; /** * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. @@@ -56,29 -59,29 +59,29 @@@ public class BigTableReader extends SST super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header); } - public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed) - public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, SSTableReadsListener listener) ++ public UnfilteredRowIterator iterator(DecoratedKey key, ++ Slices slices, ++ ColumnFilter selectedColumns, ++ boolean reversed, ++ SSTableReadsListener listener) { - RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ); + RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ, listener); - return iterator(null, key, rie, slices, selectedColumns, reversed, isForThrift); + return iterator(null, key, rie, slices, selectedColumns, reversed); } - public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift) + public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed) { if (indexEntry == null) - return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed); + return UnfilteredRowIterators.noRowsIterator(metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed); return reversed - ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile) - : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile); + ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, ifile) + : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, ifile); } - /** - * @param columns the columns to return. - * @param dataRange filter to use when reading the columns - * @return A Scanner for seeking over the rows of the SSTable. - */ - public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange) + @Override - public ISSTableScanner getScanner(ColumnFilter columns, - DataRange dataRange, - RateLimiter limiter, - boolean isForThrift, - SSTableReadsListener listener) ++ public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener) { - return BigTableScanner.getScanner(this, columns, dataRange); - return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift, listener); ++ return BigTableScanner.getScanner(this, columns, dataRange, listener); } /** @@@ -124,14 -127,12 +127,18 @@@ return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly); } - @Override + /** + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. + * @param updateCacheAndStats true if updating stats and cache + * @return The index entry corresponding to the key, or null if the key is not present + */ - protected RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast) + protected RowIndexEntry getPosition(PartitionPosition key, + Operator op, + boolean updateCacheAndStats, + boolean permitMatchPastLast, + SSTableReadsListener listener) { if (op == Operator.EQ) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 716ef4c,f4bd1ea..b01573c --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@@ -60,38 -62,65 +61,46 @@@ public class BigTableScanner implement private final ColumnFilter columns; private final DataRange dataRange; private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; - private final boolean isForThrift; + private final SSTableReadsListener listener; private long startScan = -1; private long bytesScanned = 0; protected Iterator<UnfilteredRowIterator> iterator; // Full scan of the sstables - public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) + public static ISSTableScanner getScanner(SSTableReader sstable) { - return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, Iterators.singletonIterator(fullRange(sstable))); - return new BigTableScanner(sstable, - ColumnFilter.all(sstable.metadata), - limiter, - Iterators.singletonIterator(fullRange(sstable))); ++ return getScanner(sstable, Iterators.singletonIterator(fullRange(sstable))); } - public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange) + public static ISSTableScanner getScanner(SSTableReader sstable, + ColumnFilter columns, + DataRange dataRange, - RateLimiter limiter, - boolean isForThrift, + SSTableReadsListener listener) { - return new BigTableScanner(sstable, columns, dataRange, makeBounds(sstable, dataRange).iterator()); - return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator(), listener); ++ return new BigTableScanner(sstable, columns, dataRange, makeBounds(sstable, dataRange).iterator(), listener); } - public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) + public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges) { // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges); if (positions.isEmpty()) return new EmptySSTableScanner(sstable); - return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, makeBounds(sstable, tokenRanges).iterator()); - return new BigTableScanner(sstable, - ColumnFilter.all(sstable.metadata), - limiter, - makeBounds(sstable, tokenRanges).iterator()); ++ return getScanner(sstable, makeBounds(sstable, tokenRanges).iterator()); } public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator) { - return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, rangeIterator); - return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, rangeIterator); - } - - private BigTableScanner(SSTableReader sstable, - ColumnFilter columns, - RateLimiter limiter, - Iterator<AbstractBounds<PartitionPosition>> rangeIterator) - { - this(sstable, columns, null, limiter, false, rangeIterator, SSTableReadsListener.NOOP_LISTENER); ++ return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, rangeIterator, SSTableReadsListener.NOOP_LISTENER); } - private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, Iterator<AbstractBounds<PartitionPosition>> rangeIterator) + private BigTableScanner(SSTableReader sstable, + ColumnFilter columns, + DataRange dataRange, - RateLimiter limiter, - boolean isForThrift, + Iterator<AbstractBounds<PartitionPosition>> rangeIterator, + SSTableReadsListener listener) { assert sstable != null; @@@ -100,10 -129,12 +109,11 @@@ this.sstable = sstable; this.columns = columns; this.dataRange = dataRange; - this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata(), sstable.descriptor.version, sstable.header); - this.isForThrift = isForThrift; this.rangeIterator = rangeIterator; + this.listener = listener; } private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges) @@@ -264,6 -299,7 +274,7 @@@ private Iterator<UnfilteredRowIterator> createIterator() { - listener.onScanningStarted(sstable); ++ this.listener.onScanningStarted(sstable); return new KeyScanningIterator(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index 0a970e1,9fafc74..a2352fc --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@@ -243,7 -243,10 +244,9 @@@ public class TTLExpiryTes cfs.enableAutoCompaction(true); assertEquals(1, cfs.getLiveSSTables().size()); SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(cfs.metadata()), DataRange.allData(cfs.getPartitioner())); - ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata), ++ ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(cfs.metadata()), + DataRange.allData(cfs.getPartitioner()), - false, + SSTableReadsListener.NOOP_LISTENER); assertTrue(scanner.hasNext()); while(scanner.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java index bc82128,f7ced23..581109c --- a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java @@@ -208,7 -211,12 +209,11 @@@ public class SSTableCorruptionDetection for (int i = 0; i < numberOfPks; i++) { DecoratedKey dk = Util.dk(String.format("pkvalue_%07d", i)); - try (UnfilteredRowIterator rowIter = sstable.iterator(dk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false)) + try (UnfilteredRowIterator rowIter = sstable.iterator(dk, + Slices.ALL, - ColumnFilter.all(cfs.metadata), - false, ++ ColumnFilter.all(cfs.metadata()), + false, + SSTableReadsListener.NOOP_LISTENER)) { while (rowIter.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index 353b1ad,d1db09a..eff95fc --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@@ -179,9 -181,9 +180,11 @@@ public class SSTableScannerTes private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int ... boundaries) { assert boundaries.length % 2 == 0; - for (DataRange range : dataRanges(sstable.metadata, scanStart, scanEnd)) + for (DataRange range : dataRanges(sstable.metadata(), scanStart, scanEnd)) { - try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata()), range)) - try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata), range, false, SSTableReadsListener.NOOP_LISTENER)) ++ try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata()), ++ range, ++ SSTableReadsListener.NOOP_LISTENER)) { for (int b = 0; b < boundaries.length; b += 2) for (int i = boundaries[b]; i <= boundaries[b + 1]; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java index fd93ca1,391927c..5d62cdb --- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java @@@ -223,7 -224,12 +224,11 @@@ public class SSTableWriterTest extends try { DecoratedKey dk = Util.dk("large_value"); - UnfilteredRowIterator rowIter = sstable.iterator(dk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false); + UnfilteredRowIterator rowIter = sstable.iterator(dk, + Slices.ALL, - ColumnFilter.all(cfs.metadata), - false, ++ ColumnFilter.all(cfs.metadata()), + false, + SSTableReadsListener.NOOP_LISTENER); while (rowIter.hasNext()) { rowIter.next(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org