RangeStreamer throws when any range did not complete. Patch by brandonwilliams; reviewed by yukim for CASSANDRA-5009
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d1d37203 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d1d37203 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d1d37203 Branch: refs/heads/cassandra-1.2.0 Commit: d1d372033b2f0a2ef0a63c9591b369d7789851db Parents: 3706749 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Mon Dec 3 14:21:23 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Mon Dec 3 14:21:23 2012 -0600 ---------------------------------------------------------------------- .../org/apache/cassandra/dht/RangeStreamer.java | 23 +++++++++++---- 1 files changed, 17 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1d37203/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 4e5cfb8..a0e1a93 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,12 +19,14 @@ package org.apache.cassandra.dht; import java.net.InetAddress; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.cassandra.streaming.IStreamCallback; +import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +40,6 @@ import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.streaming.OperationType; import org.apache.cassandra.streaming.StreamIn; -import org.apache.cassandra.utils.FBUtilities; /** * Assists in streaming ranges to a node. @@ -52,6 +53,9 @@ public class RangeStreamer private final OperationType opType; private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create(); private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>(); + // protected for testing. + protected CountDownLatch latch; + private Set<Range<Token>> completed = Collections.newSetFromMap(new ConcurrentHashMap<Range<Token>, Boolean>()); /** * A filter applied to sources to stream from when constructing a fetch map. 
@@ -140,7 +144,7 @@ public class RangeStreamer private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String table, Collection<Range<Token>> desiredRanges) { AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy(); - Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata); + Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap()); Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create(); for (Range<Token> desiredRange : desiredRanges) @@ -217,17 +221,19 @@ public class RangeStreamer public void fetch() { - final CountDownLatch latch = new CountDownLatch(toFetch().entries().size()); + latch = new CountDownLatch(toFetch.entries().size()); + for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries()) { final String table = entry.getKey(); final InetAddress source = entry.getValue().getKey(); - Collection<Range<Token>> ranges = entry.getValue().getValue(); + final Collection<Range<Token>> ranges = entry.getValue().getValue(); /* Send messages to respective folks to stream data over to me */ IStreamCallback callback = new IStreamCallback() { public void onSuccess() { + completed.addAll(ranges); latch.countDown(); if (logger.isDebugEnabled()) logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s", @@ -236,8 +242,8 @@ public class RangeStreamer public void onFailure() { + latch.countDown(); logger.warn("Streaming from " + source + " failed"); - onSuccess(); // calling onSuccess for latch countdown } }; if (logger.isDebugEnabled()) @@ -248,6 +254,11 @@ public class RangeStreamer try { latch.await(); + for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries()) + { + if (!completed.containsAll(entry.getValue().getValue())) + throw new RuntimeException(String.format("Unable to fetch range %s for keyspace %s from any hosts", 
entry.getValue().getValue(), entry.getKey())); + } } catch (InterruptedException e) {