arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422004520
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. + return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API Review Comment: Correct. Updated to utilize metadata. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org