Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4aa56ec2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4aa56ec2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4aa56ec2 Branch: refs/heads/cassandra-2.2 Commit: 4aa56ec2454931f2d34eaef51699c9b37ea8efcd Parents: 1411ad5 98ac45a Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Jul 2 18:05:26 2015 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Jul 2 18:05:26 2015 +0100 ---------------------------------------------------------------------- .../cassandra/streaming/StreamSession.java | 21 +++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4aa56ec2/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 44522db,1edfedb..7236194 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -318,23 -303,26 +318,26 @@@ public class StreamSession implements I { final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) - rowBoundsList.add(range.toRowBounds()); - refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>() + rowBoundsList.add(Range.makeRowRange(range)); + refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>() { - public List<SSTableReader> apply(DataTracker.View view) + public List<SSTableReader> apply(View view) { - List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view); + Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>(); + for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)) + permittedInstances.put(reader, reader); + Set<SSTableReader> sstables = Sets.newHashSet(); - if (filteredSSTables != null) + for (AbstractBounds<RowPosition> rowBounds : rowBoundsList) { - for (AbstractBounds<RowPosition> rowBounds : rowBoundsList) + // sstableInBounds may contain early opened sstables + for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) { - // sstableInBounds may contain early opened sstables - for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) - { - if (filteredSSTables.contains(sstable) && (!isIncremental || !sstable.isRepaired())) - sstables.add(sstable); - } + if (isIncremental && sstable.isRepaired()) + continue; + sstable = permittedInstances.get(sstable); + if (sstable != null) + sstables.add(sstable); } }