arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422005002
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. + return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API + private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC) { - return new RingInstance(ringEntry); + Set<String> readReplicas = readReplicasFromTokenRangeResponse(response); + return writeReplicasByDC.entrySet() + .stream() + .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains)) Review Comment: You're right. This will go away per previous comment ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,188 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. + return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) - { - return currentRingResponse; - } - - synchronized (this) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + Map<String, ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - if (ringResponse == null) + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); + + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .values() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata() + .values() + .stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + new ArrayList<>(replicaMetadata.values()), + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API + private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC) { - return new RingInstance(ringEntry); + Set<String> readReplicas = readReplicasFromTokenRangeResponse(response); + return writeReplicasByDC.entrySet() + .stream() + .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - protected GossipInfoResponse getGossipInfo(boolean forceRefresh) + private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas, + Map<String, ReplicaMetadata> replicaMetadata) { - GossipInfoResponse currentGossipInfo = gossipInfo; - if (!forceRefresh && currentGossipInfo != null) - { - return currentGossipInfo; - } - - synchronized (this) + Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create(); + for (ReplicaInfo rInfo : writeReplicas) { - if (forceRefresh || gossipInfo == null) + Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end())); + for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet()) { - try - { - gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(), - TimeUnit.MILLISECONDS); - } - catch (ExecutionException | InterruptedException exception) - { - LOGGER.error("Failed to retrieve gossip information"); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } - catch (TimeoutException exception) - { - Thread.currentThread().interrupt(); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } + // For each writeReplica, get metadata and update map to include range + dcReplicaEntry.getValue().forEach(ipAddress -> { + // Get metadata for this IP; Create RingInstance + ReplicaMetadata replica = replicaMetadata.get(ipAddress); + instanceToRangeMap.put(new RingInstance(replica), range); Review Comment: This is checked right before it is used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org