Updated Branches: refs/heads/trunk 40b6c5d9c -> 515116972
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java deleted file mode 100644 index 6db5b15..0000000 --- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra.streaming; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.*; - -import org.junit.Test; - -import org.apache.cassandra.AbstractSerializationsTester; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.db.Table; -import org.apache.cassandra.dht.BytesToken; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDGen; - -public class SerializationsTest extends AbstractSerializationsTester -{ - private void testPendingFileWrite() throws IOException - { - // make sure to test serializing null and a pf with no sstable. - PendingFile normal = makePendingFile(true, 100, OperationType.BOOTSTRAP); - PendingFile noSections = makePendingFile(true, 0, OperationType.AES); - PendingFile noSST = makePendingFile(false, 100, OperationType.RESTORE_REPLICA_COUNT); - - DataOutputStream out = getOutput("streaming.PendingFile.bin"); - PendingFile.serializer.serialize(normal, out, getVersion()); - PendingFile.serializer.serialize(noSections, out, getVersion()); - PendingFile.serializer.serialize(noSST, out, getVersion()); - PendingFile.serializer.serialize(null, out, getVersion()); - out.close(); - - // test serializedSize - testSerializedSize(normal, PendingFile.serializer); - testSerializedSize(noSections, PendingFile.serializer); - testSerializedSize(noSST, PendingFile.serializer); - testSerializedSize(null, PendingFile.serializer); - } - - @Test - public void testPendingFileRead() throws IOException - { - if (EXECUTE_WRITES) - testPendingFileWrite(); - - DataInputStream in = getInput("streaming.PendingFile.bin"); - assert PendingFile.serializer.deserialize(in, getVersion()) != null; - assert PendingFile.serializer.deserialize(in, getVersion()) != null; - assert PendingFile.serializer.deserialize(in, getVersion()) != null; - assert PendingFile.serializer.deserialize(in, getVersion()) == null; - in.close(); - } - - private void testStreamHeaderWrite() throws IOException - { - UUID sessionId = UUIDGen.getTimeUUID(); - StreamHeader sh0 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP)); - StreamHeader sh1 = new StreamHeader("Keyspace1", sessionId, makePendingFile(false, 100, OperationType.BOOTSTRAP)); - Collection<PendingFile> files = new ArrayList<PendingFile>(); - for (int i = 0; i < 50; i++) - files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP)); - StreamHeader sh2 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), files); - StreamHeader sh3 = new StreamHeader("Keyspace1", sessionId, null, files); - StreamHeader sh4 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>()); - - DataOutputStream out = getOutput("streaming.StreamHeader.bin"); - StreamHeader.serializer.serialize(sh0, out, getVersion()); - StreamHeader.serializer.serialize(sh1, out, getVersion()); - StreamHeader.serializer.serialize(sh2, out, getVersion()); - StreamHeader.serializer.serialize(sh3, out, getVersion()); - StreamHeader.serializer.serialize(sh4, out, getVersion()); - out.close(); - - // test serializedSize - testSerializedSize(sh0, StreamHeader.serializer); - testSerializedSize(sh1, StreamHeader.serializer); - testSerializedSize(sh2, StreamHeader.serializer); - testSerializedSize(sh3, StreamHeader.serializer); - testSerializedSize(sh4, StreamHeader.serializer); - } - - @Test - public void testStreamHeaderRead() throws IOException - { - if (EXECUTE_WRITES) - testStreamHeaderWrite(); - - DataInputStream in = getInput("streaming.StreamHeader.bin"); - assert StreamHeader.serializer.deserialize(in, getVersion()) != null; - assert StreamHeader.serializer.deserialize(in, getVersion()) != null; - assert StreamHeader.serializer.deserialize(in, getVersion()) != null; - assert StreamHeader.serializer.deserialize(in, getVersion()) != null; - assert StreamHeader.serializer.deserialize(in, getVersion()) != null; - in.close(); - } - - private void testStreamReplyWrite() throws IOException - { - UUID sessionId = UUIDGen.getTimeUUID(); - StreamReply rep = new StreamReply("this is a file", sessionId, StreamReply.Status.FILE_FINISHED); - DataOutputStream out = getOutput("streaming.StreamReply.bin"); - StreamReply.serializer.serialize(rep, out, getVersion()); - rep.createMessage().serialize(out, getVersion()); - out.close(); - - // test serializedSize - testSerializedSize(rep, StreamReply.serializer); - } - - @Test - public void testStreamReplyRead() throws IOException - { - if (EXECUTE_WRITES) - testStreamReplyWrite(); - - DataInputStream in = getInput("streaming.StreamReply.bin"); - assert StreamReply.serializer.deserialize(in, getVersion()) != null; - assert MessageIn.read(in, getVersion(), -1) != null; - in.close(); - } - - private static PendingFile makePendingFile(boolean sst, int numSecs, OperationType op) - { - Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"), "Keyspace1", "Standard1", 23, false); - List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>(); - for (int i = 0; i < numSecs; i++) - sections.add(Pair.create(new Long(i), new Long(i * i))); - return new PendingFile(sst ? makeSSTable() : null, desc, SSTable.COMPONENT_DATA, sections, op); - } - - private void testStreamRequestMessageWrite() throws IOException - { - UUID sessionId = UUIDGen.getTimeUUID(); - Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(); - for (int i = 0; i < 5; i++) - ranges.add(new Range<Token>(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5))))); - List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1")); - StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, sessionId, OperationType.RESTORE_REPLICA_COUNT); - StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), sessionId); - StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), sessionId); - - DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin"); - StreamRequest.serializer.serialize(msg0, out, getVersion()); - StreamRequest.serializer.serialize(msg1, out, getVersion()); - StreamRequest.serializer.serialize(msg2, out, getVersion()); - msg0.createMessage().serialize(out, getVersion()); - msg1.createMessage().serialize(out, getVersion()); - msg2.createMessage().serialize(out, getVersion()); - out.close(); - - // test serializedSize - testSerializedSize(msg0, StreamRequest.serializer); - testSerializedSize(msg1, StreamRequest.serializer); - testSerializedSize(msg2, StreamRequest.serializer); - } - - @Test - public void testStreamRequestMessageRead() throws IOException - { - if (EXECUTE_WRITES) - testStreamRequestMessageWrite(); - - DataInputStream in = getInput("streaming.StreamRequestMessage.bin"); - assert StreamRequest.serializer.deserialize(in, getVersion()) != null; - assert StreamRequest.serializer.deserialize(in, getVersion()) != null; - assert StreamRequest.serializer.deserialize(in, getVersion()) != null; - assert MessageIn.read(in, getVersion(), -1) != null; - assert MessageIn.read(in, getVersion(), -1) != null; - assert MessageIn.read(in, getVersion(), -1) != null; - in.close(); - } - - private static SSTableReader makeSSTable() - { - Table t = Table.open("Keyspace1"); - for (int i = 0; i < 100; i++) - { - RowMutation rm = new RowMutation(t.getName(), ByteBufferUtil.bytes(Long.toString(System.nanoTime()))); - rm.add("Standard1", ByteBufferUtil.bytes("cola"), ByteBufferUtil.bytes("value"), 0); - rm.apply(); - } - try - { - t.getColumnFamilyStore("Standard1").forceBlockingFlush(); - return t.getColumnFamilyStore("Standard1").getSSTables().iterator().next(); - } - catch (Exception any) - { - throw new RuntimeException(any); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java new file mode 100644 index 0000000..60fbf40 --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.cassandra.utils.FBUtilities; + +public class SessionInfoTest +{ + /** + * Test if total numbers are collect + */ + @Test + public void testTotals() + { + UUID cfId = UUID.randomUUID(); + InetAddress local = FBUtilities.getLocalAddress(); + + Collection<StreamSummary> summaries = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + StreamSummary summary = new StreamSummary(cfId, i, (i + 1) * 10); + summaries.add(summary); + } + + StreamSummary sending = new StreamSummary(cfId, 10, 100); + SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING); + + assert info.getTotalFilesToReceive() == 45; + assert info.getTotalFilesToSend() == 10; + assert info.getTotalSizeToReceive() == 550; + assert info.getTotalSizeToSend() == 100; + // still, no files received or sent + assert info.getTotalFilesReceived() == 0; + assert info.getTotalFilesSent() == 0; + + // receive in progress + info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 50, 100)); + // still in progress, but not completed yet + assert info.getTotalSizeReceived() == 50; + assert info.getTotalSizeSent() == 0; + assert info.getTotalFilesReceived() == 0; + assert info.getTotalFilesSent() == 0; + info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 100, 100)); + // 1 file should be completed + assert info.getTotalSizeReceived() == 100; + assert info.getTotalSizeSent() == 0; + assert info.getTotalFilesReceived() == 1; + assert info.getTotalFilesSent() == 0; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/StreamUtil.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamUtil.java b/test/unit/org/apache/cassandra/streaming/StreamUtil.java deleted file mode 100644 index 4987923..0000000 --- a/test/unit/org/apache/cassandra/streaming/StreamUtil.java +++ /dev/null @@ -1,46 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.cassandra.streaming; - -import java.io.IOError; -import java.io.IOException; -import java.net.InetAddress; - -import org.apache.cassandra.net.MessageIn; - -public class StreamUtil -{ - /** - * Takes an stream request message and creates an empty status response. Exists here because StreamRequestMessage - * is package protected. - */ - static public void finishStreamRequest(MessageIn<StreamRequest> msg, InetAddress to) - { - StreamInSession session = StreamInSession.get(to, msg.payload.sessionId); - try - { - session.closeIfFinished(); - } - catch (IOException e) - { - throw new IOError(e); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index a86330c..3e9eeb1 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -1,35 +1,40 @@ -package org.apache.cassandra.streaming; - /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -import static junit.framework.Assert.assertEquals; -import org.apache.cassandra.OrderedJUnit4ClassRunner; -import org.apache.cassandra.Util; -import static org.apache.cassandra.Util.column; + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; import java.net.InetAddress; -import java.sql.Date; import java.nio.ByteBuffer; +import java.sql.Date; import java.util.*; +import java.util.concurrent.TimeUnit; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; @@ -39,21 +44,18 @@ import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableUtils; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableUtils; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.FBUtilities; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.utils.ByteBufferUtil; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.fail; +import static org.apache.cassandra.Util.column; @RunWith(OrderedJUnit4ClassRunner.class) public class StreamingTransferTest extends SchemaLoader @@ -69,10 +71,64 @@ public class StreamingTransferTest extends SchemaLoader } /** + * Test if empty {@link StreamPlan} returns success with empty result. + */ + @Test + public void testEmptyStreamPlan() throws Exception + { + StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest").execute(); + final UUID planId = futureResult.planId; + Futures.addCallback(futureResult, new FutureCallback<StreamState>() + { + public void onSuccess(StreamState result) + { + assert planId.equals(result.planId); + assert result.description.equals("StreamingTransferTest"); + assert result.sessions.isEmpty(); + } + + public void onFailure(Throwable t) + { + fail(); + } + }); + // should be complete immediately + futureResult.get(100, TimeUnit.MILLISECONDS); + } + + @Test + public void testRequestEmpty() throws Exception + { + // requesting empty data should succeed + IPartitioner p = StorageService.getPartitioner(); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); + + StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest") + .requestRanges(LOCAL, "Keyspace2", ranges) + .execute(); + + UUID planId = futureResult.planId; + StreamState result = futureResult.get(); + assert planId.equals(result.planId); + assert result.description.equals("StreamingTransferTest"); + + // we should have completed session with empty transfer + assert result.sessions.size() == 1; + SessionInfo session = Iterables.get(result.sessions, 0); + assert session.peer.equals(LOCAL); + assert session.getTotalFilesReceived() == 0; + assert session.getTotalFilesSent() == 0; + assert session.getTotalSizeReceived() == 0; + assert session.getTotalSizeSent() == 0; + } + + /** * Create and transfer a single sstable, and return the keys that should have been transferred. * The Mutator must create the given column, but it may also create any other columns it pleases. */ - private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception + private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator) throws Exception { // write a temporary SSTable, and unregister it logger.debug("Mutating " + cfs.name); @@ -87,7 +143,7 @@ public class StreamingTransferTest extends SchemaLoader // transfer the first and last key logger.debug("Transferring " + cfs.name); - transfer(table, sstable); + transfer(sstable); // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); @@ -108,7 +164,7 @@ public class StreamingTransferTest extends SchemaLoader // and that the max timestamp for the file was rediscovered assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp()); - List<String> keys = new ArrayList<String>(); + List<String> keys = new ArrayList<>(); for (int off : offs) keys.add("key" + off); @@ -116,20 +172,18 @@ public class StreamingTransferTest extends SchemaLoader return keys; } - private void transfer(Table table, SSTableReader sstable) throws Exception + private void transfer(SSTableReader sstable) throws Exception { IPartitioner p = StorageService.getPartitioner(); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); - ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); - transfer(table, sstable, ranges); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); + transfer(sstable, ranges); } - private void transfer(Table table, SSTableReader sstable, List<Range<Token>> ranges) throws Exception + private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception { - StreamOutSession session = StreamOutSession.create(table.getName(), LOCAL, (IStreamCallback)null); - StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP); - session.await(); + new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, ranges, Collections.singleton(sstable)).execute().get(); } /** @@ -157,7 +211,7 @@ public class StreamingTransferTest extends SchemaLoader SSTableReader sstable = cfs.getSSTables().iterator().next(); cfs.clearUnsafe(); - transfer(table, sstable); + transfer(sstable); // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); @@ -172,7 +226,7 @@ public class StreamingTransferTest extends SchemaLoader final Table table = Table.open("Keyspace1"); final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1"); - List<String> keys = createAndTransfer(table, cfs, new Mutator() + List<String> keys = createAndTransfer(cfs, new Mutator() { public void mutate(String key, String col, long timestamp) throws Exception { @@ -190,7 +244,6 @@ public class StreamingTransferTest extends SchemaLoader for (String key : keys) { long val = key.hashCode(); - IPartitioner p = StorageService.getPartitioner(); IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(val)); @@ -210,14 +263,14 @@ public class StreamingTransferTest extends SchemaLoader final ColumnFamilyStore cfs = table.getColumnFamilyStore("Counter1"); final CounterContext cc = new CounterContext(); - final Map<String, ColumnFamily> cleanedEntries = new HashMap<String, ColumnFamily>(); + final Map<String, ColumnFamily> cleanedEntries = new HashMap<>(); - List<String> keys = createAndTransfer(table, cfs, new Mutator() + List<String> keys = createAndTransfer(cfs, new Mutator() { /** Creates a new SSTable per key: all will be merged before streaming. */ public void mutate(String key, String col, long timestamp) throws Exception { - Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>(); + Map<String, ColumnFamily> entries = new HashMap<>(); ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfs.metadata); ColumnFamily cfCleaned = TreeMapBackedSortedColumns.factory.create(cfs.metadata); CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 1); @@ -254,7 +307,7 @@ public class StreamingTransferTest extends SchemaLoader // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481) cfs.clearUnsafe(); - transfer(table, streamed); + transfer(streamed); SSTableReader restreamed = cfs.getSSTables().iterator().next(); SSTableUtils.assertContentEquals(streamed, restreamed); } @@ -263,7 +316,7 @@ public class StreamingTransferTest extends SchemaLoader public void testTransferTableMultiple() throws Exception { // write temporary SSTables, but don't register them - Set<String> content = new HashSet<String>(); + Set<String> content = new HashSet<>(); content.add("test"); content.add("test2"); content.add("test3"); @@ -271,7 +324,7 @@ public class StreamingTransferTest extends SchemaLoader String tablename = sstable.getTableName(); String cfname = sstable.getColumnFamilyName(); - content = new HashSet<String>(); + content = new HashSet<>(); content.add("transfer1"); content.add("transfer2"); content.add("transfer3"); @@ -279,15 +332,13 @@ public class StreamingTransferTest extends SchemaLoader // transfer the first and last key IPartitioner p = StorageService.getPartitioner(); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test")))); - ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken())); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken())); // Acquiring references, transferSSTables needs it sstable.acquireReference(); sstable2.acquireReference(); - StreamOutSession session = StreamOutSession.create(tablename, LOCAL, (IStreamCallback) null); - StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP); - session.await(); + new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, ranges, Arrays.asList(sstable, sstable2)).execute().get(); // confirm that the sstables were transferred and registered and that 2 keys arrived ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname); @@ -311,12 +362,12 @@ public class StreamingTransferTest extends SchemaLoader String keyspace = "KeyCacheSpace"; IPartitioner p = StorageService.getPartitioner(); String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" }; - List<SSTableReader> ssTableReaders = new ArrayList<SSTableReader>(); + List<SSTableReader> ssTableReaders = new ArrayList<>(); - NavigableMap<DecoratedKey,String> keys = new TreeMap<DecoratedKey,String>(); + NavigableMap<DecoratedKey,String> keys = new TreeMap<>(); for (String cf : columnFamilies) { - Set<String> content = new HashSet<String>(); + Set<String> content = new HashSet<>(); content.add("data-" + cf + "-1"); content.add("data-" + cf + "-2"); content.add("data-" + cf + "-3"); @@ -332,19 +383,16 @@ public class StreamingTransferTest extends SchemaLoader Map.Entry<DecoratedKey,String> first = keys.firstEntry(); Map.Entry<DecoratedKey,String> last = keys.lastEntry(); Map.Entry<DecoratedKey,String> secondtolast = keys.lowerEntry(last.getKey()); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new Range<Token>(p.getMinimumToken(), first.getKey().token)); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getMinimumToken(), first.getKey().token)); // the left hand side of the range is exclusive, so we transfer from the second-to-last token - ranges.add(new Range<Token>(secondtolast.getKey().token, p.getMinimumToken())); + ranges.add(new Range<>(secondtolast.getKey().token, p.getMinimumToken())); // Acquiring references, transferSSTables needs it if (!SSTableReader.acquireReferences(ssTableReaders)) throw new AssertionError(); - StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback)null); - StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP); - - session.await(); + new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, ranges, ssTableReaders).execute().get(); // check that only two keys were transferred for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last)) @@ -382,11 +430,11 @@ public class StreamingTransferTest extends SchemaLoader cfs.clearUnsafe(); IPartitioner p = StorageService.getPartitioner(); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000")))); - ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500")))); - ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900")))); - transfer(table, sstable, ranges); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900")))); + transfer(sstable, ranges); assertEquals(1, cfs.getSSTables().size()); assertEquals(7, Util.getRangeSlice(cfs).size()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java index 95297b1..ab311e6 100644 --- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java @@ -26,7 +26,10 @@ import java.util.*; import org.junit.Test; import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.io.compress.*; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMetadata;