Author: brandonwilliams Date: Thu Sep 30 21:03:24 2010 New Revision: 1003265
URL: http://svn.apache.org/viewvc?rev=1003265&view=rev Log: Persist ring state between restarts. Patch by brandonwilliams and jbellis, reviewed by brandonwilliams for CASSANDRA-1518 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1003265&r1=1003264&r2=1003265&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Sep 30 21:03:24 2010 @@ -23,7 +23,8 @@ import java.io.FilenameFilter; import java.io.IOError; import java.io.IOException; import java.net.InetAddress; -import java.util.Collection; +import java.net.UnknownHostException; +import java.util.HashMap; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; @@ -32,11 +33,8 @@ import org.apache.commons.lang.ArrayUtil import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.clock.TimestampReconciler; -import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.BytesType; @@ -53,6 +51,7 @@ public class SystemTable public static final String STATUS_CF = "LocationInfo"; // keep the old CF string for backwards-compatibility public static final String INDEX_CF = "IndexInfo"; private static final byte[] LOCATION_KEY = "L".getBytes(UTF_8); + private static final byte[] RING_KEY = "Ring".getBytes(UTF_8); private static final byte[] BOOTSTRAP_KEY = "Bootstrap".getBytes(UTF_8); private static final byte[] COOKIE_KEY = "Cookies".getBytes(UTF_8); private static final byte[] BOOTSTRAP = "B".getBytes(UTF_8); @@ -92,8 +91,8 @@ public class SystemTable { IPartitioner p = StorageService.getPartitioner(); ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF); - cf.addColumn(new Column(ep.getAddress(), p.getTokenFactory().toByteArray(token), new TimestampClock(System.currentTimeMillis()))); - RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY); + cf.addColumn(new Column(p.getTokenFactory().toByteArray(token), ep.getAddress(), new TimestampClock(System.currentTimeMillis()))); + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, RING_KEY); rm.add(cf); try { @@ -106,6 +105,24 @@ public class SystemTable } /** + * Remove stored token being used by another node + */ + public static synchronized void removeToken(Token token) + { + IPartitioner p = StorageService.getPartitioner(); + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, RING_KEY); + rm.delete(new QueryPath(STATUS_CF, null, p.getTokenFactory().toByteArray(token)), new TimestampClock(System.currentTimeMillis())); + try + { + rm.apply(); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + /** * This method is used to update the System Table with the new token for this node */ public static synchronized void updateToken(Token token) @@ -139,6 +156,34 @@ public class SystemTable } /** + * Return a map of stored tokens to IP addresses + * + */ + public static HashMap<Token, InetAddress> loadTokens() + { + HashMap<Token, InetAddress> tokenMap = new HashMap<Token, InetAddress>(); + IPartitioner p = StorageService.getPartitioner(); + Table table = Table.open(Table.SYSTEM_TABLE); + QueryFilter filter = QueryFilter.getIdentityFilter(decorate(RING_KEY), new QueryPath(STATUS_CF)); + ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter); + if (cf != null) + { + for (IColumn column : cf.getSortedColumns()) + { + try + { + tokenMap.put(p.getTokenFactory().fromByteArray(column.name()), InetAddress.getByAddress(column.value())); + } + catch (UnknownHostException e) + { + throw new IOError(e); + } + } + } + return tokenMap; + } + + /** * One of three things will happen if you try to read the system table: * 1. files are present and you can read them: great * 2. no files are there: great (new node is assumed) Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1003265&r1=1003264&r2=1003265&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Sep 30 21:03:24 2010 @@ -863,6 +863,23 @@ public class Gossiper implements IFailur gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_); } + /** + * Add an endpoint we knew about previously, but whose state is unknown + */ + public void addSavedEndpoint(InetAddress ep) + { + EndpointState epState = endpointStateMap_.get(ep); + if (epState == null) + { + epState = new EndpointState(new HeartBeatState(0)); + epState.isAlive(false); + epState.isAGossiper(true); + epState.setHasToken(true); + endpointStateMap_.put(ep, epState); + unreachableEndpoints_.add(ep); + } + } + public void addLocalApplicationState(ApplicationState state, VersionedValue value) { assert !StorageService.instance.isClientMode(); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1003265&r1=1003264&r2=1003265&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Sep 30 21:03:24 2010 @@ -351,6 +351,16 @@ public class StorageService implements I logger_.warn("Unable to start GCInspector (currently only supported on the Sun JVM)"); } + if (Boolean.valueOf(System.getProperty("cassandra.load_ring_state", "true"))) + { + logger_.info("Loading persisted ring state"); + for (Map.Entry<Token, InetAddress> entry : SystemTable.loadTokens().entrySet()) + { + tokenMetadata_.updateNormalToken(entry.getKey(), entry.getValue()); + Gossiper.instance.addSavedEndpoint(entry.getValue()); + } + } + logger_.info("Starting up server gossip"); // have to start the gossip service before we can see any info on other nodes. this is necessary @@ -654,18 +664,32 @@ public class StorageService implements I // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. InetAddress currentNode = tokenMetadata_.getEndpoint(token); - if (currentNode == null || (FBUtilities.getLocalAddress().equals(currentNode) && Gossiper.instance.compareEndpointStartup(endpoint, currentNode) > 0)) + if (currentNode == null) + { + logger_.debug("New node " + endpoint + " at token " + token); + tokenMetadata_.updateNormalToken(token, endpoint); + if (!isClientMode) + SystemTable.updateToken(endpoint, token); + } + else if (Gossiper.instance.compareEndpointStartup(endpoint, currentNode) > 0) + { + logger_.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner", + endpoint, currentNode, token, endpoint)); tokenMetadata_.updateNormalToken(token, endpoint); + if (!isClientMode) + SystemTable.updateToken(endpoint, token); + } else - logger_.info("Will not change my token ownership to " + endpoint); + { + logger_.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s", + endpoint, currentNode, token, endpoint)); + } if(pieces.length > 2) { handleStateRemoving(endpoint, pieces); } calculatePendingRanges(); - if (!isClientMode) - SystemTable.updateToken(endpoint, token); } /** @@ -774,6 +798,11 @@ public class StorageService implements I // grab any data we are now responsible for and notify responsible node restoreReplicaCount(removeEndpoint, endpoint); } + if (!isClientMode) + { + logger_.info("Removing token " + removeToken + " for " + removeEndpoint); + SystemTable.removeToken(removeToken); + } } /**