GitHub user belliottsmith commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219415043
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -338,165 +386,152 @@ else if (useStrictConsistency)
                                                       boolean 
useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> 
isAlive,
                                                       String keyspace,
    -                                                  
Collection<Predicate<Replica>> sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = 
strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> 
snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict 
consistency it's going to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = 
new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end 
up with multiple types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() 
|| r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is 
sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = 
rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going 
to be fetched from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch 
from all of them if they are there
    -                    //With transient replication and strict consistency 
this is to get the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range 
before and who replicates it after
    -                        EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF 
then we have endpoints.
    -                        //So we need to be careful to only be strict when 
endpoints == RF
    -                        if (oldEndpoints.size() == 
strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> 
endpointsStillReplicated = newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints 
based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node 
required to move the data consistently is down: "
    -                                                                + 
oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 
endpoint but found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to 
full and and the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch 
from by address. In that case we can pick any full replica safely
    -                            //since we are already a transient replica and 
the existing replica remains.
    -                            //The old behavior where we might be asked to 
fetch ranges we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we 
need.
    -                            if (oldEndpoints.isEmpty() && 
toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no 
endpoints to fetch from then we must be transitioning from transient to full 
for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = 
sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we 
include a matching node
    -                                Optional<Replica> fullReplica = 
Iterables.<Replica>tryFind(endpointsForRange, and(accept, 
testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = 
Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new 
IllegalStateException("Couldn't find any matching sufficient replica out of " + 
endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to 
see if they will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary 
replicas for strict consistency were removed by source filters: " + 
oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = 
sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, 
and establish everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on 
correctness so no point in fetching from
    -                        //a random full + transient replica since it's 
also likely to lose data
    -                        //Also apply testSourceFilters that were given to 
us so we can safely select a single source
    -                        sources = 
sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters)));
    -                        //Limit it to just the first possible source, we 
don't need more than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 
1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, 
sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", 
toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = 
rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints 
to fetch " + toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one 
losing a range and no one else loses
    -             * from that action (we also don't gain). When we move 
backwards there are two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch 
from two places in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 && 
(addressList.filter(Replica::isFull).size() > 1 || 
addressList.filter(Replica::isTransient).size() > 1))
    -                throw new IllegalStateException(String.format("Multiple 
strict sources found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) 
|| addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data 
consistently is down");
    -                        throw new IllegalStateException("Unable to find 
sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace 
+ " with RF=1. " +
    -                                                        "Ensure this 
keyspace contains replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for 
streaming range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", 
toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data 
consistently is down");
    -                    throw new IllegalStateException("Unable to find 
sufficient sources for streaming range " + toFetch + " in keyspace " + 
keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> 
sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = 
strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, 
endpoints);
    +
    +         //This list of replicas is just candidates. With strict 
consistency it's going to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = 
new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may 
end up with multiple types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> toFetch.isTransient() 
|| r.isFull();
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = 
rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to 
be fetched from to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from 
all of them if they are there
    +                 //With transient replication and strict consistency this 
is to get the full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range 
before and who replicates it after
    +                     EndpointsForRange newEndpoints = 
strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then 
we have endpoints.
    +                     //So we need to be careful to only be strict when 
endpoints == RF
    +                     if (oldEndpoints.size() == 
strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated 
= newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based 
on address
    +                         oldEndpoints = oldEndpoints.filter(r -> 
!endpointsStillReplicated.contains(r.endpoint()));
    --- End diff --
    
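    You can call `oldEndpoints.without(endpointsStillReplicated)` here instead of manually filtering out replicas whose endpoint is in `endpointsStillReplicated`.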


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to