frankgh commented on code in PR #78:
URL: 
https://github.com/apache/cassandra-analytics/pull/78#discussion_r1747575920


##########
scripts/build-dtest-jars.sh:
##########
@@ -118,4 +118,6 @@ else
         exit ${RETURN}
     fi
   done
+  # always delete the Cassandra source after dtest.jar is built to avoid 
confusing IDE

Review Comment:
   Can we alternatively exclude these files from the IDE's indexing?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java:
##########
@@ -22,55 +22,131 @@
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
 import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
-import org.apache.cassandra.spark.bulkwriter.RingInstance;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.jetbrains.annotations.Nullable;
 
-// TODO: refactor to improve the return types of methods to use `Instance` 
instead of String and cleanup
-public class TokenRangeMapping<Instance extends CassandraInstance> implements 
Serializable
+public class TokenRangeMapping<I extends CassandraInstance> implements 
Serializable
 {
     private static final long serialVersionUID = -7284933683815811160L;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeMapping.class);
+
     private final Partitioner partitioner;
     private final ReplicationFactor replicationFactor;
-    private final transient Set<RingInstance> replacementInstances;
-    private final transient RangeMap<BigInteger, List<Instance>> 
replicasByTokenRange;
-    private final transient Multimap<Instance, Range<BigInteger>> 
tokenRangeMap;
-    private final transient Map<String, Set<String>> writeReplicasByDC;
-    private final transient Map<String, Set<String>> pendingReplicasByDC;
-    private final transient List<ReplicaMetadata> replicaMetadata;
+    private final transient Set<I> allInstances;
+    private final transient RangeMap<BigInteger, List<I>> replicasByTokenRange;
+    private final transient Multimap<I, Range<BigInteger>> tokenRangeMap;
+    private final transient Map<String, Set<I>> writeReplicasByDC;
+    private final transient Map<String, Set<I>> pendingReplicasByDC;
+
+    public static <I extends CassandraInstance>
+    TokenRangeMapping<I> create(Supplier<TokenRangeReplicasResponse> 
topologySupplier,
+                                Supplier<Partitioner> partitionerSupplier,
+                                Supplier<ReplicationFactor> 
replicationFactorSupplier,
+                                Function<ReplicaMetadata, I> instanceCreator)
+    {
+        TokenRangeReplicasResponse response = topologySupplier.get();
+        Map<String, I> instanceByIpAddress = new 
HashMap<>(response.replicaMetadata().size());
+        response.replicaMetadata()
+                .forEach((ipAddress, metadata) -> 
instanceByIpAddress.put(ipAddress, instanceCreator.apply(metadata)));
+
+        Multimap<I, Range<BigInteger>> tokenRangesByInstance = 
tokenRangesByInstance(response.writeReplicas(),
+                                                                               
      instanceByIpAddress);
+
+        // Each token range has hosts by DC. We collate them across all ranges 
into all hosts by DC
+        Map<String, Set<I>> writeReplicasByDC = new HashMap<>();
+        Map<String, Set<I>> pendingReplicasByDC = new HashMap<>();
+        Set<I> allInstances = new HashSet<>(instanceByIpAddress.values());
+        for (I instance : allInstances)
+        {
+            Set<I> dc = 
writeReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>());
+            dc.add(instance);
+            if (instance.nodeState().isPending)
+            {
+                Set<I> pendingInDc = 
pendingReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new 
HashSet<>());
+                pendingInDc.add(instance);
+            }
+        }
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Fetched token-ranges with dcs={}, 
write_replica_count={}, pending_replica_count={}",
+                         writeReplicasByDC.keySet(),
+                         
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                         
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
+        }
+
+        Map<String, ReplicaMetadata> replicaMetadata = 
response.replicaMetadata();

Review Comment:
   Is `replicaMetadata` unused?



##########
scripts/build-sidecar.sh:
##########
@@ -77,4 +77,6 @@ else
   fi
   git clean -fd
   ./gradlew -Pversion=${SIDECAR_BUILD_VERSION} 
-Dmaven.repo.local=${SIDECAR_JAR_DIR} publishToMavenLocal
+  # Delete sidecar source after publishing to avoid confusing IDE
+  rm -rf "${SIDECAR_BUILD_DIR}"

Review Comment:
   Can we alternatively exclude these files from the IDE's indexing?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java:
##########
@@ -22,55 +22,131 @@
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
 import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
-import org.apache.cassandra.spark.bulkwriter.RingInstance;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.jetbrains.annotations.Nullable;
 
-// TODO: refactor to improve the return types of methods to use `Instance` 
instead of String and cleanup
-public class TokenRangeMapping<Instance extends CassandraInstance> implements 
Serializable
+public class TokenRangeMapping<I extends CassandraInstance> implements 
Serializable
 {
     private static final long serialVersionUID = -7284933683815811160L;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeMapping.class);
+
     private final Partitioner partitioner;
     private final ReplicationFactor replicationFactor;
-    private final transient Set<RingInstance> replacementInstances;
-    private final transient RangeMap<BigInteger, List<Instance>> 
replicasByTokenRange;
-    private final transient Multimap<Instance, Range<BigInteger>> 
tokenRangeMap;
-    private final transient Map<String, Set<String>> writeReplicasByDC;
-    private final transient Map<String, Set<String>> pendingReplicasByDC;
-    private final transient List<ReplicaMetadata> replicaMetadata;
+    private final transient Set<I> allInstances;
+    private final transient RangeMap<BigInteger, List<I>> replicasByTokenRange;
+    private final transient Multimap<I, Range<BigInteger>> tokenRangeMap;
+    private final transient Map<String, Set<I>> writeReplicasByDC;
+    private final transient Map<String, Set<I>> pendingReplicasByDC;

Review Comment:
   I don't think it's worth keeping the replicas by DC; we always end up 
flattening them. Why not just keep the set instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to