Merge branch cassandra-3.11 into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/96899bbb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/96899bbb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/96899bbb

Branch: refs/heads/trunk
Commit: 96899bbb60ebba05408c3248e756ed33605e8075
Parents: a741efd 5c9db9a
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Thu Jun 1 10:20:29 2017 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Thu Jun 1 10:22:54 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  2 +
 doc/source/operating/metrics.rst                |  2 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 20 ++++-
 .../db/SinglePartitionReadCommand.java          | 83 ++++++++++++++------
 .../org/apache/cassandra/db/StorageHook.java    | 53 ++++++++-----
 .../UnfilteredRowIteratorWithLowerBound.java    | 10 ++-
 .../io/sstable/format/SSTableReader.java        | 62 +++++++++------
 .../io/sstable/format/SSTableReadsListener.java | 81 +++++++++++++++++++
 .../io/sstable/format/big/BigTableReader.java   | 33 +++++---
 .../io/sstable/format/big/BigTableScanner.java  | 23 ++++--
 .../miscellaneous/SSTablesIteratedTest.java     | 69 +++++++++++++++-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  5 +-
 .../sstable/SSTableCorruptionDetectionTest.java |  7 +-
 .../io/sstable/SSTableScannerTest.java          |  5 +-
 .../cassandra/io/sstable/SSTableWriterTest.java |  7 +-
 16 files changed, 369 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/NEWS.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/doc/source/operating/metrics.rst
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index a47302b,aa8271d..bc80907
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -206,16 -209,17 +207,17 @@@ public class PartitionRangeReadCommand 
              for (Memtable memtable : view.memtables)
              {
                  @SuppressWarnings("resource") // We close on exception and on 
closing the result returned by this method
 -                Memtable.MemtableUnfilteredPartitionIterator iter = 
memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
 +                Memtable.MemtableUnfilteredPartitionIterator iter = 
memtable.makePartitionIterator(columnFilter(), dataRange());
                  oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
 -                iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
 +                iterators.add(iter);
              }
  
+             SSTableReadsListener readCountUpdater = newReadCountUpdater();
              for (SSTableReader sstable : view.sstables)
              {
                  @SuppressWarnings("resource") // We close on exception and on 
closing the result returned by this method
-                 UnfilteredPartitionIterator iter = 
sstable.getScanner(columnFilter(), dataRange());
 -                UnfilteredPartitionIterator iter = 
sstable.getScanner(columnFilter(), dataRange(), isForThrift(), 
readCountUpdater);
 -                iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
++                UnfilteredPartitionIterator iter = 
sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
 +                iterators.add(iter);
                  if (!sstable.isRepaired())
                      oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
              }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index d5d2901,47c426e..5ebfdcc
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -567,7 -600,7 +569,7 @@@ public class SinglePartitionReadComman
  
                  @SuppressWarnings("resource") // 'iter' is added to iterators 
which is closed on exception,
                                                // or through the closing of 
the final merged iterator
-                 UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, 
sstable);
 -                UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, 
sstable, true, metricsCollector);
++                UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, 
sstable, metricsCollector);
                  if (!sstable.isRepaired())
                      oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
  
@@@ -587,7 -620,7 +589,7 @@@
  
                      @SuppressWarnings("resource") // 'iter' is added to 
iterators which is closed on exception,
                                                    // or through the closing 
of the final merged iterator
-                     UnfilteredRowIteratorWithLowerBound iter = 
makeIterator(cfs, sstable);
 -                    UnfilteredRowIteratorWithLowerBound iter = 
makeIterator(cfs, sstable, false, metricsCollector);
++                    UnfilteredRowIteratorWithLowerBound iter = 
makeIterator(cfs, sstable, metricsCollector);
                      if (!sstable.isRepaired())
                          oldestUnrepairedTombstone = 
Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
  
@@@ -600,10 -633,10 +602,10 @@@
                                 nonIntersectingSSTables, view.sstables.size(), 
includedDueToTombstones);
  
              if (iterators.isEmpty())
 -                return EmptyIterators.unfilteredRow(cfs.metadata, 
partitionKey(), filter.isReversed());
 +                return EmptyIterators.unfilteredRow(cfs.metadata(), 
partitionKey(), filter.isReversed());
  
 -            StorageHook.instance.reportRead(cfs.metadata.cfId, 
partitionKey());
 +            StorageHook.instance.reportRead(cfs.metadata().id, 
partitionKey());
-             return withSSTablesIterated(iterators, cfs.metric);
+             return withSSTablesIterated(iterators, cfs.metric, 
metricsCollector);
          }
          catch (RuntimeException | Error e)
          {
@@@ -630,13 -663,20 +632,16 @@@
          return clusteringIndexFilter().shouldInclude(sstable);
      }
  
-     private UnfilteredRowIteratorWithLowerBound 
makeIterator(ColumnFamilyStore cfs, final SSTableReader sstable)
+     private UnfilteredRowIteratorWithLowerBound 
makeIterator(ColumnFamilyStore cfs,
 -                                                             final 
SSTableReader sstable,
 -                                                             boolean 
applyThriftTransformation,
++                                                             SSTableReader 
sstable,
+                                                              
SSTableReadsListener listener)
      {
          return StorageHook.instance.makeRowIteratorWithLowerBound(cfs,
                                                                    
partitionKey(),
                                                                    sstable,
                                                                    
clusteringIndexFilter(),
-                                                                   
columnFilter());
+                                                                   
columnFilter(),
 -                                                                  
isForThrift(),
 -                                                                  nowInSec(),
 -                                                                  
applyThriftTransformation,
+                                                                   listener);
  
      }
  
@@@ -743,10 -780,15 +745,14 @@@
                      continue; // no tombstone at all, we can skip that sstable
  
                  // We need to get the partition deletion and include it if 
it's live. In any case though, we're done with that sstable.
-                 sstable.incrementReadCount();
-                 try (UnfilteredRowIterator iter = 
StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), 
filter.getSlices(metadata()), columnFilter(), filter.isReversed()))
+                 try (UnfilteredRowIterator iter = 
StorageHook.instance.makeRowIterator(cfs,
+                                                                               
         sstable,
+                                                                               
         partitionKey(),
+                                                                               
         filter.getSlices(metadata()),
+                                                                               
         columnFilter(),
+                                                                               
         filter.isReversed(),
 -                                                                              
         isForThrift(),
+                                                                               
         metricsCollector))
                  {
-                     sstablesIterated++;
                      if (!iter.partitionLevelDeletion().isLive())
                          result = 
add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), 
Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), 
result, filter, sstable.isRepaired());
                      else
@@@ -755,17 -797,21 +761,20 @@@
                  continue;
              }
  
-             Tracing.trace("Merging data from sstable {}", 
sstable.descriptor.generation);
-             sstable.incrementReadCount();
-             try (UnfilteredRowIterator iter = 
StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), 
filter.getSlices(metadata()), columnFilter(), filter.isReversed()))
+             try (UnfilteredRowIterator iter = 
StorageHook.instance.makeRowIterator(cfs,
+                                                                               
     sstable,
+                                                                               
     partitionKey(),
+                                                                               
     filter.getSlices(metadata()),
+                                                                               
     columnFilter(),
+                                                                               
     filter.isReversed(),
 -                                                                              
     isForThrift(),
+                                                                               
     metricsCollector))
              {
                  if (iter.isEmpty())
                      continue;
  
                  if (sstable.isRepaired())
                      onlyUnrepaired = false;
-                 sstablesIterated++;
 -                result = add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, 
sstable.isRepaired());
 +                result = add(iter, result, filter, sstable.isRepaired());
              }
          }
  
@@@ -776,10 -822,10 +785,10 @@@
  
          DecoratedKey key = result.partitionKey();
          
cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), 
key.hashCode(), 1);
 -        StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
 +        StorageHook.instance.reportRead(cfs.metadata.id, partitionKey());
  
          // "hoist up" the requested data into a more recent sstable
-         if (sstablesIterated > cfs.getMinimumCompactionThreshold()
+         if (metricsCollector.getMergedSSTables() > 
cfs.getMinimumCompactionThreshold()
              && onlyUnrepaired
              && !cfs.isAutoCompactionDisabled()
              && cfs.getCompactionStrategyManager().shouldDefragment())
@@@ -1041,12 -1088,40 +1050,40 @@@
  
      private static class Deserializer extends SelectionDeserializer
      {
 -        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int 
nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, 
Optional<IndexMetadata> index)
 +        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter 
columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> 
index)
          throws IOException
          {
 -            DecoratedKey key = 
metadata.decorateKey(metadata.getKeyValidator().readValue(in, 
DatabaseDescriptor.getMaxValueSize()));
 +            DecoratedKey key = 
metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, 
DatabaseDescriptor.getMaxValueSize()));
              ClusteringIndexFilter filter = 
ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
 -            return new SinglePartitionReadCommand(isDigest, digestVersion, 
isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
 +            return new SinglePartitionReadCommand(isDigest, digestVersion, 
metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
          }
      }
