frankgh commented on code in PR #78: URL: https://github.com/apache/cassandra-analytics/pull/78#discussion_r1747575920
########## scripts/build-dtest-jars.sh: ########## @@ -118,4 +118,6 @@ else exit ${RETURN} fi done + # always delete the Cassandra source after dtest.jar is built to avoid confusing IDE Review Comment: can we alternatively exclude these files from the IDE's indexing? ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java: ########## @@ -22,55 +22,131 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.TreeRangeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo; import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata; -import org.apache.cassandra.spark.bulkwriter.RingInstance; import org.apache.cassandra.spark.common.model.CassandraInstance; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.jetbrains.annotations.Nullable; -// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup -public class TokenRangeMapping<Instance extends CassandraInstance> implements Serializable +public class TokenRangeMapping<I extends CassandraInstance> implements Serializable { private static final long 
serialVersionUID = -7284933683815811160L; + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeMapping.class); + private final Partitioner partitioner; private final ReplicationFactor replicationFactor; - private final transient Set<RingInstance> replacementInstances; - private final transient RangeMap<BigInteger, List<Instance>> replicasByTokenRange; - private final transient Multimap<Instance, Range<BigInteger>> tokenRangeMap; - private final transient Map<String, Set<String>> writeReplicasByDC; - private final transient Map<String, Set<String>> pendingReplicasByDC; - private final transient List<ReplicaMetadata> replicaMetadata; + private final transient Set<I> allInstances; + private final transient RangeMap<BigInteger, List<I>> replicasByTokenRange; + private final transient Multimap<I, Range<BigInteger>> tokenRangeMap; + private final transient Map<String, Set<I>> writeReplicasByDC; + private final transient Map<String, Set<I>> pendingReplicasByDC; + + public static <I extends CassandraInstance> + TokenRangeMapping<I> create(Supplier<TokenRangeReplicasResponse> topologySupplier, + Supplier<Partitioner> partitionerSupplier, + Supplier<ReplicationFactor> replicationFactorSupplier, + Function<ReplicaMetadata, I> instanceCreator) + { + TokenRangeReplicasResponse response = topologySupplier.get(); + Map<String, I> instanceByIpAddress = new HashMap<>(response.replicaMetadata().size()); + response.replicaMetadata() + .forEach((ipAddress, metadata) -> instanceByIpAddress.put(ipAddress, instanceCreator.apply(metadata))); + + Multimap<I, Range<BigInteger>> tokenRangesByInstance = tokenRangesByInstance(response.writeReplicas(), + instanceByIpAddress); + + // Each token range has hosts by DC. 
We collate them across all ranges into all hosts by DC + Map<String, Set<I>> writeReplicasByDC = new HashMap<>(); + Map<String, Set<I>> pendingReplicasByDC = new HashMap<>(); + Set<I> allInstances = new HashSet<>(instanceByIpAddress.values()); + for (I instance : allInstances) + { + Set<I> dc = writeReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>()); + dc.add(instance); + if (instance.nodeState().isPending) + { + Set<I> pendingInDc = pendingReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>()); + pendingInDc.add(instance); + } + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); + } + + Map<String, ReplicaMetadata> replicaMetadata = response.replicaMetadata(); Review Comment: unused? ########## scripts/build-sidecar.sh: ########## @@ -77,4 +77,6 @@ else fi git clean -fd ./gradlew -Pversion=${SIDECAR_BUILD_VERSION} -Dmaven.repo.local=${SIDECAR_JAR_DIR} publishToMavenLocal + # Delete sidecar source after publishing to avoid confusing IDE + rm -rf "${SIDECAR_BUILD_DIR}" Review Comment: can we alternatively exclude these files from the IDE's indexing? 
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java: ########## @@ -22,55 +22,131 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.TreeRangeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo; import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata; -import org.apache.cassandra.spark.bulkwriter.RingInstance; import org.apache.cassandra.spark.common.model.CassandraInstance; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.jetbrains.annotations.Nullable; -// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup -public class TokenRangeMapping<Instance extends CassandraInstance> implements Serializable +public class TokenRangeMapping<I extends CassandraInstance> implements Serializable { private static final long serialVersionUID = -7284933683815811160L; + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeMapping.class); + private final Partitioner partitioner; private final ReplicationFactor replicationFactor; - private final transient Set<RingInstance> 
replacementInstances; - private final transient RangeMap<BigInteger, List<Instance>> replicasByTokenRange; - private final transient Multimap<Instance, Range<BigInteger>> tokenRangeMap; - private final transient Map<String, Set<String>> writeReplicasByDC; - private final transient Map<String, Set<String>> pendingReplicasByDC; - private final transient List<ReplicaMetadata> replicaMetadata; + private final transient Set<I> allInstances; + private final transient RangeMap<BigInteger, List<I>> replicasByTokenRange; + private final transient Multimap<I, Range<BigInteger>> tokenRangeMap; + private final transient Map<String, Set<I>> writeReplicasByDC; + private final transient Map<String, Set<I>> pendingReplicasByDC; Review Comment: I don't think it's worth keeping the replicas by DC; we always end up flattening. Why not just keep the set instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org