Updated Branches:
  refs/heads/trunk b7a8b57ca -> f16becfc2

Bootstrapping to handle more failure
patch by Vijay; reviewed by Brandon Williams for CASSANDRA-3555


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f16becfc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f16becfc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f16becfc

Branch: refs/heads/trunk
Commit: f16becfc29ef7e8bbf5f92913be51e33a99ba537
Parents: b7a8b57
Author: Vijay Parthasarathy <vijay2...@gmail.com>
Authored: Thu Mar 8 16:41:07 2012 -0800
Committer: Vijay Parthasarathy <vijay2...@gmail.com>
Committed: Thu Mar 8 16:41:07 2012 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/BootStrapper.java     |    2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java    |   64 ++++++++++++++-
 .../org/apache/cassandra/dht/BootStrapperTest.java |   21 +++++-
 3 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java 
b/src/java/org/apache/cassandra/dht/BootStrapper.java
index de7f0cf..ae5b95e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -117,7 +117,7 @@ public class BootStrapper
         List<InetAddress> endpoints = new ArrayList<InetAddress>(load.size());
         for (InetAddress endpoint : load.keySet())
         {
-            if (!metadata.isMember(endpoint))
+            if (!metadata.isMember(endpoint) || 
!FailureDetector.instance.isAlive(endpoint))
                 continue;
             endpoints.add(endpoint);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 5c55f5a..078851c 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -30,8 +30,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -42,7 +48,7 @@ import org.apache.cassandra.utils.FBUtilities;
 /**
  * Assists in streaming ranges to a node.
  */
-public class RangeStreamer
+public class RangeStreamer implements IEndpointStateChangeSubscriber, 
IFailureDetectionEventListener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RangeStreamer.class);
 
@@ -51,6 +57,9 @@ public class RangeStreamer
     private final OperationType opType;
     private final Multimap<String, Map.Entry<InetAddress, 
Collection<Range<Token>>>> toFetch = HashMultimap.create();
     private final Set<ISourceFilter> sourceFilters = new 
HashSet<ISourceFilter>();
+    // protected for testing.
+    protected CountDownLatch latch;
+    protected volatile String exceptionMessage = null;
 
     /**
      * A filter applied to sources to stream from when constructing a fetch 
map.
@@ -212,7 +221,7 @@ public class RangeStreamer
 
     public void fetch()
     {
-        final CountDownLatch latch = new 
CountDownLatch(toFetch().entries().size());
+        latch = new CountDownLatch(toFetch().entries().size());
         for (Map.Entry<String, Map.Entry<InetAddress, 
Collection<Range<Token>>>> entry : toFetch.entries())
         {
             final String table = entry.getKey();
@@ -234,13 +243,64 @@ public class RangeStreamer
             StreamIn.requestRanges(source, table, ranges, callback, opType);
         }
 
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
+        Gossiper.instance.register(this);
         try
         {
             latch.await();
+            if (exceptionMessage != null)
+                throw new RuntimeException(exceptionMessage);
         }
         catch (InterruptedException e)
         {
             throw new AssertionError(e);
         }
+        finally
+        {
+            
FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+            Gossiper.instance.unregister(this);
+        }
+    }
+    
+    @Override
+    public void onJoin(InetAddress endpoint, EndpointState epState) {} // no-op: a join event does nothing to the fetch
+
+    @Override
+    public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value) {} // no-op: application-state changes do nothing to the fetch
+
+    @Override
+    public void onAlive(InetAddress endpoint, EndpointState state) {} // no-op: an alive event does nothing to the fetch
+
+    @Override
+    public void onDead(InetAddress endpoint, EndpointState state) {} // no-op here; aborting on a failed source is done in convict()
+    
+    @Override
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE); // treat removal as a failure; MAX_VALUE phi so convict()'s threshold check does not filter it out
+    }
+
+    @Override
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE); // treat a restart as a failure; MAX_VALUE phi so convict()'s threshold check does not filter it out
+    }
+
+    /** Failure-detector callback: aborts the fetch when {@code endpoint} is one of the streaming sources. A phi below
+     *  twice the convict threshold is ignored (as in RepairSession): wrongly failing a bootstrap/rebuild is costly. */
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        for (Map.Entry<InetAddress, Collection<Range<Token>>> source : toFetch().values())
+        {
+            if (!source.getKey().equals(endpoint))
+                continue;
+            exceptionMessage = String.format("Node: %s died while streaming the ranges. Bootstrap/rebuild Aborted.", endpoint);
+            while (latch.getCount() > 0) // drain the latch so the latch.await() in fetch() returns and can throw
+                latch.countDown();
+            return; // message recorded and latch drained; further entries for this endpoint would only repeat that
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java 
b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 4b3a00d..65a6a5c 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.Test;
 
@@ -33,6 +34,7 @@ import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -40,6 +42,8 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.junit.Assert.*;
+
 import static org.junit.Assert.assertEquals;
 
 public class BootStrapperTest extends CleanupHelper
@@ -73,7 +77,10 @@ public class BootStrapperTest extends CleanupHelper
         };
         Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
         for (int i = 0; i < addrs.length; i++)
+        {
+            Gossiper.instance.initializeNodeUnsafe(addrs[i], 1);
             load.put(addrs[i], (double)i+2);
+        }
 
         // give every node a bootstrap source.
         for (int i = 3; i >=0; i--)
@@ -155,7 +162,7 @@ public class BootStrapperTest extends CleanupHelper
         }
     }
 
-    private void testSourceTargetComputation(String table, int numOldNodes, 
int replicationFactor) throws UnknownHostException
+    private RangeStreamer testSourceTargetComputation(String table, int 
numOldNodes, int replicationFactor) throws UnknownHostException
     {
         StorageService ss = StorageService.instance;
 
@@ -196,6 +203,18 @@ public class BootStrapperTest extends CleanupHelper
         // is used, they will vary.
         assert toFetch.iterator().next().getValue().size() > 0;
         assert !toFetch.iterator().next().getKey().equals(myEndpoint);
+        return s;
+    }
+
+    @Test
+    public void testException() throws UnknownHostException // checks that convict() on a fetch source records an exception message
+    {
+        String table = Schema.instance.getNonSystemTables().iterator().next();
+        int replicationFactor = 
Table.open(table).getReplicationStrategy().getReplicationFactor();
+        RangeStreamer streamer = testSourceTargetComputation(table, 
replicationFactor, replicationFactor);
+        streamer.latch = new CountDownLatch(4); // convict() counts this latch down, so it has to be set before the call below
+        
streamer.convict(streamer.toFetch().get(table).iterator().next().getKey(), 
Double.MAX_VALUE); // convict the first source listed for this table, with the same phi onRemove/onRestart pass
+        assertNotNull("Exception message not set, test failed", 
streamer.exceptionMessage);
+    }
 
     private void generateFakeEndpoints(int numOldNodes) throws 
UnknownHostException

Reply via email to