Updated Branches:
  refs/heads/trunk 40b6c5d9c -> 515116972

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
deleted file mode 100644
index 6db5b15..0000000
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-import org.junit.Test;
-
-import org.apache.cassandra.AbstractSerializationsTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.BytesToken;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class SerializationsTest extends AbstractSerializationsTester
-{
-    private void testPendingFileWrite() throws IOException
-    {
-        // make sure to test serializing null and a pf with no sstable.
-        PendingFile normal = makePendingFile(true, 100, OperationType.BOOTSTRAP);
-        PendingFile noSections = makePendingFile(true, 0, OperationType.AES);
-        PendingFile noSST = makePendingFile(false, 100, OperationType.RESTORE_REPLICA_COUNT);
-
-        DataOutputStream out = getOutput("streaming.PendingFile.bin");
-        PendingFile.serializer.serialize(normal, out, getVersion());
-        PendingFile.serializer.serialize(noSections, out, getVersion());
-        PendingFile.serializer.serialize(noSST, out, getVersion());
-        PendingFile.serializer.serialize(null, out, getVersion());
-        out.close();
-
-        // test serializedSize
-        testSerializedSize(normal, PendingFile.serializer);
-        testSerializedSize(noSections, PendingFile.serializer);
-        testSerializedSize(noSST, PendingFile.serializer);
-        testSerializedSize(null, PendingFile.serializer);
-    }
-
-    @Test
-    public void testPendingFileRead() throws IOException
-    {
-        if (EXECUTE_WRITES)
-            testPendingFileWrite();
-
-        DataInputStream in = getInput("streaming.PendingFile.bin");
-        assert PendingFile.serializer.deserialize(in, getVersion()) != null;
-        assert PendingFile.serializer.deserialize(in, getVersion()) != null;
-        assert PendingFile.serializer.deserialize(in, getVersion()) != null;
-        assert PendingFile.serializer.deserialize(in, getVersion()) == null;
-        in.close();
-    }
-
-    private void testStreamHeaderWrite() throws IOException
-    {
-        UUID sessionId = UUIDGen.getTimeUUID();
-        StreamHeader sh0 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP));
-        StreamHeader sh1 = new StreamHeader("Keyspace1", sessionId, makePendingFile(false, 100, OperationType.BOOTSTRAP));
-        Collection<PendingFile> files = new ArrayList<PendingFile>();
-        for (int i = 0; i < 50; i++)
-            files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP));
-        StreamHeader sh2 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), files);
-        StreamHeader sh3 = new StreamHeader("Keyspace1", sessionId, null, files);
-        StreamHeader sh4 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
-
-        DataOutputStream out = getOutput("streaming.StreamHeader.bin");
-        StreamHeader.serializer.serialize(sh0, out, getVersion());
-        StreamHeader.serializer.serialize(sh1, out, getVersion());
-        StreamHeader.serializer.serialize(sh2, out, getVersion());
-        StreamHeader.serializer.serialize(sh3, out, getVersion());
-        StreamHeader.serializer.serialize(sh4, out, getVersion());
-        out.close();
-
-        // test serializedSize
-        testSerializedSize(sh0, StreamHeader.serializer);
-        testSerializedSize(sh1, StreamHeader.serializer);
-        testSerializedSize(sh2, StreamHeader.serializer);
-        testSerializedSize(sh3, StreamHeader.serializer);
-        testSerializedSize(sh4, StreamHeader.serializer);
-    }
-
-    @Test
-    public void testStreamHeaderRead() throws IOException
-    {
-        if (EXECUTE_WRITES)
-            testStreamHeaderWrite();
-
-        DataInputStream in = getInput("streaming.StreamHeader.bin");
-        assert StreamHeader.serializer.deserialize(in, getVersion()) != null;
-        assert StreamHeader.serializer.deserialize(in, getVersion()) != null;
-        assert StreamHeader.serializer.deserialize(in, getVersion()) != null;
-        assert StreamHeader.serializer.deserialize(in, getVersion()) != null;
-        assert StreamHeader.serializer.deserialize(in, getVersion()) != null;
-        in.close();
-    }
-
-    private void testStreamReplyWrite() throws IOException
-    {
-        UUID sessionId = UUIDGen.getTimeUUID();
-        StreamReply rep = new StreamReply("this is a file", sessionId, StreamReply.Status.FILE_FINISHED);
-        DataOutputStream out = getOutput("streaming.StreamReply.bin");
-        StreamReply.serializer.serialize(rep, out, getVersion());
-        rep.createMessage().serialize(out, getVersion());
-        out.close();
-
-        // test serializedSize
-        testSerializedSize(rep, StreamReply.serializer);
-    }
-
-    @Test
-    public void testStreamReplyRead() throws IOException
-    {
-        if (EXECUTE_WRITES)
-            testStreamReplyWrite();
-
-        DataInputStream in = getInput("streaming.StreamReply.bin");
-        assert StreamReply.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        in.close();
-    }
-
-    private static PendingFile makePendingFile(boolean sst, int numSecs, OperationType op)
-    {
-        Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"), "Keyspace1", "Standard1", 23, false);
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
-        for (int i = 0; i < numSecs; i++)
-            sections.add(Pair.create(new Long(i), new Long(i * i)));
-        return new PendingFile(sst ? makeSSTable() : null, desc, SSTable.COMPONENT_DATA, sections, op);
-    }
-
-    private void testStreamRequestMessageWrite() throws IOException
-    {
-        UUID sessionId = UUIDGen.getTimeUUID();
-        Collection<Range<Token>> ranges = new ArrayList<Range<Token>>();
-        for (int i = 0; i < 5; i++)
-            ranges.add(new Range<Token>(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
-        List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
-        StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, sessionId, OperationType.RESTORE_REPLICA_COUNT);
-        StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), sessionId);
-        StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), sessionId);
-
-        DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
-        StreamRequest.serializer.serialize(msg0, out, getVersion());
-        StreamRequest.serializer.serialize(msg1, out, getVersion());
-        StreamRequest.serializer.serialize(msg2, out, getVersion());
-        msg0.createMessage().serialize(out, getVersion());
-        msg1.createMessage().serialize(out, getVersion());
-        msg2.createMessage().serialize(out, getVersion());
-        out.close();
-
-        // test serializedSize
-        testSerializedSize(msg0, StreamRequest.serializer);
-        testSerializedSize(msg1, StreamRequest.serializer);
-        testSerializedSize(msg2, StreamRequest.serializer);
-    }
-
-    @Test
-    public void testStreamRequestMessageRead() throws IOException
-    {
-        if (EXECUTE_WRITES)
-            testStreamRequestMessageWrite();
-
-        DataInputStream in = getInput("streaming.StreamRequestMessage.bin");
-        assert StreamRequest.serializer.deserialize(in, getVersion()) != null;
-        assert StreamRequest.serializer.deserialize(in, getVersion()) != null;
-        assert StreamRequest.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        in.close();
-    }
-
-    private static SSTableReader makeSSTable()
-    {
-        Table t = Table.open("Keyspace1");
-        for (int i = 0; i < 100; i++)
-        {
-            RowMutation rm = new RowMutation(t.getName(), ByteBufferUtil.bytes(Long.toString(System.nanoTime())));
-            rm.add("Standard1", ByteBufferUtil.bytes("cola"), ByteBufferUtil.bytes("value"), 0);
-            rm.apply();
-        }
-        try
-        {
-            t.getColumnFamilyStore("Standard1").forceBlockingFlush();
-            return t.getColumnFamilyStore("Standard1").getSSTables().iterator().next();
-        }
-        catch (Exception any)
-        {
-            throw new RuntimeException(any);
-        }
-    }
-}
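
The removed test followed the tree's AbstractSerializationsTester round-trip
pattern: when EXECUTE_WRITES is set, regenerate the versioned .bin fixture,
then read it back and assert on the deserialized objects. A minimal sketch of
that shape, with a hypothetical Foo message type standing in for
PendingFile/StreamHeader/StreamReply:

    private void testFooWrite() throws IOException
    {
        // write the fixture at the current messaging version
        DataOutputStream out = getOutput("streaming.Foo.bin");
        Foo.serializer.serialize(foo, out, getVersion());
        out.close();
    }

    @Test
    public void testFooRead() throws IOException
    {
        if (EXECUTE_WRITES)
            testFooWrite(); // regenerate the fixture on demand

        DataInputStream in = getInput("streaming.Foo.bin");
        assert Foo.serializer.deserialize(in, getVersion()) != null;
        in.close();
    }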

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
new file mode 100644
index 0000000..60fbf40
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SessionInfoTest
+{
+    /**
+     * Test if total numbers are collected correctly.
+     */
+    @Test
+    public void testTotals()
+    {
+        UUID cfId = UUID.randomUUID();
+        InetAddress local = FBUtilities.getLocalAddress();
+
+        Collection<StreamSummary> summaries = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            StreamSummary summary = new StreamSummary(cfId, i, (i + 1) * 10);
+            summaries.add(summary);
+        }
+
+        StreamSummary sending = new StreamSummary(cfId, 10, 100);
+        SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
+
+        assert info.getTotalFilesToReceive() == 45;
+        assert info.getTotalFilesToSend() == 10;
+        assert info.getTotalSizeToReceive() == 550;
+        assert info.getTotalSizeToSend() == 100;
+        // still, no files received or sent
+        assert info.getTotalFilesReceived() == 0;
+        assert info.getTotalFilesSent() == 0;
+
+        // receive in progress
+        info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 50, 100));
+        // still in progress, but not completed yet
+        assert info.getTotalSizeReceived() == 50;
+        assert info.getTotalSizeSent() == 0;
+        assert info.getTotalFilesReceived() == 0;
+        assert info.getTotalFilesSent() == 0;
+        info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 100, 100));
+        // 1 file should be completed
+        assert info.getTotalSizeReceived() == 100;
+        assert info.getTotalSizeSent() == 0;
+        assert info.getTotalFilesReceived() == 1;
+        assert info.getTotalFilesSent() == 0;
+    }
+}
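
The expected receive totals in this test fall out of the summary loop: each
StreamSummary i (0..9) carries i files and (i + 1) * 10 bytes, so the totals
are sum(i) = 45 files and sum((i + 1) * 10) = 550 bytes. A quick standalone
check of that arithmetic (a sketch, not part of the committed test):

    int files = 0;
    long size = 0;
    for (int i = 0; i < 10; i++)
    {
        files += i;           // 0 + 1 + ... + 9 = 45
        size += (i + 1) * 10; // 10 + 20 + ... + 100 = 550
    }
    assert files == 45 && size == 550;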

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/StreamUtil.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamUtil.java b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
deleted file mode 100644
index 4987923..0000000
--- a/test/unit/org/apache/cassandra/streaming/StreamUtil.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.streaming;
-
-import java.io.IOError;
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.apache.cassandra.net.MessageIn;
-
-public class StreamUtil
-{
-    /**
-     * Takes a stream request message and creates an empty status response. Exists here because StreamRequestMessage
-     * is package protected.
-     */
-    static public void finishStreamRequest(MessageIn<StreamRequest> msg, InetAddress to)
-    {
-        StreamInSession session = StreamInSession.get(to, msg.payload.sessionId);
-        try
-        {
-            session.closeIfFinished();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index a86330c..3e9eeb1 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -1,35 +1,40 @@
-package org.apache.cassandra.streaming;
-
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-import static junit.framework.Assert.assertEquals;
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.Util;
-import static org.apache.cassandra.Util.column;
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
-import java.sql.Date;
 import java.nio.ByteBuffer;
+import java.sql.Date;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
@@ -39,21 +44,18 @@ import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
 
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+import static org.apache.cassandra.Util.column;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class StreamingTransferTest extends SchemaLoader
@@ -69,10 +71,64 @@ public class StreamingTransferTest extends SchemaLoader
     }
 
     /**
+     * Test if an empty {@link StreamPlan} returns success with an empty result.
+     */
+    @Test
+    public void testEmptyStreamPlan() throws Exception
+    {
+        StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest").execute();
+        final UUID planId = futureResult.planId;
+        Futures.addCallback(futureResult, new FutureCallback<StreamState>()
+        {
+            public void onSuccess(StreamState result)
+            {
+                assert planId.equals(result.planId);
+                assert result.description.equals("StreamingTransferTest");
+                assert result.sessions.isEmpty();
+            }
+
+            public void onFailure(Throwable t)
+            {
+                fail();
+            }
+        });
+        // should be complete immediately
+        futureResult.get(100, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testRequestEmpty() throws Exception
+    {
+        // requesting empty data should succeed
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range<Token>> ranges = new ArrayList<>();
+        ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
+        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+
+        StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest")
+                                                  .requestRanges(LOCAL, "Keyspace2", ranges)
+                                                  .execute();
+
+        UUID planId = futureResult.planId;
+        StreamState result = futureResult.get();
+        assert planId.equals(result.planId);
+        assert result.description.equals("StreamingTransferTest");
+
+        // we should have a completed session with an empty transfer
+        assert result.sessions.size() == 1;
+        SessionInfo session = Iterables.get(result.sessions, 0);
+        assert session.peer.equals(LOCAL);
+        assert session.getTotalFilesReceived() == 0;
+        assert session.getTotalFilesSent() == 0;
+        assert session.getTotalSizeReceived() == 0;
+        assert session.getTotalSizeSent() == 0;
+    }
+
+    /**
      * Create and transfer a single sstable, and return the keys that should have been transferred.
      * The Mutator must create the given column, but it may also create any other columns it pleases.
      */
-    private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception
+    private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator) throws Exception
     {
         // write a temporary SSTable, and unregister it
         logger.debug("Mutating " + cfs.name);
@@ -87,7 +143,7 @@ public class StreamingTransferTest extends SchemaLoader
 
         // transfer the first and last key
         logger.debug("Transferring " + cfs.name);
-        transfer(table, sstable);
+        transfer(sstable);
 
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
@@ -108,7 +164,7 @@ public class StreamingTransferTest extends SchemaLoader
         // and that the max timestamp for the file was rediscovered
         assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp());
 
-        List<String> keys = new ArrayList<String>();
+        List<String> keys = new ArrayList<>();
         for (int off : offs)
             keys.add("key" + off);
 
@@ -116,20 +172,18 @@ public class StreamingTransferTest extends SchemaLoader
         return keys;
     }
 
-    private void transfer(Table table, SSTableReader sstable) throws Exception
+    private void transfer(SSTableReader sstable) throws Exception
     {
         IPartitioner p = StorageService.getPartitioner();
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
-        ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
-        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
-        transfer(table, sstable, ranges);
+        List<Range<Token>> ranges = new ArrayList<>();
+        ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
+        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+        transfer(sstable, ranges);
     }
 
-    private void transfer(Table table, SSTableReader sstable, List<Range<Token>> ranges) throws Exception
+    private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
     {
-        StreamOutSession session = StreamOutSession.create(table.getName(), LOCAL, (IStreamCallback)null);
-        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
-        session.await();
+        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, ranges, Collections.singleton(sstable)).execute().get();
     }
 
     /**
@@ -157,7 +211,7 @@ public class StreamingTransferTest extends SchemaLoader
 
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         cfs.clearUnsafe();
-        transfer(table, sstable);
+        transfer(sstable);
 
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
@@ -172,7 +226,7 @@ public class StreamingTransferTest extends SchemaLoader
         final Table table = Table.open("Keyspace1");
         final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
 
-        List<String> keys = createAndTransfer(table, cfs, new Mutator()
+        List<String> keys = createAndTransfer(cfs, new Mutator()
         {
             public void mutate(String key, String col, long timestamp) throws Exception
             {
@@ -190,7 +244,6 @@ public class StreamingTransferTest extends SchemaLoader
         for (String key : keys)
         {
             long val = key.hashCode();
-            IPartitioner p = StorageService.getPartitioner();
             IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
                                                        IndexOperator.EQ,
                                                        ByteBufferUtil.bytes(val));
@@ -210,14 +263,14 @@ public class StreamingTransferTest extends SchemaLoader
         final ColumnFamilyStore cfs = table.getColumnFamilyStore("Counter1");
         final CounterContext cc = new CounterContext();
 
-        final Map<String, ColumnFamily> cleanedEntries = new HashMap<>();
+        final Map<String, ColumnFamily> cleanedEntries = new HashMap<>();
 
-        List<String> keys = createAndTransfer(table, cfs, new Mutator()
+        List<String> keys = createAndTransfer(cfs, new Mutator()
         {
             /** Creates a new SSTable per key: all will be merged before streaming. */
             public void mutate(String key, String col, long timestamp) throws Exception
             {
-                Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
+                Map<String, ColumnFamily> entries = new HashMap<>();
                 ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
                 ColumnFamily cfCleaned = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
                 CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 1);
@@ -254,7 +307,7 @@ public class StreamingTransferTest extends SchemaLoader
 
         // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
         cfs.clearUnsafe();
-        transfer(table, streamed);
+        transfer(streamed);
         SSTableReader restreamed = cfs.getSSTables().iterator().next();
         SSTableUtils.assertContentEquals(streamed, restreamed);
     }
@@ -263,7 +316,7 @@ public class StreamingTransferTest extends SchemaLoader
     public void testTransferTableMultiple() throws Exception
     {
         // write temporary SSTables, but don't register them
-        Set<String> content = new HashSet<String>();
+        Set<String> content = new HashSet<>();
         content.add("test");
         content.add("test2");
         content.add("test3");
@@ -271,7 +324,7 @@ public class StreamingTransferTest extends SchemaLoader
         String tablename = sstable.getTableName();
         String cfname = sstable.getColumnFamilyName();
 
-        content = new HashSet<String>();
+        content = new HashSet<>();
         content.add("transfer1");
         content.add("transfer2");
         content.add("transfer3");
@@ -279,15 +332,13 @@ public class StreamingTransferTest extends SchemaLoader
 
         // transfer the first and last key
         IPartitioner p = StorageService.getPartitioner();
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
-        ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
-        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
+        List<Range<Token>> ranges = new ArrayList<>();
+        ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
+        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
         // Acquiring references, transferSSTables needs it
         sstable.acquireReference();
         sstable2.acquireReference();
-        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, (IStreamCallback) null);
-        StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
-        session.await();
+        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, ranges, Arrays.asList(sstable, sstable2)).execute().get();
 
         // confirm that the sstables were transferred and registered and that 2 keys arrived
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
@@ -311,12 +362,12 @@ public class StreamingTransferTest extends SchemaLoader
         String keyspace = "KeyCacheSpace";
         IPartitioner p = StorageService.getPartitioner();
         String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" };
-        List<SSTableReader> ssTableReaders = new ArrayList<SSTableReader>();
+        List<SSTableReader> ssTableReaders = new ArrayList<>();
 
-        NavigableMap<DecoratedKey,String> keys = new TreeMap<DecoratedKey,String>();
+        NavigableMap<DecoratedKey,String> keys = new TreeMap<>();
         for (String cf : columnFamilies)
         {
-            Set<String> content = new HashSet<String>();
+            Set<String> content = new HashSet<>();
             content.add("data-" + cf + "-1");
             content.add("data-" + cf + "-2");
             content.add("data-" + cf + "-3");
@@ -332,19 +383,16 @@ public class StreamingTransferTest extends SchemaLoader
         Map.Entry<DecoratedKey,String> first = keys.firstEntry();
         Map.Entry<DecoratedKey,String> last = keys.lastEntry();
         Map.Entry<DecoratedKey,String> secondtolast = keys.lowerEntry(last.getKey());
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
-        ranges.add(new Range<Token>(p.getMinimumToken(), first.getKey().token));
+        List<Range<Token>> ranges = new ArrayList<>();
+        ranges.add(new Range<>(p.getMinimumToken(), first.getKey().token));
         // the left hand side of the range is exclusive, so we transfer from the second-to-last token
-        ranges.add(new Range<Token>(secondtolast.getKey().token, p.getMinimumToken()));
+        ranges.add(new Range<>(secondtolast.getKey().token, p.getMinimumToken()));
 
         // Acquiring references, transferSSTables needs it
         if (!SSTableReader.acquireReferences(ssTableReaders))
             throw new AssertionError();
 
-        StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback)null);
-        StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);
-
-        session.await();
+        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, ranges, ssTableReaders).execute().get();
 
         // check that only two keys were transferred
         for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last))
@@ -382,11 +430,11 @@ public class StreamingTransferTest extends SchemaLoader
         cfs.clearUnsafe();
 
         IPartitioner p = StorageService.getPartitioner();
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
-        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000"))));
-        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500"))));
-        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900"))));
-        transfer(table, sstable, ranges);
+        List<Range<Token>> ranges = new ArrayList<>();
+        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000"))));
+        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500"))));
+        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900"))));
+        transfer(sstable, ranges);
         assertEquals(1, cfs.getSSTables().size());
         assertEquals(7, Util.getRangeSlice(cfs).size());
     }
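
The recurring change in this file replaces the session-oriented API
(StreamOutSession.create, StreamOut.transferSSTables, session.await()) with
the StreamPlan builder, whose execute() returns a StreamResultFuture. A
condensed before/after sketch of that migration, using only calls that appear
in this diff (LOCAL, ranges and sstables are the test's own fixtures):

    // before: explicit session management, blocking via await()
    StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback) null);
    StreamOut.transferSSTables(session, sstables, ranges, OperationType.BOOTSTRAP);
    session.await();

    // after: a declarative plan; completion is awaited with get()
    // or observed via Futures.addCallback on the returned future
    new StreamPlan("StreamingTransferTest")
        .transferFiles(LOCAL, ranges, sstables)
        .execute()
        .get();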

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 95297b1..ab311e6 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -26,7 +26,10 @@ import java.util.*;
 import org.junit.Test;
 
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMetadata;
