Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ba900dc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ba900dc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ba900dc Branch: refs/heads/trunk Commit: 9ba900dcf6541dc3d4ec94024dacf8c84ccf8e4d Parents: 54956e9 9f7ab09 Author: Yuki Morishita <yu...@apache.org> Authored: Wed May 13 09:19:52 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed May 13 09:19:52 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 26 ----------- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../cassandra/streaming/StreamSession.java | 40 ++++++++++++---- .../cassandra/streaming/StreamTransferTask.java | 12 ++--- .../streaming/messages/OutgoingFileMessage.java | 48 ++++++++++++++------ .../apache/cassandra/utils/concurrent/Refs.java | 2 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../streaming/StreamTransferTaskTest.java | 2 +- .../streaming/StreamingTransferTest.java | 2 +- 10 files changed, 78 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 09ee3e4,a316d12..6a70692 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -24,8 -24,10 +24,11 @@@ import java.util.* import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; + import javax.annotation.Nullable; + + import com.google.common.base.Function; import com.google.common.collect.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -311,10 -301,31 +315,32 @@@ public class StreamSession implements I { for (ColumnFamilyStore cfStore : stores) { - List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); + final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) - rowBoundsList.add(range.toRowBounds()); + rowBoundsList.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs); + refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>() + { + public List<SSTableReader> apply(DataTracker.View view) + { + List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view); + Set<SSTableReader> sstables = Sets.newHashSet(); + if (filteredSSTables != null) + { + for (AbstractBounds<RowPosition> rowBounds : rowBoundsList) + { + // sstableInBounds may contain early opened sstables + for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) + { + if (filteredSSTables.contains(sstable) && (!isIncremental || !sstable.isRepaired())) + sstables.add(sstable); + } + } + } + ++ logger.debug("ViewFilter for {}/{} sstables", sstables.size(), view.sstables.size()); + return ImmutableList.copyOf(sstables); + } + }).refs); } List<SSTableStreamingSections> sections = new ArrayList<>(refs.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 9f9b765,1727bae..f14abd2 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@@ -52,10 -52,10 +52,10 @@@ public class StreamTransferTask extend super(session, cfId); } - public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) + public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) { - assert sstable != null && cfId.equals(sstable.metadata.cfId); - OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); + assert ref.get() != null && cfId.equals(ref.get().metadata.cfId); - OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt); ++ OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); files.put(message.header.sequenceNumber, message); totalSize += message.header.size(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index bfa02fa,082e306..5b34bd8 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@@ -43,31 -44,25 +43,25 @@@ public class OutgoingFileMessage extend throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file"); } - public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { - FileMessageHeader.serializer.serialize(message.header, out, version); - - final SSTableReader reader = message.sstable; - StreamWriter writer = message.header.compressionInfo == null ? - new StreamWriter(reader, message.header.sections, session) : - new CompressedStreamWriter(reader, - message.header.sections, - message.header.compressionInfo, session); - writer.write(out); + message.serialize(out, version, session); session.fileSent(message.header); } }; public final FileMessageHeader header; - public final SSTableReader sstable; - public final Ref<SSTableReader> ref; + private final Ref<SSTableReader> ref; + private final String filename; + private boolean completed = false; - public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel) - public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) ++ public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel) { super(Type.FILE); - this.sstable = sstable; this.ref = ref; + SSTableReader sstable = ref.get(); + filename = sstable.getFilename(); CompressionInfo compressionInfo = null; if (sstable.compression) { @@@ -81,10 -75,35 +75,36 @@@ estimatedKeys, sections, compressionInfo, - repairedAt); + repairedAt, + keepSSTableLevel ? sstable.getSSTableLevel() : 0); } - public synchronized void serialize(DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException ++ public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException + { + if (completed) + { + return; + } + + FileMessageHeader.serializer.serialize(header, out, version); + + final SSTableReader reader = ref.get(); + StreamWriter writer = header.compressionInfo == null ? + new StreamWriter(reader, header.sections, session) : + new CompressedStreamWriter(reader, header.sections, + header.compressionInfo, session); - writer.write(out.getChannel()); ++ writer.write(out); + } + + public synchronized void complete() + { + if (!completed) + { + completed = true; + ref.release(); + } + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ----------------------------------------------------------------------