Merge branch 'cassandra-2.1' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ba900dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ba900dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ba900dc

Branch: refs/heads/trunk
Commit: 9ba900dcf6541dc3d4ec94024dacf8c84ccf8e4d
Parents: 54956e9 9f7ab09
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 13 09:19:52 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 13 09:19:52 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 26 -----------
 .../cassandra/io/sstable/SSTableLoader.java     |  2 +-
 .../cassandra/streaming/StreamSession.java      | 40 ++++++++++++----
 .../cassandra/streaming/StreamTransferTask.java | 12 ++---
 .../streaming/messages/OutgoingFileMessage.java | 48 ++++++++++++++------
 .../apache/cassandra/utils/concurrent/Refs.java |  2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |  2 +-
 .../streaming/StreamTransferTaskTest.java       |  2 +-
 .../streaming/StreamingTransferTest.java        |  2 +-
 10 files changed, 78 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 09ee3e4,a316d12..6a70692
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -24,8 -24,10 +24,11 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  
+ import javax.annotation.Nullable;
+ 
+ import com.google.common.base.Function;
  import com.google.common.collect.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -311,10 -301,31 +315,32 @@@ public class StreamSession implements I
          {
              for (ColumnFamilyStore cfStore : stores)
              {
-                 List<AbstractBounds<RowPosition>> rowBoundsList = new 
ArrayList<>(ranges.size());
+                 final List<AbstractBounds<RowPosition>> rowBoundsList = new 
ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
 -                    rowBoundsList.add(range.toRowBounds());
 +                    rowBoundsList.add(Range.makeRowRange(range));
-                 
refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, 
!isIncremental)).refs);
+                 refs.addAll(cfStore.selectAndReference(new 
Function<DataTracker.View, List<SSTableReader>>()
+                 {
+                     public List<SSTableReader> apply(DataTracker.View view)
+                     {
+                         List<SSTableReader> filteredSSTables = 
ColumnFamilyStore.CANONICAL_SSTABLES.apply(view);
+                         Set<SSTableReader> sstables = Sets.newHashSet();
+                         if (filteredSSTables != null)
+                         {
+                             for (AbstractBounds<RowPosition> rowBounds : 
rowBoundsList)
+                             {
+                                 // sstableInBounds may contain early opened 
sstables
+                                 for (SSTableReader sstable : 
view.sstablesInBounds(rowBounds))
+                                 {
+                                     if (filteredSSTables.contains(sstable) && 
(!isIncremental || !sstable.isRepaired()))
+                                         sstables.add(sstable);
+                                 }
+                             }
+                         }
+ 
++                        logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), view.sstables.size());
+                         return ImmutableList.copyOf(sstables);
+                     }
+                 }).refs);
              }
  
              List<SSTableStreamingSections> sections = new 
ArrayList<>(refs.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 9f9b765,1727bae..f14abd2
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -52,10 -52,10 +52,10 @@@ public class StreamTransferTask extend
          super(session, cfId);
      }
  
-     public synchronized void addTransferFile(SSTableReader sstable, Ref ref, 
long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+     public synchronized void addTransferFile(Ref<SSTableReader> ref, long 
estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
      {
-         assert sstable != null && cfId.equals(sstable.metadata.cfId);
-         OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, 
session.keepSSTableLevel());
+         assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
 -        OutgoingFileMessage message = new OutgoingFileMessage(ref, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
++        OutgoingFileMessage message = new OutgoingFileMessage(ref, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, 
session.keepSSTableLevel());
          files.put(message.header.sequenceNumber, message);
          totalSize += message.header.size();
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index bfa02fa,082e306..5b34bd8
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -43,31 -44,25 +43,25 @@@ public class OutgoingFileMessage extend
              throw new UnsupportedOperationException("Not allowed to call 
deserialize on an outgoing file");
          }
  
 -        public void serialize(OutgoingFileMessage message, 
DataOutputStreamAndChannel out, int version, StreamSession session) throws 
IOException
 +        public void serialize(OutgoingFileMessage message, 
DataOutputStreamPlus out, int version, StreamSession session) throws IOException
          {
-             FileMessageHeader.serializer.serialize(message.header, out, 
version);
- 
-             final SSTableReader reader = message.sstable;
-             StreamWriter writer = message.header.compressionInfo == null ?
-                     new StreamWriter(reader, message.header.sections, 
session) :
-                     new CompressedStreamWriter(reader,
-                             message.header.sections,
-                             message.header.compressionInfo, session);
-             writer.write(out);
+             message.serialize(out, version, session);
              session.fileSent(message.header);
          }
      };
  
      public final FileMessageHeader header;
-     public final SSTableReader sstable;
-     public final Ref<SSTableReader> ref;
+     private final Ref<SSTableReader> ref;
+     private final String filename;
+     private boolean completed = false;
  
-     public OutgoingFileMessage(SSTableReader sstable, Ref ref, int 
sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long 
repairedAt, boolean keepSSTableLevel)
 -    public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, 
long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
++    public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, 
long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean 
keepSSTableLevel)
      {
          super(Type.FILE);
-         this.sstable = sstable;
          this.ref = ref;
  
+         SSTableReader sstable = ref.get();
+         filename = sstable.getFilename();
          CompressionInfo compressionInfo = null;
          if (sstable.compression)
          {
@@@ -81,10 -75,35 +75,36 @@@
                                              estimatedKeys,
                                              sections,
                                              compressionInfo,
 -                                            repairedAt);
 +                                            repairedAt,
 +                                            keepSSTableLevel ? 
sstable.getSSTableLevel() : 0);
      }
  
 -    public synchronized void serialize(DataOutputStreamAndChannel out, int 
version, StreamSession session) throws IOException
++    public synchronized void serialize(DataOutputStreamPlus out, int version, 
StreamSession session) throws IOException
+     {
+         if (completed)
+         {
+             return;
+         }
+ 
+         FileMessageHeader.serializer.serialize(header, out, version);
+ 
+         final SSTableReader reader = ref.get();
+         StreamWriter writer = header.compressionInfo == null ?
+                                       new StreamWriter(reader, 
header.sections, session) :
+                                       new CompressedStreamWriter(reader, 
header.sections,
+                                                                  
header.compressionInfo, session);
 -        writer.write(out.getChannel());
++        writer.write(out);
+     }
+ 
+     public synchronized void complete()
+     {
+         if (!completed)
+         {
+             completed = true;
+             ref.release();
+         }
+     }
+ 
      @Override
      public String toString()
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ba900dc/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------

Reply via email to