Repository: cassandra Updated Branches: refs/heads/trunk 4de7a65ed -> 59b5b6bef
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java index 3ef7bbb..3884f5a 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java @@ -23,7 +23,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.net.InetAddress; import java.util.*; import com.google.common.collect.HashMultimap; @@ -35,6 +34,7 @@ import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; @@ -192,12 +192,12 @@ public class StorageServiceServerTest metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> configOptions = new HashMap<>(); configOptions.put("DC1", "1"); @@ -209,22 +209,22 @@ public class StorageServiceServerTest Schema.instance.load(meta); Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, - InetAddress.getByName("127.0.0.1")); + InetAddressAndPort.getByName("127.0.0.1")); assertEquals(2, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assertEquals(2, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.4")); assertEquals(2, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.5")); assertEquals(2, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")))); @@ -236,11 +236,11 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> configOptions = new HashMap<>(); configOptions.put("DC1", "1"); @@ -251,19 +251,19 @@ public class StorageServiceServerTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, configOptions)); Schema.instance.load(meta); - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.4")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.5")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))); } @@ -274,11 +274,11 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> configOptions = new HashMap<>(); configOptions.put("DC2", "2"); @@ -289,19 +289,19 @@ public class StorageServiceServerTest Schema.instance.load(meta); // endpoints in DC1 should not have primary range - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assert primaryRanges.isEmpty(); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assert primaryRanges.isEmpty(); // endpoints in DC2 should have primary ranges which also cover DC1 - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.4")); assert primaryRanges.size() == 2; assert primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))); assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.5")); assert primaryRanges.size() == 2; assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))); assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))); @@ -313,11 +313,11 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> configOptions = new HashMap<>(); configOptions.put("DC2", "2"); @@ -328,20 +328,20 @@ public class StorageServiceServerTest Schema.instance.load(meta); // endpoints in DC1 should not have primary range - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assertTrue(primaryRanges.isEmpty()); primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, - InetAddress.getByName("127.0.0.2")); + InetAddressAndPort.getByName("127.0.0.2")); assertTrue(primaryRanges.isEmpty()); // endpoints in DC2 should have primary ranges which also cover DC1 - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.4")); assertTrue(primaryRanges.size() == 2); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.5")); assertTrue(primaryRanges.size() == 2); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")))); @@ -353,22 +353,22 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); // DC1 - Multimap<InetAddress, Token> dc1 = HashMultimap.create(); - dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A")); - dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E")); - dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H")); - dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C")); - dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I")); - dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J")); + Multimap<InetAddressAndPort, Token> dc1 = HashMultimap.create(); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("A")); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("E")); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("H")); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("C")); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("I")); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("J")); metadata.updateNormalTokens(dc1); // DC2 - Multimap<InetAddress, Token> dc2 = HashMultimap.create(); - dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B")); - dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G")); - dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L")); - dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D")); - dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F")); - dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K")); + Multimap<InetAddressAndPort, Token> dc2 = HashMultimap.create(); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("B")); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("G")); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("L")); + dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("D")); + dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("F")); + dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("K")); metadata.updateNormalTokens(dc2); Map<String, String> configOptions = new HashMap<>(); @@ -380,14 +380,14 @@ public class StorageServiceServerTest Schema.instance.load(meta); // endpoints in DC1 should not have primary range - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assert primaryRanges.isEmpty(); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assert primaryRanges.isEmpty(); // endpoints in DC2 should have primary ranges which also cover DC1 - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.4")); assert primaryRanges.size() == 4; assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))); assert primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))); @@ -396,7 +396,7 @@ public class StorageServiceServerTest // the node covers range (L, A] assert primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.5")); assert primaryRanges.size() == 8; assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))); assert primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))); @@ -418,23 +418,23 @@ public class StorageServiceServerTest metadata.clearUnsafe(); // DC1 - Multimap<InetAddress, Token> dc1 = HashMultimap.create(); - dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A")); - dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E")); - dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H")); - dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C")); - dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I")); - dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J")); + Multimap<InetAddressAndPort, Token> dc1 = HashMultimap.create(); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("A")); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("E")); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("H")); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("C")); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("I")); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("J")); metadata.updateNormalTokens(dc1); // DC2 - Multimap<InetAddress, Token> dc2 = HashMultimap.create(); - dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B")); - dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G")); - dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L")); - dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D")); - dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F")); - dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K")); + Multimap<InetAddressAndPort, Token> dc2 = HashMultimap.create(); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("B")); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("G")); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("L")); + dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("D")); + dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("F")); + dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("K")); metadata.updateNormalTokens(dc2); Map<String, String> configOptions = new HashMap<>(); @@ -447,7 +447,7 @@ public class StorageServiceServerTest Schema.instance.load(meta); // endpoints in DC1 should have primary ranges which also cover DC2 - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assertEquals(8, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L")))); @@ -459,7 +459,7 @@ public class StorageServiceServerTest assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H")))); // endpoints in DC1 should have primary ranges which also cover DC2 - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assertEquals(4, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")))); @@ -467,7 +467,7 @@ public class StorageServiceServerTest assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J")))); // endpoints in DC2 should have primary ranges which also cover DC1 - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.4")); assertEquals(4, primaryRanges.size()); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G")))); @@ -476,7 +476,7 @@ public class StorageServiceServerTest // the node covers range (L, A] assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A")))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.5")); assertTrue(primaryRanges.size() == 8); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")))); assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F")))); @@ -497,23 +497,23 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.3")); Keyspace.clear("Keyspace1"); KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.simpleTransient(2)); Schema.instance.load(meta); - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))); - primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.3")); + primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.3")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))); } @@ -525,9 +525,9 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.3")); Map<String, String> configOptions = new HashMap<>(); configOptions.put("replication_factor", "2"); @@ -536,15 +536,15 @@ public class StorageServiceServerTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.simpleTransient(2)); Schema.instance.load(meta); - Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1")); + Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.1")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A"))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.2")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))); - primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.3")); + primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.3")); assert primaryRanges.size() == 1; assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))); } @@ -557,10 +557,10 @@ public class StorageServiceServerTest TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); - metadata.updateNormalToken(new LongToken(1000L), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new LongToken(2000L), InetAddress.getByName("127.0.0.2")); - metadata.updateNormalToken(new LongToken(3000L), InetAddress.getByName("127.0.0.3")); - metadata.updateNormalToken(new LongToken(4000L), InetAddress.getByName("127.0.0.4")); + metadata.updateNormalToken(new LongToken(1000L), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new LongToken(2000L), InetAddressAndPort.getByName("127.0.0.2")); + metadata.updateNormalToken(new LongToken(3000L), InetAddressAndPort.getByName("127.0.0.3")); + metadata.updateNormalToken(new LongToken(4000L), InetAddressAndPort.getByName("127.0.0.4")); Collection<Range<Token>> repairRangeFrom = StorageService.instance.createRepairRangeFrom("1500", "3700"); assert repairRangeFrom.size() == 3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java index 8172463..f8567e8 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.schema.KeyspaceParams; @@ -48,7 +49,7 @@ public class WriteResponseHandlerTest { static Keyspace ks; static ColumnFamilyStore cfs; - static List<InetAddress> targets; + static List<InetAddressAndPort> targets; @BeforeClass public static void setUpClass() throws Throwable @@ -57,36 +58,36 @@ public class WriteResponseHandlerTest // Register peers with expected DC for NetworkTopologyStrategy. TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); - metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.1.0.255")); - metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.2.0.255")); + metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.255")); + metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.2.0.255")); DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch() { - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return null; } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { - byte[] address = endpoint.getAddress(); + byte[] address = endpoint.address.getAddress(); if (address[1] == 1) return "datacenter1"; else return "datacenter2"; } - public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress) + public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress) { return null; } - public void sortByProximity(InetAddress address, List<InetAddress> addresses) + public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses) { } - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } @@ -96,7 +97,7 @@ public class WriteResponseHandlerTest } - public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) + public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) { return false; } @@ -105,8 +106,8 @@ public class WriteResponseHandlerTest SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar")); ks = Keyspace.open("Foo"); cfs = ks.getColumnFamilyStore("Bar"); - targets = ImmutableList.of(InetAddress.getByName("127.1.0.255"), InetAddress.getByName("127.1.0.254"), InetAddress.getByName("127.1.0.253"), - InetAddress.getByName("127.2.0.255"), InetAddress.getByName("127.2.0.254"), InetAddress.getByName("127.2.0.253")); + targets = ImmutableList.of(InetAddressAndPort.getByName("127.1.0.255"), InetAddressAndPort.getByName("127.1.0.254"), InetAddressAndPort.getByName("127.1.0.253"), + InetAddressAndPort.getByName("127.2.0.255"), InetAddressAndPort.getByName("127.2.0.254"), InetAddressAndPort.getByName("127.2.0.253")); } @Before http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java index bc24979..4f0c494 100644 --- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java +++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import org.junit.Test; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.FBUtilities; @@ -36,7 +36,7 @@ public class SessionInfoTest public void testTotals() { TableId tableId = TableId.generate(); - InetAddress local = FBUtilities.getLocalAddress(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); Collection<StreamSummary> summaries = new ArrayList<>(); for (int i = 0; i < 10; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 5c29698..ceaaae0 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -41,6 +40,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; @@ -74,7 +74,7 @@ public class StreamTransferTaskTest @Test public void testScheduleTimeout() throws Exception { - InetAddress peer = FBUtilities.getBroadcastAddress(); + InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); StreamSession session = new StreamSession(peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), PreviewKind.ALL); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); @@ -120,7 +120,7 @@ public class StreamTransferTaskTest @Test public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { - InetAddress peer = FBUtilities.getBroadcastAddress(); + InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 7a51d0c..16c07a0 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; @@ -35,6 +34,7 @@ import junit.framework.Assert; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; @@ -69,7 +69,7 @@ public class StreamingTransferTest DatabaseDescriptor.daemonInitialization(); } - public static final InetAddress LOCAL = FBUtilities.getBroadcastAddress(); + public static final InetAddressAndPort LOCAL = FBUtilities.getBroadcastAddressAndPort(); public static final String KEYSPACE1 = "StreamingTransferTest1"; public static final String CF_STANDARD = "Standard1"; public static final String CF_COUNTER = "Counter1"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java index a9849a3..617bae1 100644 --- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,6 +35,7 @@ import org.junit.Test; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.async.TestScheduledFuture; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamOperation; @@ -43,7 +45,7 @@ import org.apache.cassandra.streaming.messages.CompleteMessage; public class NettyStreamingMessageSenderTest { - private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 0); + private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0); private EmbeddedChannel channel; private StreamSession session; @@ -62,8 +64,8 @@ public class NettyStreamingMessageSenderTest channel = new EmbeddedChannel(); channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); UUID pendingRepair = UUID.randomUUID(); - session = new StreamSession(REMOTE_ADDR.getAddress(), REMOTE_ADDR.getAddress(), (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL); - StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, pendingRepair, session.getPreviewKind()); + session = new StreamSession(REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL); + StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, true, pendingRepair, session.getPreviewKind()); session.init(future); sender = session.getMessageSender(); sender.setControlMessageChannel(channel); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java index a674e6b..b4a736e 100644 --- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.UUID; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -35,6 +36,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.PreviewKind; @@ -52,7 +54,7 @@ import org.apache.cassandra.streaming.messages.StreamMessage; public class StreamingInboundHandlerTest { private static final int VERSION = StreamMessage.CURRENT_VERSION; - private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 0); + private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0); private StreamingInboundHandler handler; private EmbeddedChannel channel; @@ -123,7 +125,7 @@ public class StreamingInboundHandlerTest @Test public void StreamDeserializingTask_deriveSession_StreamInitMessage() throws InterruptedException, IOException { - StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR.getAddress(), 0, UUID.randomUUID(), StreamOperation.REPAIR, true, UUID.randomUUID(), PreviewKind.ALL); + StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR, 0, UUID.randomUUID(), StreamOperation.REPAIR, true, UUID.randomUUID(), PreviewKind.ALL); StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel); StreamSession session = task.deriveSession(msg); Assert.assertNotNull(session); @@ -145,7 +147,7 @@ public class StreamingInboundHandlerTest @Test (expected = IllegalStateException.class) public void StreamDeserializingTask_deriveSession_IFM_NoSession() throws InterruptedException, IOException { - FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR.getAddress(), UUID.randomUUID(), 0, 0, + FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR, UUID.randomUUID(), 0, 0, BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, UUID.randomUUID(), 0 , null); IncomingFileMessage msg = new IncomingFileMessage(null, header); StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel); @@ -156,9 +158,9 @@ public class StreamingInboundHandlerTest public void StreamDeserializingTask_deriveSession_IFM_HasSession() throws InterruptedException, IOException { UUID planId = UUID.randomUUID(); - StreamResultFuture future = StreamResultFuture.initReceivingSide(0, planId, StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, UUID.randomUUID(), PreviewKind.ALL); + StreamResultFuture future = StreamResultFuture.initReceivingSide(0, planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, true, UUID.randomUUID(), PreviewKind.ALL); StreamManager.instance.register(future); - FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR.getAddress(), planId, 0, 0, + FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0, 0, BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, UUID.randomUUID(), 0 , null); IncomingFileMessage msg = new IncomingFileMessage(null, header); StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/tracing/TracingTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tracing/TracingTest.java b/test/unit/org/apache/cassandra/tracing/TracingTest.java index f546496..61e08b0 100644 --- a/test/unit/org/apache/cassandra/tracing/TracingTest.java +++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java @@ -31,6 +31,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.commons.lang3.StringUtils; @@ -197,7 +198,7 @@ public final class TracingTest return super.newSession(sessionId, traceType, customPayload); } - protected TraceState newTraceState(InetAddress ia, UUID uuid, Tracing.TraceType tt) + protected TraceState newTraceState(InetAddressAndPort ia, UUID uuid, Tracing.TraceType tt) { return new TraceState(ia, uuid, tt) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java index 2dcd2ac..3d47ff3 100644 --- a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java +++ b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.transport; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; @@ -33,26 +32,27 @@ import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.transport.messages.ErrorMessage; import static org.junit.Assert.assertEquals; public class ErrorMessageTest { - private static Map<InetAddress, RequestFailureReason> failureReasonMap1; - private static Map<InetAddress, RequestFailureReason> failureReasonMap2; + private static Map<InetAddressAndPort, RequestFailureReason> failureReasonMap1; + private static Map<InetAddressAndPort, RequestFailureReason> failureReasonMap2; @BeforeClass public static void setUpFixtures() throws UnknownHostException { failureReasonMap1 = new HashMap<>(); - failureReasonMap1.put(InetAddress.getByName("127.0.0.1"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); - failureReasonMap1.put(InetAddress.getByName("127.0.0.2"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); - failureReasonMap1.put(InetAddress.getByName("127.0.0.3"), RequestFailureReason.UNKNOWN); + failureReasonMap1.put(InetAddressAndPort.getByName("127.0.0.1"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + failureReasonMap1.put(InetAddressAndPort.getByName("127.0.0.2"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + failureReasonMap1.put(InetAddressAndPort.getByName("127.0.0.3"), RequestFailureReason.UNKNOWN); failureReasonMap2 = new HashMap<>(); - failureReasonMap2.put(InetAddress.getByName("127.0.0.1"), RequestFailureReason.UNKNOWN); - failureReasonMap2.put(InetAddress.getByName("127.0.0.2"), RequestFailureReason.UNKNOWN); + failureReasonMap2.put(InetAddressAndPort.getByName("127.0.0.1"), RequestFailureReason.UNKNOWN); + failureReasonMap2.put(InetAddressAndPort.getByName("127.0.0.2"), RequestFailureReason.UNKNOWN); } @Test @@ -102,11 +102,11 @@ public class ErrorMessageTest @Test public void testRequestFailureExceptionMakesCopy() throws UnknownHostException { - Map<InetAddress, RequestFailureReason> modifiableFailureReasons = new HashMap<>(failureReasonMap1); + Map<InetAddressAndPort, RequestFailureReason> modifiableFailureReasons = new HashMap<>(failureReasonMap1); ReadFailureException rfe = new ReadFailureException(ConsistencyLevel.ALL, 3, 3, false, modifiableFailureReasons); WriteFailureException wfe = new WriteFailureException(ConsistencyLevel.ALL, 3, 3, WriteType.SIMPLE, modifiableFailureReasons); - modifiableFailureReasons.put(InetAddress.getByName("127.0.0.4"), RequestFailureReason.UNKNOWN); + modifiableFailureReasons.put(InetAddressAndPort.getByName("127.0.0.4"), RequestFailureReason.UNKNOWN); assertEquals(failureReasonMap1, rfe.failureReasonByEndpoint); assertEquals(failureReasonMap1, wfe.failureReasonByEndpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java index e287c08..6b95c67 100644 --- a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java +++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java @@ -62,7 +62,8 @@ public class ProtocolVersionTest Assert.assertNotNull(ProtocolVersion.CURRENT); Assert.assertFalse(ProtocolVersion.V4.isBeta()); - Assert.assertTrue(ProtocolVersion.V5.isBeta()); + Assert.assertFalse(ProtocolVersion.V5.isBeta()); + Assert.assertTrue(ProtocolVersion.V6.isBeta()); } @Test @@ -72,24 +73,30 @@ public class ProtocolVersionTest Assert.assertTrue(ProtocolVersion.V2.isSmallerOrEqualTo(ProtocolVersion.V2)); Assert.assertTrue(ProtocolVersion.V3.isSmallerOrEqualTo(ProtocolVersion.V3)); Assert.assertTrue(ProtocolVersion.V4.isSmallerOrEqualTo(ProtocolVersion.V4)); + Assert.assertTrue(ProtocolVersion.V5.isSmallerOrEqualTo(ProtocolVersion.V5)); Assert.assertTrue(ProtocolVersion.V1.isGreaterOrEqualTo(ProtocolVersion.V1)); Assert.assertTrue(ProtocolVersion.V2.isGreaterOrEqualTo(ProtocolVersion.V2)); Assert.assertTrue(ProtocolVersion.V3.isGreaterOrEqualTo(ProtocolVersion.V3)); Assert.assertTrue(ProtocolVersion.V4.isGreaterOrEqualTo(ProtocolVersion.V4)); + Assert.assertTrue(ProtocolVersion.V5.isGreaterOrEqualTo(ProtocolVersion.V5)); Assert.assertTrue(ProtocolVersion.V1.isSmallerThan(ProtocolVersion.V2)); Assert.assertTrue(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V3)); Assert.assertTrue(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V4)); + Assert.assertTrue(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V5)); Assert.assertFalse(ProtocolVersion.V1.isGreaterThan(ProtocolVersion.V2)); Assert.assertFalse(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V3)); Assert.assertFalse(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V4)); + Assert.assertFalse(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V5)); + Assert.assertTrue(ProtocolVersion.V5.isGreaterThan(ProtocolVersion.V4)); Assert.assertTrue(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V3)); Assert.assertTrue(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V2)); Assert.assertTrue(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V1)); + Assert.assertFalse(ProtocolVersion.V5.isSmallerThan(ProtocolVersion.V4)); Assert.assertFalse(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V3)); Assert.assertFalse(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V2)); Assert.assertFalse(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V1)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/transport/SerDeserTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java index 2592ae7..d3b9282 100644 --- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java +++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java @@ -42,12 +42,14 @@ import static org.junit.Assert.assertEquals; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; /** * Serialization/deserialization tests for protocol objects and messages. */ public class SerDeserTest { + @BeforeClass public static void setupDD() { @@ -113,12 +115,12 @@ public class SerDeserTest { List<Event> events = new ArrayList<>(); - events.add(TopologyChange.newNode(FBUtilities.getBroadcastAddress(), 42)); - events.add(TopologyChange.removedNode(FBUtilities.getBroadcastAddress(), 42)); - events.add(TopologyChange.movedNode(FBUtilities.getBroadcastAddress(), 42)); + events.add(TopologyChange.newNode(FBUtilities.getBroadcastAddressAndPort())); + events.add(TopologyChange.removedNode(FBUtilities.getBroadcastAddressAndPort())); + events.add(TopologyChange.movedNode(FBUtilities.getBroadcastAddressAndPort())); - events.add(StatusChange.nodeUp(FBUtilities.getBroadcastAddress(), 42)); - events.add(StatusChange.nodeDown(FBUtilities.getBroadcastAddress(), 42)); + events.add(StatusChange.nodeUp(FBUtilities.getBroadcastAddressAndPort())); + events.add(StatusChange.nodeDown(FBUtilities.getBroadcastAddressAndPort())); events.add(new SchemaChange(SchemaChange.Change.CREATED, "ks")); events.add(new SchemaChange(SchemaChange.Change.UPDATED, "ks")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java index b8d2633..a982624 100644 --- a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java +++ b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java @@ -153,7 +153,7 @@ public class FBUtilitiesTest } @Test - public void testGetBroadcastRpcAddress() throws Exception + public void testGetBroadcastNativeAddress() throws Exception { //When both rpc_address and broadcast_rpc_address are null, it should return the local address (from DD.applyAddressConfig) FBUtilities.reset(); @@ -161,21 +161,21 @@ public class FBUtilitiesTest testConfig.rpc_address = null; testConfig.broadcast_rpc_address = null; DatabaseDescriptor.applyAddressConfig(testConfig); - assertEquals(FBUtilities.getLocalAddress(), FBUtilities.getBroadcastRpcAddress()); + assertEquals(FBUtilities.getJustLocalAddress(), FBUtilities.getJustBroadcastNativeAddress()); //When rpc_address is defined and broadcast_rpc_address is null, it should return the rpc_address FBUtilities.reset(); testConfig.rpc_address = "127.0.0.2"; testConfig.broadcast_rpc_address = null; DatabaseDescriptor.applyAddressConfig(testConfig); - assertEquals(InetAddress.getByName("127.0.0.2"), FBUtilities.getBroadcastRpcAddress()); + assertEquals(InetAddress.getByName("127.0.0.2"), FBUtilities.getJustBroadcastNativeAddress()); //When both rpc_address and broadcast_rpc_address are defined, it should return broadcast_rpc_address FBUtilities.reset(); testConfig.rpc_address = "127.0.0.2"; testConfig.broadcast_rpc_address = "127.0.0.3"; DatabaseDescriptor.applyAddressConfig(testConfig); - assertEquals(InetAddress.getByName("127.0.0.3"), FBUtilities.getBroadcastRpcAddress()); + assertEquals(InetAddress.getByName("127.0.0.3"), FBUtilities.getJustBroadcastNativeAddress()); FBUtilities.reset(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java index 867918a..ffc1ace 100644 --- a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java +++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java @@ -20,7 +20,6 @@ package org.apache.cassandra.stress; import java.io.File; import java.io.IOError; -import java.net.InetAddress; import java.net.URI; import java.util.*; import java.util.concurrent.*; @@ -44,6 +43,7 @@ import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.stress.generate.PartitionGenerator; @@ -185,7 +185,7 @@ public abstract class CompactionStress implements Runnable tokenMetadata.clearUnsafe(); for (int i = 1; i <= numTokens; i++) { - InetAddress addr = FBUtilities.getBroadcastAddress(); + InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort(); List<Token> tokens = Lists.newArrayListWithCapacity(numTokens); for (int j = 0; j < numTokens; ++j) tokens.add(p.getRandomToken(random)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java index a029162..24c10bf 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java @@ -29,6 +29,8 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.*; +import com.google.common.net.HostAndPort; + import com.datastax.driver.core.Host; import org.apache.cassandra.stress.util.ResultLogger; @@ -37,6 +39,7 @@ public class SettingsNode implements Serializable public final List<String> nodes; public final boolean isWhiteList; public final String datacenter; + public final boolean allowServerPortDiscovery; public SettingsNode(Options options) { @@ -69,6 +72,7 @@ public class SettingsNode implements Serializable isWhiteList = options.whitelist.setByUser(); datacenter = options.datacenter.value(); + allowServerPortDiscovery = options.allowServerPortDiscovery.setByUser(); } public Set<String> resolveAllPermitted(StressSettings settings) @@ -80,7 +84,7 @@ public class SettingsNode implements Serializable if (!isWhiteList) { for (Host host : settings.getJavaDriverClient().getCluster().getMetadata().getAllHosts()) - r.add(host.getAddress().getHostName()); + r.add(host.getSocketAddress().getHostString() + ":" + host.getSocketAddress().getPort()); break; } case SIMPLE_NATIVE: @@ -97,7 +101,8 @@ public class SettingsNode implements Serializable { try { - r.add(InetAddress.getByName(node)); + HostAndPort hap = HostAndPort.fromString(node); + r.add(InetAddress.getByName(hap.getHost())); } catch (UnknownHostException e) { @@ -114,7 +119,8 @@ public class SettingsNode implements Serializable { try { - r.add(new InetSocketAddress(InetAddress.getByName(node), port)); + HostAndPort hap = HostAndPort.fromString(node).withDefaultPort(port); + r.add(new InetSocketAddress(InetAddress.getByName(hap.getHost()), hap.getPort())); } catch (UnknownHostException e) { @@ -139,12 +145,13 @@ public class SettingsNode implements Serializable final OptionSimple datacenter = new OptionSimple("datacenter=", ".*", null, "Datacenter used for DCAwareRoundRobinLoadPolicy", false); final OptionSimple whitelist = new OptionSimple("whitelist", "", null, "Limit communications to the provided nodes", false); final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false); + final OptionSimple allowServerPortDiscovery = new OptionSimple("allow_server_port_discovery", "", null, "Allow Java client to discover server client port numbers", false); final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of nodes", false); @Override public List<? extends Option> options() { - return Arrays.asList(datacenter, whitelist, file, list); + return Arrays.asList(datacenter, whitelist, file, allowServerPortDiscovery, list); } } @@ -154,6 +161,7 @@ public class SettingsNode implements Serializable out.println(" Nodes: " + nodes); out.println(" Is White List: " + isWhiteList); out.println(" Datacenter: " + datacenter); + out.println(" Allow server port discovery: " + allowServerPortDiscovery); } public static SettingsNode get(Map<String, String[]> clArgs) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java index 4928cd2..fbcab4b 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java @@ -54,6 +54,7 @@ public class JavaDriverClient private Cluster cluster; private Session session; private final LoadBalancingPolicy loadBalancingPolicy; + private final boolean allowServerPortDiscovery; private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>(); @@ -73,6 +74,7 @@ public class JavaDriverClient this.encryptionOptions = encryptionOptions; this.loadBalancingPolicy = loadBalancingPolicy(settings); this.connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost; + this.allowServerPortDiscovery = settings.node.allowServerPortDiscovery; int maxThreadCount = 0; if (settings.rate.auto) @@ -134,6 +136,9 @@ public class JavaDriverClient .withoutJMXReporting() .withProtocolVersion(protocolVersion) .withoutMetrics(); // The driver uses metrics 3 with conflict with our version + if (allowServerPortDiscovery) + clusterBuilder = clusterBuilder.allowServerPortDiscovery(); + if (loadBalancingPolicy != null) clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy); clusterBuilder.withCompression(compression); @@ -166,7 +171,7 @@ public class JavaDriverClient for (Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s%n", - host.getDatacenter(), host.getAddress(), host.getRack()); + host.getDatacenter(), host.getAddress() + ":" + host.getSocketAddress().getPort(), host.getRack()); } session = cluster.connect(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org