Author: brandonwilliams
Date: Thu Sep 30 21:03:24 2010
New Revision: 1003265

URL: http://svn.apache.org/viewvc?rev=1003265&view=rev
Log:
Persist ring state between restarts.  Patch by brandonwilliams and jbellis, 
reviewed by brandonwilliams for CASSANDRA-1518

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1003265&r1=1003264&r2=1003265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Sep 30 21:03:24 2010
@@ -23,7 +23,8 @@ import java.io.FilenameFilter;
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Collection;
+import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
@@ -32,11 +33,8 @@ import org.apache.commons.lang.ArrayUtil
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.clock.TimestampReconciler;
-import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -53,6 +51,7 @@ public class SystemTable
     public static final String STATUS_CF = "LocationInfo"; // keep the old CF 
string for backwards-compatibility
     public static final String INDEX_CF = "IndexInfo";
     private static final byte[] LOCATION_KEY = "L".getBytes(UTF_8);
+    private static final byte[] RING_KEY = "Ring".getBytes(UTF_8);
     private static final byte[] BOOTSTRAP_KEY = "Bootstrap".getBytes(UTF_8);
     private static final byte[] COOKIE_KEY = "Cookies".getBytes(UTF_8);
     private static final byte[] BOOTSTRAP = "B".getBytes(UTF_8);
@@ -92,8 +91,8 @@ public class SystemTable
     {
         IPartitioner p = StorageService.getPartitioner();
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
-        cf.addColumn(new Column(ep.getAddress(), 
p.getTokenFactory().toByteArray(token), new 
TimestampClock(System.currentTimeMillis())));
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
+        cf.addColumn(new Column(p.getTokenFactory().toByteArray(token), 
ep.getAddress(), new TimestampClock(System.currentTimeMillis())));
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, RING_KEY);
         rm.add(cf);
         try
         {
@@ -106,6 +105,24 @@ public class SystemTable
     }
 
     /**
+     * Remove stored token being used by another node
+     */
+    public static synchronized void removeToken(Token token)
+    {
+        IPartitioner p = StorageService.getPartitioner();
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, RING_KEY);
+        rm.delete(new QueryPath(STATUS_CF, null, 
p.getTokenFactory().toByteArray(token)), new 
TimestampClock(System.currentTimeMillis()));
+        try
+        {
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
      * This method is used to update the System Table with the new token for 
this node
     */
     public static synchronized void updateToken(Token token)
@@ -139,6 +156,34 @@ public class SystemTable
     }
 
     /**
+     * Return a map of stored tokens to IP addresses
+     *
+     */
+    public static HashMap<Token, InetAddress> loadTokens()
+    {
+        HashMap<Token, InetAddress> tokenMap = new HashMap<Token, 
InetAddress>();
+        IPartitioner p = StorageService.getPartitioner();
+        Table table = Table.open(Table.SYSTEM_TABLE);
+        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(RING_KEY), 
new QueryPath(STATUS_CF));
+        ColumnFamily cf = 
table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
+        if (cf != null)
+        {
+            for (IColumn column : cf.getSortedColumns())
+            {
+                try
+                {
+                    
tokenMap.put(p.getTokenFactory().fromByteArray(column.name()), 
InetAddress.getByAddress(column.value()));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        }
+        return tokenMap;
+    }
+
+    /**
      * One of three things will happen if you try to read the system table:
      * 1. files are present and you can read them: great
      * 2. no files are there: great (new node is assumed)

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1003265&r1=1003264&r2=1003265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Sep 30 21:03:24 2010
@@ -863,6 +863,23 @@ public class Gossiper implements IFailur
         gossipTimer_.schedule( new GossipTimerTask(), 
Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
     }
 
+    /**
+     * Add an endpoint we knew about previously, but whose state is unknown
+     */
+    public void addSavedEndpoint(InetAddress ep)
+    {
+        EndpointState epState = endpointStateMap_.get(ep);
+        if (epState == null)
+        {
+            epState = new EndpointState(new HeartBeatState(0));
+            epState.isAlive(false);
+            epState.isAGossiper(true);
+            epState.setHasToken(true);
+            endpointStateMap_.put(ep, epState);
+            unreachableEndpoints_.add(ep);
+        }
+    }
+
     public void addLocalApplicationState(ApplicationState state, 
VersionedValue value)
     {
         assert !StorageService.instance.isClientMode();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1003265&r1=1003264&r2=1003265&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Sep 30 21:03:24 2010
@@ -351,6 +351,16 @@ public class StorageService implements I
             logger_.warn("Unable to start GCInspector (currently only 
supported on the Sun JVM)");
         }
 
+        if (Boolean.valueOf(System.getProperty("cassandra.load_ring_state", 
"true")))
+        {
+            logger_.info("Loading persisted ring state");
+            for (Map.Entry<Token, InetAddress> entry : 
SystemTable.loadTokens().entrySet())
+            {
+                tokenMetadata_.updateNormalToken(entry.getKey(), 
entry.getValue());
+                Gossiper.instance.addSavedEndpoint(entry.getValue());
+            }
+        }
+
         logger_.info("Starting up server gossip");
 
         // have to start the gossip service before we can see any info on 
other nodes.  this is necessary
@@ -654,18 +664,32 @@ public class StorageService implements I
 
         // we don't want to update if this node is responsible for the token 
and it has a later startup time than endpoint.
         InetAddress currentNode = tokenMetadata_.getEndpoint(token);
-        if (currentNode == null || 
(FBUtilities.getLocalAddress().equals(currentNode) && 
Gossiper.instance.compareEndpointStartup(endpoint, currentNode) > 0))
+        if (currentNode == null)
+        {
+            logger_.debug("New node " + endpoint + " at token " + token);
+            tokenMetadata_.updateNormalToken(token, endpoint);
+            if (!isClientMode)
+                SystemTable.updateToken(endpoint, token);
+        }
+        else if (Gossiper.instance.compareEndpointStartup(endpoint, 
currentNode) > 0)
+        {
+            logger_.info(String.format("Nodes %s and %s have the same token 
%s.  %s is the new owner",
+                                       endpoint, currentNode, token, 
endpoint));
             tokenMetadata_.updateNormalToken(token, endpoint);
+            if (!isClientMode)
+                SystemTable.updateToken(endpoint, token);
+        }
         else
-            logger_.info("Will not change my token ownership to " + endpoint);
+        {
+            logger_.info(String.format("Nodes %s and %s have the same token 
%s.  Ignoring %s",
+                                       endpoint, currentNode, token, 
endpoint));
+        }
 
         if(pieces.length > 2) {
             handleStateRemoving(endpoint, pieces);
         }
 
         calculatePendingRanges();
-        if (!isClientMode)
-            SystemTable.updateToken(endpoint, token);
     }
 
     /**
@@ -774,6 +798,11 @@ public class StorageService implements I
             // grab any data we are now responsible for and notify responsible 
node
             restoreReplicaCount(removeEndpoint, endpoint);
         }
+        if (!isClientMode)
+        {
+            logger_.info("Removing token " + removeToken + " for " + 
removeEndpoint);
+            SystemTable.removeToken(removeToken);
+        }
     }
 
     /**


Reply via email to