+ 
+     /**
+      * {@code SSTableReadsListener} used to collect metrics about SSTable 
read access.
+      */
+     private static final class SSTableReadMetricsCollector implements 
SSTableReadsListener
+     {
+         /**
+          * The number of SSTables that need to be merged. This counter is 
only updated for single partition queries
+          * since this has been the behavior so far.
+          */
+         private int mergedSSTables;
+ 
+         @Override
+         public void onSSTableSelected(SSTableReader sstable, RowIndexEntry<?> 
indexEntry, SelectionReason reason)
+         {
+             sstable.incrementReadCount();
+             mergedSSTables++;
+         }
+ 
+         /**
+          * Returns the number of SSTables that need to be merged.
+          * @return the number of SSTables that need to be merged.
+          */
+         public int getMergedSSTables()
+         {
+             return mergedSSTables;
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/StorageHook.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/StorageHook.java
index 3df8805,48d7ede..be1d0bf
--- a/src/java/org/apache/cassandra/db/StorageHook.java
+++ b/src/java/org/apache/cassandra/db/StorageHook.java
@@@ -24,7 -26,7 +24,8 @@@ import org.apache.cassandra.db.partitio
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.utils.FBUtilities;
  
  public interface StorageHook
@@@ -37,13 -39,20 +38,15 @@@
                                                                        
DecoratedKey partitionKey,
                                                                        
SSTableReader sstable,
                                                                        
ClusteringIndexFilter filter,
-                                                                       
ColumnFilter selectedColumns);
+                                                                       
ColumnFilter selectedColumns,
 -                                                                      boolean 
isForThrift,
 -                                                                      int 
nowInSec,
 -                                                                      boolean 
applyThriftTransformation,
+                                                                       
SSTableReadsListener listener);
 -
      public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs,
                                                   SSTableReader sstable,
                                                   DecoratedKey key,
                                                   Slices slices,
                                                   ColumnFilter selectedColumns,
-                                                  boolean reversed);
+                                                  boolean reversed,
 -                                                 boolean isForThrift,
+                                                  SSTableReadsListener 
listener);
  
      static StorageHook createHook()
      {
@@@ -52,27 -61,44 +55,37 @@@
          {
              return FBUtilities.construct(className, 
StorageHook.class.getSimpleName());
          }
-         else
+ 
+         return new StorageHook()
          {
-             return new StorageHook()
-             {
-                 public void reportWrite(TableId tableId, PartitionUpdate 
partitionUpdate) {}
 -            public void reportWrite(UUID cfid, PartitionUpdate 
partitionUpdate) {}
++            public void reportWrite(TableId tableId, PartitionUpdate 
partitionUpdate) {}
  
-                 public void reportRead(TableId tableId, DecoratedKey key) {}
 -            public void reportRead(UUID cfid, DecoratedKey key) {}
++            public void reportRead(TableId tableId, DecoratedKey key) {}
  
-                 public UnfilteredRowIteratorWithLowerBound 
makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, DecoratedKey partitionKey, 
SSTableReader sstable, ClusteringIndexFilter filter, ColumnFilter 
selectedColumns)
-                 {
-                     return new 
UnfilteredRowIteratorWithLowerBound(partitionKey,
-                                                                    sstable,
-                                                                    filter,
-                                                                    
selectedColumns);
-                 }
+             public UnfilteredRowIteratorWithLowerBound 
makeRowIteratorWithLowerBound(ColumnFamilyStore cfs,
+                                                                               
       DecoratedKey partitionKey,
+                                                                               
       SSTableReader sstable,
+                                                                               
       ClusteringIndexFilter filter,
+                                                                               
       ColumnFilter selectedColumns,
 -                                                                              
       boolean isForThrift,
 -                                                                              
       int nowInSec,
 -                                                                              
       boolean applyThriftTransformation,
+                                                                               
       SSTableReadsListener listener)
+             {
+                 return new UnfilteredRowIteratorWithLowerBound(partitionKey,
+                                                                sstable,
+                                                                filter,
+                                                                
selectedColumns,
 -                                                               isForThrift,
 -                                                               nowInSec,
 -                                                               
applyThriftTransformation,
+                                                                listener);
+             }
  
-                 public UnfilteredRowIterator 
makeRowIterator(ColumnFamilyStore cfs, SSTableReader sstable, DecoratedKey key, 
Slices slices, ColumnFilter selectedColumns, boolean reversed)
-                 {
-                     return sstable.iterator(key, slices, selectedColumns, 
reversed);
-                 }
-             };
-         }
+             public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore 
cfs,
+                                                          SSTableReader 
sstable,
+                                                          DecoratedKey key,
+                                                          Slices slices,
+                                                          ColumnFilter 
selectedColumns,
+                                                          boolean reversed,
 -                                                         boolean isForThrift,
+                                                          SSTableReadsListener 
listener)
+             {
 -                return sstable.iterator(key, slices, selectedColumns, 
reversed, isForThrift, listener);
++                return sstable.iterator(key, slices, selectedColumns, 
reversed, listener);
+             }
+         };
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index d23c37c,84a742b..71c3d7f
--- 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@@ -31,7 -31,9 +31,8 @@@ import org.apache.cassandra.db.filter.C
  import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.io.sstable.IndexInfo;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 -import org.apache.cassandra.thrift.ThriftResultsMerger;
  import org.apache.cassandra.utils.IteratorWithLowerBound;
  
  /**
@@@ -47,18 -49,30 +48,21 @@@ public class UnfilteredRowIteratorWithL
      private final SSTableReader sstable;
      private final ClusteringIndexFilter filter;
      private final ColumnFilter selectedColumns;
 -    private final boolean isForThrift;
 -    private final int nowInSec;
 -    private final boolean applyThriftTransformation;
+     private final SSTableReadsListener listener;
      private ClusteringBound lowerBound;
      private boolean firstItemRetrieved;
  
      public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey,
                                                 SSTableReader sstable,
                                                 ClusteringIndexFilter filter,
-                                                ColumnFilter selectedColumns)
+                                                ColumnFilter selectedColumns,
 -                                               boolean isForThrift,
 -                                               int nowInSec,
 -                                               boolean 
applyThriftTransformation,
+                                                SSTableReadsListener listener)
      {
          super(partitionKey);
          this.sstable = sstable;
          this.filter = filter;
          this.selectedColumns = selectedColumns;
 -        this.isForThrift = isForThrift;
 -        this.nowInSec = nowInSec;
 -        this.applyThriftTransformation = applyThriftTransformation;
+         this.listener = listener;
          this.lowerBound = null;
          this.firstItemRetrieved = false;
      }
@@@ -89,11 -103,11 +93,9 @@@
      @Override
      protected UnfilteredRowIterator initializeIterator()
      {
-         sstable.incrementReadCount();
- 
          @SuppressWarnings("resource") // 'iter' is added to iterators which 
is closed on exception, or through the closing of the final merged iterator
-         UnfilteredRowIterator iter = sstable.iterator(partitionKey(), 
filter.getSlices(metadata()), selectedColumns, filter.isReversed());
 -        UnfilteredRowIterator iter = sstable.iterator(partitionKey(), 
filter.getSlices(metadata()), selectedColumns, filter.isReversed(), 
isForThrift, listener);
 -        return isForThrift && applyThriftTransformation
 -               ? ThriftResultsMerger.maybeWrap(iter, nowInSec)
 -               : iter;
++        UnfilteredRowIterator iter = sstable.iterator(partitionKey(), 
filter.getSlices(metadata()), selectedColumns, filter.isReversed(), listener);
 +        return iter;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 4495edf,e9b2491..25cfddb
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1512,35 -1523,55 +1512,62 @@@ public abstract class SSTableReader ext
          return null;
      }
  
 +    private boolean keyCacheEnabled()
 +    {
 +        return keyCache != null && keyCache.getCapacity() > 0 && 
metadata().params.caching.cacheKeys();
 +    }
 +
      /**
-      * Get position updating key cache and stats.
-      * @see #getPosition(PartitionPosition, SSTableReader.Operator, boolean)
+      * Retrieves the position while updating the key cache and the stats.
+      * @param key The key to apply as the rhs to the given Operator. A 'fake' 
key is allowed to
+      * allow key selection by token bounds but only if op != EQ
+      * @param op The Operator defining matching keys: the nearest key to the 
target matching the operator wins.
       */
-     public RowIndexEntry getPosition(PartitionPosition key, Operator op)
+     public final RowIndexEntry getPosition(PartitionPosition key, Operator op)
      {
-         return getPosition(key, op, true, false);
+         return getPosition(key, op, SSTableReadsListener.NOOP_LISTENER);
      }
  
-     public RowIndexEntry getPosition(PartitionPosition key, Operator op, 
boolean updateCacheAndStats)
+     /**
+      * Retrieves the position while updating the key cache and the stats.
+      * @param key The key to apply as the rhs to the given Operator. A 'fake' 
key is allowed to
+      * allow key selection by token bounds but only if op != EQ
+      * @param op The Operator defining matching keys: the nearest key to the 
target matching the operator wins.
+      * @param listener the {@code SSTableReadsListener} that must handle the 
notifications.
+      */
+     public final RowIndexEntry getPosition(PartitionPosition key, Operator 
op, SSTableReadsListener listener)
      {
-         return getPosition(key, op, updateCacheAndStats, false);
+         return getPosition(key, op, true, false, listener);
      }
+ 
 -    public final RowIndexEntry getPosition(PartitionPosition key, Operator 
op, boolean updateCacheAndStats)
++    public final RowIndexEntry getPosition(PartitionPosition key,
++                                           Operator op,
++                                           boolean updateCacheAndStats)
+     {
+         return getPosition(key, op, updateCacheAndStats, false, 
SSTableReadsListener.NOOP_LISTENER);
+     }
++
      /**
       * @param key The key to apply as the rhs to the given Operator. A 'fake' 
key is allowed to
       * allow key selection by token bounds but only if op != EQ
       * @param op The Operator defining matching keys: the nearest key to the 
target matching the operator wins.
       * @param updateCacheAndStats true if updating stats and cache
+      * @param listener a listener used to handle internal events
       * @return The index entry corresponding to the key, or null if the key 
is not present
       */
-     protected abstract RowIndexEntry getPosition(PartitionPosition key, 
Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
+     protected abstract RowIndexEntry getPosition(PartitionPosition key,
+                                                  Operator op,
+                                                  boolean updateCacheAndStats,
+                                                  boolean permitMatchPastLast,
+                                                  SSTableReadsListener 
listener);
+ 
+     public abstract UnfilteredRowIterator iterator(DecoratedKey key,
+                                                    Slices slices,
+                                                    ColumnFilter 
selectedColumns,
+                                                    boolean reversed,
 -                                                   boolean isForThrift,
+                                                    SSTableReadsListener 
listener);
  
-     public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices 
slices, ColumnFilter selectedColumns, boolean reversed);
 -    public abstract UnfilteredRowIterator iterator(FileDataInput file, 
DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter 
selectedColumns, boolean reversed, boolean isForThrift);
 +    public abstract UnfilteredRowIterator iterator(FileDataInput file, 
DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter 
selectedColumns, boolean reversed);
  
      public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, 
DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly);
  
@@@ -1698,9 -1753,14 +1725,10 @@@
      /**
       * @param columns the columns to return.
       * @param dataRange filter to use when reading the columns
+      * @param listener a listener used to handle internal read events
       * @return A Scanner for seeking over the rows of the SSTable.
       */
-     public abstract ISSTableScanner getScanner(ColumnFilter columns, 
DataRange dataRange);
 -    public abstract ISSTableScanner getScanner(ColumnFilter columns,
 -                                               DataRange dataRange,
 -                                               RateLimiter limiter,
 -                                               boolean isForThrift,
 -                                               SSTableReadsListener listener);
++    public abstract ISSTableScanner getScanner(ColumnFilter columns, 
DataRange dataRange, SSTableReadsListener listener);
  
      public FileDataInput getFileDataInput(long position)
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
index 0000000,8f6e3c0..6d384bf
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReadsListener.java
@@@ -1,0 -1,82 +1,81 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.io.sstable.format;
+ 
+ import org.apache.cassandra.db.RowIndexEntry;
 -import 
org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
+ 
+ /**
+  * Listener for receiving notifications associated with reading SSTables.
+  */
+ public interface SSTableReadsListener
+ {
+     /**
+      * The reasons for skipping an SSTable
+      */
+     enum SkippingReason
+     {
+         BLOOM_FILTER,
+         MIN_MAX_KEYS,
+         PARTITION_INDEX_LOOKUP,
+         INDEX_ENTRY_NOT_FOUND;
+     }
+ 
+     /**
+      * The reasons for selecting an SSTable
+      */
+     enum SelectionReason
+     {
+         KEY_CACHE_HIT,
+         INDEX_ENTRY_FOUND;
+     }
+ 
+     /**
+      * Listener that does nothing.
+      */
+     static final SSTableReadsListener NOOP_LISTENER = new 
SSTableReadsListener() {};
+ 
+     /**
+      * Handles notification that the specified SSTable has been skipped 
during a single partition query.
+      *
+      * @param sstable the SSTable reader
+      * @param reason the reason for which the SSTable has been skipped
+      */
+     default void onSSTableSkipped(SSTableReader sstable, SkippingReason 
reason)
+     {
+     }
+ 
+     /**
+      * Handles notification that the specified SSTable has been selected 
during a single partition query.
+      *
+      * @param sstable the SSTable reader
+      * @param indexEntry the index entry
+      * @param reason the reason for which the SSTable has been selected
+      */
+     default void onSSTableSelected(SSTableReader sstable, RowIndexEntry<?> 
indexEntry, SelectionReason reason)
+     {
+     }
+ 
+     /**
+      * Handles notification that the specified SSTable is being scanned 
during a partition range query.
+      *
+      * @param sstable the SSTable reader of the SSTable being scanned.
+      */
+     default void onScanningStarted(SSTableReader sstable)
+     {
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index c29bc5d,8551819..1b6a299
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -37,11 -32,19 +37,14 @@@ import org.apache.cassandra.dht.Range
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.sstable.*;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 -import 
org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason;
+ import 
org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
++import 
org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
  import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.schema.TableMetadataRef;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 -
 -import java.io.IOException;
 -import java.nio.ByteBuffer;
 -import java.util.*;
  
  /**
   * SSTableReaders are open()ed by Keyspace.onStart; after that they are 
created by SSTableWriter.renameAndOpen.
@@@ -56,29 -59,29 +59,29 @@@ public class BigTableReader extends SST
          super(desc, components, metadata, maxDataAge, sstableMetadata, 
openReason, header);
      }
  
-     public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, 
ColumnFilter selectedColumns, boolean reversed)
 -    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, 
ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, 
SSTableReadsListener listener)
++    public UnfilteredRowIterator iterator(DecoratedKey key,
++                                          Slices slices,
++                                          ColumnFilter selectedColumns,
++                                          boolean reversed,
++                                          SSTableReadsListener listener)
      {
-         RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ);
+         RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ, 
listener);
 -        return iterator(null, key, rie, slices, selectedColumns, reversed, 
isForThrift);
 +        return iterator(null, key, rie, slices, selectedColumns, reversed);
      }
  
 -    public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey 
key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, 
boolean reversed, boolean isForThrift)
 +    public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey 
key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, 
boolean reversed)
      {
          if (indexEntry == null)
 -            return UnfilteredRowIterators.noRowsIterator(metadata, key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
 +            return UnfilteredRowIterators.noRowsIterator(metadata(), key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
          return reversed
 -             ? new SSTableReversedIterator(this, file, key, indexEntry, 
slices, selectedColumns, isForThrift, ifile)
 -             : new SSTableIterator(this, file, key, indexEntry, slices, 
selectedColumns, isForThrift, ifile);
 +             ? new SSTableReversedIterator(this, file, key, indexEntry, 
slices, selectedColumns, ifile)
 +             : new SSTableIterator(this, file, key, indexEntry, slices, 
selectedColumns, ifile);
      }
  
-     /**
-      * @param columns the columns to return.
-      * @param dataRange filter to use when reading the columns
-      * @return A Scanner for seeking over the rows of the SSTable.
-      */
-     public ISSTableScanner getScanner(ColumnFilter columns, DataRange 
dataRange)
+     @Override
 -    public ISSTableScanner getScanner(ColumnFilter columns,
 -                                      DataRange dataRange,
 -                                      RateLimiter limiter,
 -                                      boolean isForThrift,
 -                                      SSTableReadsListener listener)
++    public ISSTableScanner getScanner(ColumnFilter columns, DataRange 
dataRange, SSTableReadsListener listener)
      {
-         return BigTableScanner.getScanner(this, columns, dataRange);
 -        return BigTableScanner.getScanner(this, columns, dataRange, limiter, 
isForThrift, listener);
++        return BigTableScanner.getScanner(this, columns, dataRange, listener);
      }
  
      /**
@@@ -124,14 -127,12 +127,18 @@@
          return SSTableIdentityIterator.create(this, dfile, position, key, 
tombstoneOnly);
      }
  
 -    @Override
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' 
key is allowed to
 +     * allow key selection by token bounds but only if op != * EQ
 +     * @param op The Operator defining matching keys: the nearest key to the 
target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key 
is not present
 +     */
-     protected RowIndexEntry getPosition(PartitionPosition key, Operator op, 
boolean updateCacheAndStats, boolean permitMatchPastLast)
+     protected RowIndexEntry getPosition(PartitionPosition key,
+                                         Operator op,
+                                         boolean updateCacheAndStats,
+                                         boolean permitMatchPastLast,
+                                         SSTableReadsListener listener)
      {
          if (op == Operator.EQ)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 716ef4c,f4bd1ea..b01573c
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -60,38 -62,65 +61,46 @@@ public class BigTableScanner implement
      private final ColumnFilter columns;
      private final DataRange dataRange;
      private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 -    private final boolean isForThrift;
+     private final SSTableReadsListener listener;
      private long startScan = -1;
      private long bytesScanned = 0;
  
      protected Iterator<UnfilteredRowIterator> iterator;
  
      // Full scan of the sstables
 -    public static ISSTableScanner getScanner(SSTableReader sstable, 
RateLimiter limiter)
 +    public static ISSTableScanner getScanner(SSTableReader sstable)
      {
-         return new BigTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, 
Iterators.singletonIterator(fullRange(sstable)));
 -        return new BigTableScanner(sstable,
 -                                   ColumnFilter.all(sstable.metadata),
 -                                   limiter,
 -                                   
Iterators.singletonIterator(fullRange(sstable)));
++        return getScanner(sstable, 
Iterators.singletonIterator(fullRange(sstable)));
      }
  
-     public static ISSTableScanner getScanner(SSTableReader sstable, 
ColumnFilter columns, DataRange dataRange)
+     public static ISSTableScanner getScanner(SSTableReader sstable,
+                                              ColumnFilter columns,
+                                              DataRange dataRange,
 -                                             RateLimiter limiter,
 -                                             boolean isForThrift,
+                                              SSTableReadsListener listener)
      {
-         return new BigTableScanner(sstable, columns, dataRange, 
makeBounds(sstable, dataRange).iterator());
 -        return new BigTableScanner(sstable, columns, dataRange, limiter, 
isForThrift, makeBounds(sstable, dataRange).iterator(), listener);
++        return new BigTableScanner(sstable, columns, dataRange, 
makeBounds(sstable, dataRange).iterator(), listener);
      }
  
 -    public static ISSTableScanner getScanner(SSTableReader sstable, 
Collection<Range<Token>> tokenRanges, RateLimiter limiter)
 +    public static ISSTableScanner getScanner(SSTableReader sstable, 
Collection<Range<Token>> tokenRanges)
      {
          // We want to avoid allocating a SSTableScanner if the range don't 
overlap the sstable (#5249)
          List<Pair<Long, Long>> positions = 
sstable.getPositionsForRanges(tokenRanges);
          if (positions.isEmpty())
              return new EmptySSTableScanner(sstable);
  
-         return new BigTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, makeBounds(sstable, 
tokenRanges).iterator());
 -        return new BigTableScanner(sstable,
 -                                   ColumnFilter.all(sstable.metadata),
 -                                   limiter,
 -                                   makeBounds(sstable, 
tokenRanges).iterator());
++        return getScanner(sstable, makeBounds(sstable, 
tokenRanges).iterator());
      }
  
      public static ISSTableScanner getScanner(SSTableReader sstable, 
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
      {
-         return new BigTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, rangeIterator);
 -        return new BigTableScanner(sstable, 
ColumnFilter.all(sstable.metadata), null, rangeIterator);
 -    }
 -
 -    private BigTableScanner(SSTableReader sstable,
 -                            ColumnFilter columns,
 -                            RateLimiter limiter,
 -                            Iterator<AbstractBounds<PartitionPosition>> 
rangeIterator)
 -    {
 -        this(sstable, columns, null, limiter, false, rangeIterator, 
SSTableReadsListener.NOOP_LISTENER);
++        return new BigTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, rangeIterator, 
SSTableReadsListener.NOOP_LISTENER);
      }
  
-     private BigTableScanner(SSTableReader sstable, ColumnFilter columns, 
DataRange dataRange, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
+     private BigTableScanner(SSTableReader sstable,
+                             ColumnFilter columns,
+                             DataRange dataRange,
 -                            RateLimiter limiter,
 -                            boolean isForThrift,
+                             Iterator<AbstractBounds<PartitionPosition>> 
rangeIterator,
+                             SSTableReadsListener listener)
      {
          assert sstable != null;
  
@@@ -100,10 -129,12 +109,11 @@@
          this.sstable = sstable;
          this.columns = columns;
          this.dataRange = dataRange;
 -        this.rowIndexEntrySerializer = 
sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
 +        this.rowIndexEntrySerializer = 
sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata(),
                                                                                
                          sstable.descriptor.version,
                                                                                
                          sstable.header);
 -        this.isForThrift = isForThrift;
          this.rangeIterator = rangeIterator;
+         this.listener = listener;
      }
  
      private static List<AbstractBounds<PartitionPosition>> 
makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
@@@ -264,6 -299,7 +274,7 @@@
  
      private Iterator<UnfilteredRowIterator> createIterator()
      {
 -        listener.onScanningStarted(sstable);
++        this.listener.onScanningStarted(sstable);
          return new KeyScanningIterator();
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index 0a970e1,9fafc74..a2352fc
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@@ -243,7 -243,10 +244,9 @@@ public class TTLExpiryTes
          cfs.enableAutoCompaction(true);
          assertEquals(1, cfs.getLiveSSTables().size());
          SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
-         ISSTableScanner scanner = 
sstable.getScanner(ColumnFilter.all(cfs.metadata()), 
DataRange.allData(cfs.getPartitioner()));
 -        ISSTableScanner scanner = 
sstable.getScanner(ColumnFilter.all(sstable.metadata),
++        ISSTableScanner scanner = 
sstable.getScanner(ColumnFilter.all(cfs.metadata()),
+                                                      
DataRange.allData(cfs.getPartitioner()),
 -                                                     false,
+                                                      
SSTableReadsListener.NOOP_LISTENER);
          assertTrue(scanner.hasNext());
          while(scanner.hasNext())
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --cc 
test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index bc82128,f7ced23..581109c
--- 
a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@@ -208,7 -211,12 +209,11 @@@ public class SSTableCorruptionDetection
              for (int i = 0; i < numberOfPks; i++)
              {
                  DecoratedKey dk = Util.dk(String.format("pkvalue_%07d", i));
-                 try (UnfilteredRowIterator rowIter = sstable.iterator(dk, 
Slices.ALL, ColumnFilter.all(cfs.metadata()), false))
+                 try (UnfilteredRowIterator rowIter = sstable.iterator(dk,
+                                                                       
Slices.ALL,
 -                                                                      
ColumnFilter.all(cfs.metadata),
 -                                                                      false,
++                                                                      
ColumnFilter.all(cfs.metadata()),
+                                                                       false,
+                                                                       
SSTableReadsListener.NOOP_LISTENER))
                  {
                      while (rowIter.hasNext())
                      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index 353b1ad,d1db09a..eff95fc
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@@ -179,9 -181,9 +180,11 @@@ public class SSTableScannerTes
      private static void assertScanMatches(SSTableReader sstable, int 
scanStart, int scanEnd, int ... boundaries)
      {
          assert boundaries.length % 2 == 0;
 -        for (DataRange range : dataRanges(sstable.metadata, scanStart, 
scanEnd))
 +        for (DataRange range : dataRanges(sstable.metadata(), scanStart, 
scanEnd))
          {
-             try(ISSTableScanner scanner = 
sstable.getScanner(ColumnFilter.all(sstable.metadata()), range))
 -            try(ISSTableScanner scanner = 
sstable.getScanner(ColumnFilter.all(sstable.metadata), range, false, 
SSTableReadsListener.NOOP_LISTENER))
++            try(ISSTableScanner scanner = 
sstable.getScanner(ColumnFilter.all(sstable.metadata()),
++                                                             range,
++                                                             
SSTableReadsListener.NOOP_LISTENER))
              {
                  for (int b = 0; b < boundaries.length; b += 2)
                      for (int i = boundaries[b]; i <= boundaries[b + 1]; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96899bbb/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
index fd93ca1,391927c..5d62cdb
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@@ -223,7 -224,12 +224,11 @@@ public class SSTableWriterTest extends 
              try
              {
                  DecoratedKey dk = Util.dk("large_value");
-                 UnfilteredRowIterator rowIter = sstable.iterator(dk, 
Slices.ALL, ColumnFilter.all(cfs.metadata()), false);
+                 UnfilteredRowIterator rowIter = sstable.iterator(dk,
+                                                                  Slices.ALL,
 -                                                                 
ColumnFilter.all(cfs.metadata),
 -                                                                 false,
++                                                                 
ColumnFilter.all(cfs.metadata()),
+                                                                  false,
+                                                                  
SSTableReadsListener.NOOP_LISTENER);
                  while (rowIter.hasNext())
                  {
                      rowIter.next();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to