yifan-c commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422429361


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,188 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = 
FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       
TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the 
node settings. 0/%d instances available.",
-                                                     
allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node 
settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
-        {
-            return currentRingResponse;
-        }
-
-        synchronized (this)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        Map<String, ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
         {
-            if (ringResponse == null)
+            TokenRangeReplicasResponse response = 
getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
+
+            tokenRangesByInstance = 
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .values()
+                                           .stream()
+                                           .filter(m -> 
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata()
+                                       .values()
+                                       .stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> 
i.getRingInstance().address())
+                                                     
.collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all 
ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+                                        .flatMap(wr -> 
wr.replicasByDatacenter().entrySet().stream())
+                                        
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                                  (l1, l2) -> 
filterAndMergeInstances(l1, l2, blockedIps)));
+
+            pendingReplicasByDC = getPendingReplicas(response, 
writeReplicasByDC);
+
+            if (LOGGER.isDebugEnabled())
             {
-                try
-                {
-                    ringResponse = getCurrentRingResponse();
-                }
-                catch (Exception exception)
-                {
-                    LOGGER.error("Failed to load Cassandra ring", exception);
-                    throw new RuntimeException(exception);
-                }
+                LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
+                             writeReplicasByDC.keySet(),
+                             
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                             
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
             }
-            return ringResponse;
         }
-    }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges, ", exception);
+            throw new RuntimeException(exception);
+        }
 
-    private RingResponse getCurrentRingResponse() throws Exception
-    {
-        return 
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        // Include availability info so CL checks can use it to exclude replacement hosts
+        return new TokenRangeMapping<>(getPartitioner(),
+                                       getReplicationFactor(),
+                                       writeReplicasByDC,
+                                       pendingReplicasByDC,
+                                       tokenRangesByInstance,
+                                       new 
ArrayList<>(replicaMetadata.values()),
+                                       blockedInstances,
+                                       replacementInstances);
     }
 
-    private static List<RingInstance> getSerializableInstances(RingResponse 
ringResponse)
+    private Set<String> filterAndMergeInstances(Set<String> instancesList1, 
Set<String> instancesList2, Set<String> blockedIPs)
     {
-        return ringResponse.stream()
-                           .map(RingInstance::new)
-                           .collect(Collectors.toList());
+        Set<String> merged = new HashSet<>();
+        // Removes blocked instances. If this is included, remove blockedInstances from CL checks
+        merged.addAll(instancesList1.stream().filter(i -> 
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+        merged.addAll(instancesList2.stream().filter(i -> 
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+        return merged;
     }
 
-    private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+    // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+    // This will be replaced by the instance state metadata when it is supported by the token-ranges API
+    private Map<String, Set<String>> 
getPendingReplicas(TokenRangeReplicasResponse response, Map<String, 
Set<String>> writeReplicasByDC)
     {
-        return new RingInstance(ringEntry);
+        Set<String> readReplicas = 
readReplicasFromTokenRangeResponse(response);
+        return writeReplicasByDC.entrySet()
+                                .stream()
+                                .filter(entry -> 
entry.getValue().stream().noneMatch(readReplicas::contains))
+                                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
     }
 
-    protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+    private Multimap<RingInstance, Range<BigInteger>> 
getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+                                                                               
Map<String, ReplicaMetadata> replicaMetadata)
     {
-        GossipInfoResponse currentGossipInfo = gossipInfo;
-        if (!forceRefresh && currentGossipInfo != null)
-        {
-            return currentGossipInfo;
-        }
-
-        synchronized (this)
+        Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = 
ArrayListMultimap.create();
+        for (ReplicaInfo rInfo : writeReplicas)
         {
-            if (forceRefresh || gossipInfo == null)
+            Range<BigInteger> range = Range.openClosed(new 
BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+            for (Map.Entry<String, List<String>> dcReplicaEntry : 
rInfo.replicasByDatacenter().entrySet())
             {
-                try
-                {
-                    gossipInfo = 
cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-                                                                               
       TimeUnit.MILLISECONDS);
-                }
-                catch (ExecutionException | InterruptedException exception)
-                {
-                    LOGGER.error("Failed to retrieve gossip information");
-                    throw new RuntimeException("Failed to retrieve gossip 
information", exception);
-                }
-                catch (TimeoutException exception)
-                {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Failed to retrieve gossip 
information", exception);
-                }
+                // For each writeReplica, get metadata and update map to 
include range
+                dcReplicaEntry.getValue().forEach(ipAddress -> {
+                    // Get metadata for this IP; Create RingInstance
+                    ReplicaMetadata replica = replicaMetadata.get(ipAddress);
+                    instanceToRangeMap.put(new RingInstance(replica), range);

Review Comment:
   Thanks for adding the check!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to