Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/269#discussion_r219415043 --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java --- @@ -338,165 +386,152 @@ else if (useStrictConsistency) boolean useStrictConsistency, TokenMetadata tmdBefore, TokenMetadata tmdAfter, - Predicate<Replica> isAlive, String keyspace, - Collection<Predicate<Replica>> sourceFilters) - { - EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore); - - InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); - logger.debug ("Keyspace: {}", keyspace); - logger.debug("To fetch RN: {}", fetchRanges); - logger.debug("Fetch ranges: {}", rangeAddresses); - - Predicate<Replica> testSourceFilters = and(sourceFilters); - Function<EndpointsForRange, EndpointsForRange> sorted = - endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints); - - //This list of replicas is just candidates. With strict consistency it's going to be a narrow list. 
- EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable(); - for (Replica toFetch : fetchRanges) - { - //Replica that is sufficient to provide the data we need - //With strict consistency and transient replication we may end up with multiple types - //so this isn't used with strict consistency - Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull()); - Predicate<Replica> accept = r -> - isSufficient.test(r) // is sufficient - && !r.endpoint().equals(localAddress) // is not self - && isAlive.test(r); // is alive - - logger.debug("To fetch {}", toFetch); - for (Range<Token> range : rangeAddresses.keySet()) - { - if (range.contains(toFetch.range())) - { - EndpointsForRange oldEndpoints = rangeAddresses.get(range); - - //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch - //It could be multiple endpoints and we must fetch from all of them if they are there - //With transient replication and strict consistency this is to get the full data from a full replica and - //transient data from the transient replica losing data - EndpointsForRange sources; - if (useStrictConsistency) - { - //Start with two sets of who replicates the range before and who replicates it after - EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); - logger.debug("Old endpoints {}", oldEndpoints); - logger.debug("New endpoints {}", newEndpoints); - - //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. 
- //So we need to be careful to only be strict when endpoints == RF - if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) - { - Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints(); - // Remove new endpoints from old endpoints based on address - oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); - - if (!all(oldEndpoints, isAlive)) - throw new IllegalStateException("A node required to move the data consistently is down: " - + oldEndpoints.filter(not(isAlive))); - - if (oldEndpoints.size() > 1) - throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints); - - //If we are transitioning from transient to full and and the set of replicas for the range is not changing - //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely - //since we are already a transient replica and the existing replica remains. - //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore. - //So it's an error if we don't find what we need. 
- if (oldEndpoints.isEmpty() && toFetch.isTransient()) - { - throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch); - } - - if (!any(oldEndpoints, isSufficient)) - { - // need an additional replica - EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range)); - // include all our filters, to ensure we include a matching node - Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil(); - if (fullReplica.isPresent()) - oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get())); - else - throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange); - } - - //We have to check the source filters here to see if they will remove any replicas - //required for strict consistency - if (!all(oldEndpoints, testSourceFilters)) - throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters))); - } - else - { - oldEndpoints = sorted.apply(oldEndpoints.filter(accept)); - } - - //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case - sources = oldEndpoints.filter(testSourceFilters); - } - else - { - //Without strict consistency we have given up on correctness so no point in fetching from - //a random full + transient replica since it's also likely to lose data - //Also apply testSourceFilters that were given to us so we can safely select a single source - sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters))); - //Limit it to just the first possible source, we don't need more than one and downstream - //will fetch from every source we supply - sources = sources.size() > 0 ? 
sources.subList(0, 1) : sources; - } - - // storing range and preferred endpoint set - rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE); - logger.debug("Endpoints to fetch for {} are {}", toFetch, sources); - } - } - - EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch); - if (addressList == null) - throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch); - - /* - * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses - * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica - * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain. - * For a transient range we only need to fetch from one. - */ - if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1)) - throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList)); - - //We must have enough stuff to fetch from - if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty()) - { - if (strat.getReplicationFactor().allReplicas == 1) - { - if (useStrictConsistency) - { - logger.warn("A node required to move the data consistently is down"); - throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " + - "Ensure this keyspace contains replicas in the source datacenter."); - } - else - logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. 
" + - "Keyspace might be missing data.", toFetch, keyspace); - - } - else - { - if (useStrictConsistency) - logger.warn("A node required to move the data consistently is down"); - throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace); - } - } - } - return rangesToFetchWithPreferredEndpoints.asImmutableView(); - } + Collection<SourceFilter> sourceFilters) + { + EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore); + + InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); + logger.debug ("Keyspace: {}", keyspace); + logger.debug("To fetch RN: {}", fetchRanges); + logger.debug("Fetch ranges: {}", rangeAddresses); + + Predicate<Replica> testSourceFilters = and(sourceFilters); + Function<EndpointsForRange, EndpointsForRange> sorted = + endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints); + + //This list of replicas is just candidates. With strict consistency it's going to be a narrow list. 
+ EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable(); + for (Replica toFetch : fetchRanges) + { + //Replica that is sufficient to provide the data we need + //With strict consistency and transient replication we may end up with multiple types + //so this isn't used with strict consistency + Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull(); + + logger.debug("To fetch {}", toFetch); + for (Range<Token> range : rangeAddresses.keySet()) + { + if (!range.contains(toFetch.range())) + continue; + + EndpointsForRange oldEndpoints = rangeAddresses.get(range); + + //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch + //It could be multiple endpoints and we must fetch from all of them if they are there + //With transient replication and strict consistency this is to get the full data from a full replica and + //transient data from the transient replica losing data + EndpointsForRange sources; + if (useStrictConsistency) + { + //Start with two sets of who replicates the range before and who replicates it after + EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); + logger.debug("Old endpoints {}", oldEndpoints); + logger.debug("New endpoints {}", newEndpoints); + + //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. + //So we need to be careful to only be strict when endpoints == RF + if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) + { + Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints(); + // Remove new endpoints from old endpoints based on address + oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); --- End diff -- Can call `oldEndpoints.without(endpointsStillReplicated)`
--- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org For additional commands, e-mail: pr-help@cassandra.apache.org