Updated Branches: refs/heads/trunk b7a8b57ca -> f16becfc2
Bootstrapping to handle more failure patch by Vijay; reviewed by Brandon Williams for CASSANDRA-3555 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f16becfc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f16becfc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f16becfc Branch: refs/heads/trunk Commit: f16becfc29ef7e8bbf5f92913be51e33a99ba537 Parents: b7a8b57 Author: Vijay Parthasarathy <vijay2...@gmail.com> Authored: Thu Mar 8 16:41:07 2012 -0800 Committer: Vijay Parthasarathy <vijay2...@gmail.com> Committed: Thu Mar 8 16:41:07 2012 -0800 ---------------------------------------------------------------------- .../org/apache/cassandra/dht/BootStrapper.java | 2 +- .../org/apache/cassandra/dht/RangeStreamer.java | 64 ++++++++++++++- .../org/apache/cassandra/dht/BootStrapperTest.java | 21 +++++- 3 files changed, 83 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index de7f0cf..ae5b95e 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -117,7 +117,7 @@ public class BootStrapper List<InetAddress> endpoints = new ArrayList<InetAddress>(load.size()); for (InetAddress endpoint : load.keySet()) { - if (!metadata.isMember(endpoint)) + if (!metadata.isMember(endpoint) || !FailureDetector.instance.isAlive(endpoint)) /* skip non-members and endpoints the failure detector reports as down */ continue; endpoints.add(endpoint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 5c55f5a..078851c 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -30,8 +30,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Table; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.IFailureDetector; +import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.TokenMetadata; @@ -42,7 +48,7 @@ import org.apache.cassandra.utils.FBUtilities; /** * Assists in streaming ranges to a node. */ -public class RangeStreamer +public class RangeStreamer implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class); @@ -51,6 +57,9 @@ public class RangeStreamer private final OperationType opType; private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create(); private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>(); + // protected for testing. + protected CountDownLatch latch; /* created by fetch(); convict() counts it down to zero to release the wait */ + protected volatile String exceptionMessage = null; /* set by convict(); fetch() rethrows it as a RuntimeException once the latch opens */ /** * A filter applied to sources to stream from when constructing a fetch map.
@@ -212,7 +221,7 @@ public class RangeStreamer public void fetch() { - final CountDownLatch latch = new CountDownLatch(toFetch().entries().size()); + latch = new CountDownLatch(toFetch().entries().size()); for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries()) { final String table = entry.getKey(); @@ -234,13 +243,64 @@ public class RangeStreamer StreamIn.requestRanges(source, table, ranges, callback, opType); } + FailureDetector.instance.registerFailureDetectionEventListener(this); /* NOTE(review): registered only after every range request has been issued; a source convicted before this point is not observed by convict() */ + Gossiper.instance.register(this); try { latch.await(); + if (exceptionMessage != null) + throw new RuntimeException(exceptionMessage); } catch (InterruptedException e) { throw new AssertionError(e); } + finally + { + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + Gossiper.instance.unregister(this); + } + } + + @Override + public void onJoin(InetAddress endpoint, EndpointState epState) {} + + @Override + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + + @Override + public void onAlive(InetAddress endpoint, EndpointState state) {} + + @Override + public void onDead(InetAddress endpoint, EndpointState state) {} + + @Override + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + @Override + public void onRestart(InetAddress endpoint, EndpointState epState) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void convict(InetAddress endpoint, double phi) /* if endpoint is one of the streaming sources, records an abort message and releases fetch() */ + { + // We want a higher confidence in the failure detection than usual because aborting a bootstrap/rebuild wrongly has a high cost. + // same logic as in RepairSession + if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) + return; + + for (Map.Entry<InetAddress, Collection<Range<Token>>> value: toFetch().values()) + { + if (value.getKey().equals(endpoint)) + { + exceptionMessage = String.format("Node: %s died while streaming the ranges. 
Bootstrap/rebuild aborted.", endpoint); + while (latch.getCount() > 0) + latch.countDown(); + } + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 4b3a00d..65a6a5c 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.HashMap; import java.util.Set; import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.junit.Test; @@ -33,6 +34,7 @@ import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Table; import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.TokenMetadata; @@ -40,6 +42,8 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.OperationType; import org.apache.cassandra.utils.FBUtilities; +import static org.junit.Assert.*; + import static org.junit.Assert.assertEquals; public class BootStrapperTest extends CleanupHelper @@ -73,7 +77,10 @@ public class BootStrapperTest extends CleanupHelper }; Map<InetAddress, Double> load = new HashMap<InetAddress, Double>(); for (int i = 0; i < addrs.length; i++) + { + Gossiper.instance.initializeNodeUnsafe(addrs[i], 1); load.put(addrs[i], (double)i+2); + } // give every node a bootstrap source.
for (int i = 3; i >=0; i--) @@ -155,7 +162,7 @@ public class BootStrapperTest extends CleanupHelper } } - private void testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException + private RangeStreamer testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException { StorageService ss = StorageService.instance; @@ -196,6 +203,18 @@ public class BootStrapperTest extends CleanupHelper // is used, they will vary. assert toFetch.iterator().next().getValue().size() > 0; assert !toFetch.iterator().next().getKey().equals(myEndpoint); + return s; + } + + @Test + public void testException() throws UnknownHostException + { + String table = Schema.instance.getNonSystemTables().iterator().next(); + int replicationFactor = Table.open(table).getReplicationStrategy().getReplicationFactor(); + RangeStreamer streamer = testSourceTargetComputation(table, replicationFactor, replicationFactor); + streamer.latch = new CountDownLatch(4); /* stand-in for outstanding fetches, since fetch() is never called here; convict() should drain it */ + streamer.convict(streamer.toFetch().get(table).iterator().next().getKey(), Double.MAX_VALUE); /* convict the first source in the fetch map; a phi of Double.MAX_VALUE passes convict()'s threshold check */ + assertNotNull("Exception message not set, test failed", streamer.exceptionMessage); } private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException