This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 0dd7a14 CASSANDRASC-73: Updates token-ranges endpoint to return additional instance metadata 0dd7a14 is described below commit 0dd7a14f7e70dc8a40d20eba5c78dc4f6a580e17 Author: Arjun Ashok <arjun_as...@apple.com> AuthorDate: Mon Sep 18 16:41:07 2023 -0700 CASSANDRASC-73: Updates token-ranges endpoint to return additional instance metadata Patch by Arjun Ashok; Reviewed by Dinesh Joshi, Francisco Guerrero, Yifan Cai for CASSANDRASC-73 --- CHANGES.txt | 1 + .../adapters/base/CassandraStorageOperations.java | 12 +- .../adapters/base/TokenRangeReplicaProvider.java | 52 +++++-- .../base/TokenRangeReplicaProviderTest.java | 155 ++++++++++++++++----- .../sidecar/client/SidecarClientTest.java | 27 +++- .../common/data/TokenRangeReplicasResponse.java | 119 ++++++++++++++-- .../tokenrange/BaseTokenRangeIntegrationTest.java | 20 ++- 7 files changed, 313 insertions(+), 73 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index a465ed3..4dff0fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Updates token-ranges endpoint to return additional instance metadata (CASSANDRASC-73) * Shade Jackson completely to prevent incompatibility issues (CASSANDRASC-75) * Adds endpoint to serve read/write replica-sets by token-ranges (CASSANDRASC-60) * Split unit tests and integration tests in CircleCI config (CASSANDRASC-72) diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 68769f2..efe0fa1 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -49,20 +49,16 @@ public class CassandraStorageOperations implements StorageOperations protected final TokenRangeReplicaProvider tokenRangeReplicaProvider; /** - * Creates a new instance with the provided {@link JmxClient} + * Creates a new instance with the provided {@link JmxClient} and {@link DnsResolver} * * @param jmxClient the JMX client used to communicate with the Cassandra instance + * @param dnsResolver the DNS resolver used to lookup replicas */ public CassandraStorageOperations(JmxClient jmxClient, DnsResolver dnsResolver) - { - this(jmxClient, new RingProvider(jmxClient, dnsResolver)); - } - - public CassandraStorageOperations(JmxClient jmxClient, RingProvider ringProvider) { this.jmxClient = jmxClient; - this.ringProvider = ringProvider; - this.tokenRangeReplicaProvider = new TokenRangeReplicaProvider(jmxClient); + this.ringProvider = new RingProvider(jmxClient, dnsResolver); + this.tokenRangeReplicaProvider = new TokenRangeReplicaProvider(jmxClient, dnsResolver); } /** diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java index f8e0d39..24162f1 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.net.HostAndPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,8 @@ import org.apache.cassandra.sidecar.common.JmxClient; import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse.ReplicaInfo; +import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse.ReplicaMetadata; +import org.apache.cassandra.sidecar.common.dns.DnsResolver; import org.apache.cassandra.sidecar.common.utils.GossipInfoParser; import org.jetbrains.annotations.NotNull; @@ -52,17 +55,21 @@ import static org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicas.gene */ public class TokenRangeReplicaProvider { + private interface KeyspaceToRangeMappingFunc extends Function<String, Map<List<String>, List<String>>> { } protected final JmxClient jmxClient; + private final DnsResolver dnsResolver; private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class); - public TokenRangeReplicaProvider(JmxClient jmxClient) + public TokenRangeReplicaProvider(JmxClient jmxClient, DnsResolver dnsResolver) { + this.jmxClient = jmxClient; + this.dnsResolver = dnsResolver; } public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner) @@ -87,11 +94,11 @@ public class TokenRangeReplicaProvider List<ReplicaInfo> writeReplicas = writeReplicasFromPendingRanges(allTokenRangeReplicas, hostToDatacenter); List<ReplicaInfo> readReplicas = readReplicasFromReplicaMapping(naturalTokenRangeReplicas, hostToDatacenter); - Map<String, String> replicaToStateMap = replicaToStateMap(allTokenRangeReplicas, storage); + List<ReplicaMetadata> replicaMetadata = getReplicaMetadata(allTokenRangeReplicas, storage, hostToDatacenter); - return new TokenRangeReplicasResponse(replicaToStateMap, - writeReplicas, - readReplicas); + return new TokenRangeReplicasResponse(writeReplicas, + readReplicas, + replicaMetadata); } private List<TokenRangeReplicas> getTokenRangeReplicas(String rangeType, String keyspace, Partitioner partitioner, @@ -115,22 +122,46 @@ public class TokenRangeReplicaProvider .collect(toList()); } - private Map<String, String> replicaToStateMap(List<TokenRangeReplicas> replicaSet, StorageJmxOperations storage) + private List<ReplicaMetadata> getReplicaMetadata(List<TokenRangeReplicas> replicaSet, + StorageJmxOperations storage, + Map<String, String> hostToDatacenter) { List<String> joiningNodes = storage.getJoiningNodesWithPort(); List<String> leavingNodes = storage.getLeavingNodesWithPort(); List<String> movingNodes = storage.getMovingNodesWithPort(); + List<String> liveNodes = storage.getLiveNodesWithPort(); + List<String> deadNodes = storage.getUnreachableNodesWithPort(); + + String rawGossipInfo = getRawGossipInfo(); GossipInfoResponse gossipInfo = GossipInfoParser.parse(rawGossipInfo); StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo); + RingProvider.Status status = new RingProvider.Status(liveNodes, deadNodes); return replicaSet.stream() .map(TokenRangeReplicas::replicaSet) .flatMap(Collection::stream) .distinct() - .collect(Collectors.toMap(Function.identity(), state::of)); + .map(replica -> { + try + { + HostAndPort hap = HostAndPort.fromString(replica); + return new ReplicaMetadata(state.of(replica), + status.of(replica), + dnsResolver.reverseResolve(hap.getHost()), + hap.getHost(), + hap.getPort(), + hostToDatacenter.get(replica)); + } + catch (UnknownHostException e) + { + throw new RuntimeException( + String.format("Failed to resolve fqdn for replica %s ", replica), e); + } + }) + .collect(Collectors.toList()); } protected EndpointSnitchJmxOperations initializeEndpointProxy() @@ -210,10 +241,9 @@ public class TokenRangeReplicaProvider Map<String, List<String>> dcReplicaMapping = new HashMap<>(); replicas.stream() - .filter(hostToDatacenter::containsKey) - .forEach(item -> - dcReplicaMapping.computeIfAbsent(hostToDatacenter.get(item), v -> new ArrayList<>()) - .add(item)); + .filter(hostToDatacenter::containsKey) + .forEach(item -> dcReplicaMapping.computeIfAbsent(hostToDatacenter.get(item), v -> new ArrayList<>()) + .add(item)); return dcReplicaMapping; } diff --git a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java index ab074f3..dee9935 100644 --- a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java +++ b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java @@ -24,6 +24,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.BeforeEach; @@ -31,8 +33,10 @@ import org.junit.jupiter.api.Test; import org.apache.cassandra.sidecar.common.JmxClient; import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse; +import org.apache.cassandra.sidecar.common.dns.DnsResolver; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.mock; @@ -56,20 +60,24 @@ public class TokenRangeReplicaProviderTest "127.0.0.4:7000"); public static final String TEST_KEYSPACE = "test_keyspace"; + private static final String TEST_HOSTNAME = "Hostname"; + StorageJmxOperations storageOperations; EndpointSnitchJmxOperations endpointOperations; ClusterMembershipJmxOperations clusterMembershipOperations; JmxClient jmxClient; + DnsResolver dnsResolver; TokenRangeReplicaProvider instance; @BeforeEach - void setup() + void setup() throws UnknownHostException { storageOperations = mock(StorageJmxOperations.class); endpointOperations = mock(EndpointSnitchJmxOperations.class); clusterMembershipOperations = mock(ClusterMembershipJmxOperations.class); jmxClient = mock(JmxClient.class); - instance = new TokenRangeReplicaProvider(jmxClient); + dnsResolver = mock(DnsResolver.class); + instance = new TokenRangeReplicaProvider(jmxClient, dnsResolver); when(jmxClient.proxy(StorageJmxOperations.class, "org.apache.cassandra.db:type=StorageService")) .thenReturn(storageOperations); @@ -77,6 +85,7 @@ public class TokenRangeReplicaProviderTest .thenReturn(endpointOperations); when(jmxClient.proxy(ClusterMembershipJmxOperations.class, "org.apache.cassandra.net:type=FailureDetector")) .thenReturn(clusterMembershipOperations); + when(dnsResolver.reverseResolve(any())).thenReturn(TEST_HOSTNAME); } @Test @@ -89,6 +98,8 @@ public class TokenRangeReplicaProviderTest Map<List<String>, List<String>> writeReplicaMappings = new HashMap<>(); when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(writeReplicaMappings); when(endpointOperations.getDatacenter(anyString())).thenReturn(TEST_DC1); + when(storageOperations.getLiveNodesWithPort()).thenReturn(TEST_ENDPOINTS1); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); when(clusterMembershipOperations.getAllEndpointStatesWithPort()).thenReturn(""); TokenRangeReplicasResponse result = instance.tokenRangeReplicas(TEST_KEYSPACE, Partitioner.Random); @@ -99,10 +110,24 @@ public class TokenRangeReplicaProviderTest // Single DC assertThat(result.readReplicas().get(0).replicasByDatacenter().size()).isEqualTo(1); assertThat(result.readReplicas().get(0).replicasByDatacenter().get(TEST_DC1)).containsAll(TEST_ENDPOINTS1); - assertThat(result.replicaState().size()).isEqualTo(3); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Normal"); + assertThat(result.replicaMetadata().size()).isEqualTo(3); + TokenRangeReplicasResponse.ReplicaMetadata nodeMetadata = filterReplicaMetadata(result.replicaMetadata(), + "127.0.0.1", 7000); + assertThat(nodeMetadata.state()).isEqualTo("Normal"); + assertThat(nodeMetadata.status()).isEqualTo("Up"); + assertThat(nodeMetadata.datacenter()).isEqualTo(TEST_DC1); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Normal"); + } + + private TokenRangeReplicasResponse.ReplicaMetadata filterReplicaMetadata( + List<TokenRangeReplicasResponse.ReplicaMetadata> replicaMetadata, String address, int port) + { + return replicaMetadata.stream() + .filter(r -> (r.address().equals(address) && r.port() == port)) + .findFirst().get(); } @Test @@ -111,11 +136,16 @@ public class TokenRangeReplicaProviderTest Map<List<String>, List<String>> readReplicaMappings = new HashMap<>(); readReplicaMappings.put(TOKEN_RANGE1, TEST_ENDPOINTS1); readReplicaMappings.put(TOKEN_RANGE2, TEST_ENDPOINTS2); + List<String> allLiveNodes = Stream.concat(TEST_ENDPOINTS1.stream(), TEST_ENDPOINTS2.stream()) + .collect(Collectors.toList()); + when(storageOperations.getLiveNodesWithPort()).thenReturn(allLiveNodes); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(readReplicaMappings); Map<List<String>, List<String>> writeReplicaMappings = new HashMap<>(); when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(writeReplicaMappings); when(storageOperations.getLeavingNodesWithPort()).thenReturn(Arrays.asList("128.0.0.1:7000", "127.0.0.2:7000")); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); when(clusterMembershipOperations.getAllEndpointStatesWithPort()).thenReturn(generateSampleGossip("NORMAL", "LEAVING", "NORMAL", @@ -137,12 +167,25 @@ public class TokenRangeReplicaProviderTest assertThat(result.readReplicas().get(1).replicasByDatacenter().size()).isEqualTo(1); assertThat(result.readReplicas().get(0).replicasByDatacenter().get(TEST_DC1)).containsAll(TEST_ENDPOINTS1); assertThat(result.readReplicas().get(1).replicasByDatacenter().get(TEST_DC2)).containsAll(TEST_ENDPOINTS2); - assertThat(result.replicaState().size()).isEqualTo(5); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("128.0.0.1:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("128.0.0.2:7000")).isEqualTo("Normal"); + assertThat(result.replicaMetadata().size()).isEqualTo(5); + TokenRangeReplicasResponse.ReplicaMetadata nodeMetadataDc1 = filterReplicaMetadata(result.replicaMetadata(), + "127.0.0.1", 7000); + assertThat(nodeMetadataDc1.state()).isEqualTo("Normal"); + assertThat(nodeMetadataDc1.status()).isEqualTo("Up"); + assertThat(nodeMetadataDc1.datacenter()).isEqualTo(TEST_DC1); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Normal"); + + TokenRangeReplicasResponse.ReplicaMetadata nodeMetadatDc2 = filterReplicaMetadata(result.replicaMetadata(), + "128.0.0.1", 7000); + assertThat(nodeMetadatDc2.state()).isEqualTo("Leaving"); + assertThat(nodeMetadatDc2.status()).isEqualTo("Up"); + assertThat(nodeMetadatDc2.datacenter()).isEqualTo(TEST_DC2); + + assertThat(filterReplicaMetadata(result.replicaMetadata(), "128.0.0.2", 7000) + .state()).isEqualTo("Normal"); } @Test @@ -151,6 +194,10 @@ public class TokenRangeReplicaProviderTest Map<List<String>, List<String>> readReplicaMappings = new HashMap<>(); readReplicaMappings.put(TOKEN_RANGE1, TEST_ENDPOINTS1); readReplicaMappings.put(TOKEN_RANGE2, TEST_MULTI_DC_ENDPOINTS); + List<String> allLiveNodes = Stream.concat(TEST_ENDPOINTS1.stream(), TEST_MULTI_DC_ENDPOINTS.stream()) + .collect(Collectors.toList()); + when(storageOperations.getLiveNodesWithPort()).thenReturn(allLiveNodes); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(readReplicaMappings); Map<List<String>, List<String>> writeReplicaMappings = new HashMap<>(); @@ -181,12 +228,19 @@ public class TokenRangeReplicaProviderTest assertThat(replicaInfoWithMultipleDCs.replicasByDatacenter().size()).isEqualTo(2); assertThat(result.readReplicas().get(0).replicasByDatacenter().get(TEST_DC1)).containsAll(TEST_ENDPOINTS1); assertThat(result.readReplicas().get(1).replicasByDatacenter().get(TEST_DC2)).containsAll(TEST_ENDPOINTS2); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.4:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("128.0.0.1:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("128.0.0.2:7000")).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.1", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.4", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "128.0.0.1", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "128.0.0.2", 7000) + .state()).isEqualTo("Normal"); + } @Test @@ -198,6 +252,10 @@ public class TokenRangeReplicaProviderTest Map<List<String>, List<String>> writeReplicaMappings = new HashMap<>(); writeReplicaMappings.put(TOKEN_RANGE2, TEST_ENDPOINTS2); + when(storageOperations.getLiveNodesWithPort()).thenReturn(TEST_ENDPOINTS1); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); + + when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(readReplicaMappings); when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(writeReplicaMappings); when(storageOperations.getLeavingNodesWithPort()).thenReturn(Arrays.asList("127.0.0.3:7000", "128.0.0.1:7000")); @@ -218,11 +276,17 @@ public class TokenRangeReplicaProviderTest assertThat(result.readReplicas().size()).isEqualTo(1); assertThat(result.readReplicas().get(0).replicasByDatacenter().size()).isEqualTo(1); assertThat(result.readReplicas().get(0).replicasByDatacenter().get(TEST_DC1)).containsAll(TEST_ENDPOINTS1); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("128.0.0.1:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("128.0.0.2:7000")).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.1", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "128.0.0.1", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "128.0.0.2", 7000) + .state()).isEqualTo("Normal"); + } @Test @@ -238,6 +302,8 @@ public class TokenRangeReplicaProviderTest ); Map<List<String>, List<String>> pendingRangeToEndpointWithPortMap = Collections.emptyMap(); + when(storageOperations.getLiveNodesWithPort()).thenReturn(TEST_ENDPOINTS1); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(rangeToEndpointWithPortMap); when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)) .thenReturn(pendingRangeToEndpointWithPortMap); @@ -258,9 +324,12 @@ public class TokenRangeReplicaProviderTest Long.toString(Long.MAX_VALUE))).isTrue(); assertThat(validateRangeExists(result.writeReplicas(), "3074457345618258602", Long.toString(Long.MAX_VALUE))).isTrue(); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.1", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Normal"); } @Test @@ -276,7 +345,6 @@ public class TokenRangeReplicaProviderTest ); Map<List<String>, List<String>> pendingRangeToEndpointWithPortMap = ImmutableMap.of( Arrays.asList("6148914691236517204", "9223372036854775807"), -// Arrays.asList("3074457345618258602", "6148914691236517204"), Collections.singletonList("127.0.0.4:7000"), Arrays.asList("-3074457345618258603", "3074457345618258602"), Collections.singletonList("127.0.0.4:7000"), @@ -284,6 +352,9 @@ public class TokenRangeReplicaProviderTest Collections.singletonList("127.0.0.4:7000") ); + when(storageOperations.getLiveNodesWithPort()).thenReturn(TEST_ENDPOINTS1); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); + when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(rangeToEndpointWithPortMap); when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)) .thenReturn(pendingRangeToEndpointWithPortMap); @@ -309,10 +380,14 @@ public class TokenRangeReplicaProviderTest // Existing read replicas wrap-around range ends at "maxToken" assertThat(validateRangeExists(result.readReplicas(), "3074457345618258602", Long.toString(Long.MAX_VALUE))).isTrue(); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.4:7000")).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.1", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.4", 7000) + .state()).isEqualTo("Leaving"); } @Test @@ -330,6 +405,10 @@ public class TokenRangeReplicaProviderTest ); Map<List<String>, List<String>> pendingRangeToEndpointWithPortMap = Collections.emptyMap(); + when(storageOperations.getLiveNodesWithPort()) + .thenReturn(Arrays.asList("127.0.0.1:7000", "127.0.0.2:7000", "127.0.0.3:7000", "127.0.0.4:7000")); + when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList()); + when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(rangeToEndpointWithPortMap); when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)) .thenReturn(pendingRangeToEndpointWithPortMap); @@ -351,10 +430,16 @@ public class TokenRangeReplicaProviderTest Long.toString(Long.MAX_VALUE))).isTrue(); assertThat(validateRangeExists(result.writeReplicas(), "6148914691236517204", Long.toString(Long.MAX_VALUE))).isTrue(); - assertThat(result.replicaState().get("127.0.0.1:7000")).isEqualTo("Leaving"); - assertThat(result.replicaState().get("127.0.0.2:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.3:7000")).isEqualTo("Normal"); - assertThat(result.replicaState().get("127.0.0.4:7000")).isEqualTo("Replacing"); + + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.1", 7000) + .state()).isEqualTo("Leaving"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000) + .state()).isEqualTo("Normal"); + assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.4", 7000) + .state()).isEqualTo("Replacing"); + } private boolean validateRangeExists(List<TokenRangeReplicasResponse.ReplicaInfo> ranges, String start, String end) diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index d8a7f4e..d1e1132 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -328,11 +328,18 @@ abstract class SidecarClientTest public void testTokenRangeReplicasFromReplicaSet() throws Exception { String keyspace = "test"; - String nodeWithPort = "127.0.0.1:7000"; + String nodeAddress = "127.0.0.1"; + int port = 7000; + String nodeWithPort = nodeAddress + ":" + port; String expectedRangeStart = "-9223372036854775808"; String expectedRangeEnd = "9223372036854775807"; - String tokenRangeReplicasAsString = "{\"replicaState\":{" + - "\"127.0.0.1:7000\":\"NORMAL\"}," + + String tokenRangeReplicasAsString = "{\"replicaMetadata\":[{" + + "\"state\":\"Normal\"," + + "\"status\":\"Up\"," + + "\"fqdn\":\"localhost\"," + + "\"address\":\"127.0.0.1\"," + + "\"port\":7000," + + "\"datacenter\":\"datacenter1\"}]," + "\"writeReplicas\":[{\"start\":\"-9223372036854775808\"," + "\"end\":\"9223372036854775807\",\"replicasByDatacenter\":" + "{\"datacenter1\":[\"127.0.0.1:7000\"]}}],\"readReplicas\":" + @@ -356,8 +363,18 @@ abstract class SidecarClientTest assertThat(readReplica.end()).isEqualTo(expectedRangeEnd); assertThat(readReplica.replicasByDatacenter()).containsKey("datacenter1"); assertThat(readReplica.replicasByDatacenter().get("datacenter1")).containsExactly(nodeWithPort); - assertThat(result.replicaState()).hasSize(1); - assertThat(result.replicaState().get(nodeWithPort)).isEqualTo("NORMAL"); + assertThat(result.replicaMetadata()).hasSize(1); + TokenRangeReplicasResponse.ReplicaMetadata instanceMetadata = + result.replicaMetadata().stream() + .filter(r -> r.address().equals(nodeAddress) && r.port() == port) + .findFirst() + .get(); + assertThat(instanceMetadata.state()).isEqualTo("Normal"); + assertThat(instanceMetadata.status()).isEqualTo("Up"); + assertThat(instanceMetadata.address()).isEqualTo("127.0.0.1"); + assertThat(instanceMetadata.port()).isEqualTo(7000); + assertThat(instanceMetadata.fqdn()).isEqualTo("localhost"); + assertThat(instanceMetadata.datacenter()).isEqualTo("datacenter1"); validateResponseServed(ApiEndpointsV1.KEYSPACE_TOKEN_MAPPING_ROUTE.replaceAll( ApiEndpointsV1.KEYSPACE_PATH_PARAM, keyspace)); diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java index 585e729..68911eb 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/TokenRangeReplicasResponse.java @@ -29,33 +29,34 @@ import com.fasterxml.jackson.annotation.JsonProperty; */ public class TokenRangeReplicasResponse { - private final Map<String, String> replicaState; private final List<ReplicaInfo> writeReplicas; private final List<ReplicaInfo> readReplicas; + private final List<ReplicaMetadata> replicaMetadata; + /** * Constructs token range replicas response object with given params. * - * @param replicaState mapping replica to it's state information * @param writeReplicas list of write replicas {@link ReplicaInfo} instances breakdown by token range * @param readReplicas list of read replica {@link ReplicaInfo} instances breakdown by token range + * @param replicaMetadata mapping replica to it's state and status information */ - public TokenRangeReplicasResponse(@JsonProperty("replicaState") Map<String, String> replicaState, - @JsonProperty("writeReplicas") List<ReplicaInfo> writeReplicas, - @JsonProperty("readReplicas") List<ReplicaInfo> readReplicas) + public TokenRangeReplicasResponse(@JsonProperty("writeReplicas") List<ReplicaInfo> writeReplicas, + @JsonProperty("readReplicas") List<ReplicaInfo> readReplicas, + @JsonProperty("replicaMetadata") List<ReplicaMetadata> replicaMetadata) { - this.replicaState = replicaState; this.writeReplicas = writeReplicas; this.readReplicas = readReplicas; + this.replicaMetadata = replicaMetadata; } /** - * @return the replica to state information mapping + * @return metadata associated with each replica */ - @JsonProperty("replicaState") - public Map<String, String> replicaState() + @JsonProperty("replicaMetadata") + public List<ReplicaMetadata> replicaMetadata() { - return replicaState; + return replicaMetadata; } /** @@ -154,4 +155,102 @@ public class TokenRangeReplicasResponse '}'; } } + + /** + * Class representing metadata associated with replica instances + */ + public static class ReplicaMetadata + { + private final String state; + private final String status; + private final String fqdn; + private final String address; + private final int port; + private final String datacenter; + + public ReplicaMetadata(@JsonProperty("state") String state, + @JsonProperty("status") String status, + @JsonProperty("fqdn") String fqdn, + @JsonProperty("address") String address, + @JsonProperty("port") int port, + @JsonProperty("datacenter") String datacenter) + { + this.state = state; + this.status = status; + this.fqdn = fqdn; + this.address = address; + this.port = port; + this.datacenter = datacenter; + } + + /** + * @return the node state. eg. NORMAL, JOINING, LEAVING, etc. + */ + @JsonProperty("state") + public String state() + { + return state; + } + + /** + * @return the node status. eg. UP, DOWN + */ + @JsonProperty("status") + public String status() + { + return status; + } + + /** + * @return FQDN of the node + */ + @JsonProperty("fqdn") + public String fqdn() + { + return fqdn; + } + + /** + * @return IP address of the node + */ + @JsonProperty("address") + public String address() + { + return address; + } + + /** + * @return port number of the node as specified by the replica-set returned + */ + @JsonProperty("port") + public int port() + { + return port; + } + + /** + * @return datacenter address of the node + */ + @JsonProperty("datacenter") + public String datacenter() + { + return datacenter; + } + + + /** + * {@inheritDoc} + */ + public String toString() + { + return "ReplicaMetadata{" + + "state='" + state + '\'' + + ", status='" + status + '\'' + + ", fqdn='" + fqdn + '\'' + + ", address='" + address + '\'' + + ", port='" + port + '\'' + + ", datacenter='" + datacenter + '\'' + + '}'; + } + } } diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java index 0d789c3..1c046b6 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java @@ -89,13 +89,13 @@ public class BaseTokenRangeIntegrationTest extends IntegrationTestBase protected void validateNodeStates(TokenRangeReplicasResponse mappingResponse, Set<String> dcReplication, - Function<Integer, String> statusFunction) + Function<Integer, String> stateFunction) { CassandraIntegrationTest annotation = sidecarTestContext.cassandraTestContext().annotation; int expectedReplicas = (annotation.nodesPerDc() + annotation.newNodesPerDc()) * dcReplication.size(); AbstractCassandraTestContext cassandraTestContext = sidecarTestContext.cassandraTestContext(); - assertThat(mappingResponse.replicaState().size()).isEqualTo(expectedReplicas); + assertThat(mappingResponse.replicaMetadata().size()).isEqualTo(expectedReplicas); for (int i = 1; i <= cassandraTestContext.cluster().size(); i++) { IInstanceConfig config = cassandraTestContext.cluster().get(i).config(); @@ -105,8 +105,11 @@ public class BaseTokenRangeIntegrationTest extends IntegrationTestBase String ipAndPort = config.broadcastAddress().getAddress().getHostAddress() + ":" + config.broadcastAddress().getPort(); - String expectedStatus = statusFunction.apply(i); - assertThat(mappingResponse.replicaState().get(ipAndPort)).isEqualTo(expectedStatus); + String expectedStatus = stateFunction.apply(i); + assertThat(filterReplicaMetadata(mappingResponse.replicaMetadata(), + config.broadcastAddress().getAddress().getHostAddress(), + config.broadcastAddress().getPort()).state()) + .isEqualTo(expectedStatus); } } } @@ -251,6 +254,15 @@ public class BaseTokenRangeIntegrationTest extends IntegrationTestBase } } + private TokenRangeReplicasResponse.ReplicaMetadata filterReplicaMetadata( + List<TokenRangeReplicasResponse.ReplicaMetadata> replicaMetadata, String address, int port) + { + return replicaMetadata.stream() + .filter(r -> (r.address().equals(address) && r.port() == port)) + .findFirst().get(); + } + + void retrieveMappingWithKeyspace(VertxTestContext context, String keyspace, Handler<HttpResponse<Buffer>> verifier) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org