Merge branch 'cassandra-1.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ef78c68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ef78c68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ef78c68

Branch: refs/heads/trunk
Commit: 2ef78c689fc344f46fe7c6f12f21e490ca083da6
Parents: 57f1ad3 0feb59a
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Jul 17 12:05:02 2013 -0700
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Jul 17 12:05:02 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          | 1 +
 src/java/org/apache/cassandra/db/SystemKeyspace.java | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ef78c68/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 09b5c25,09f4c7b..09472bc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,89 -1,5 +1,90 @@@
+2.0.0-beta2
+ * Allow nodetool with no args, and with help to run without a server (CASSANDRA-5734)
+ * Cleanup AbstractType/TypeSerializer classes (CASSANDRA-5744)
+ * Remove unimplemented cli option schema-mwt (CASSANDRA-5754)
+ * Support range tombstones in thrift (CASSANDRA-5435)
+ * Normalize table-manipulating CQL3 statements' class names (CASSANDRA-5759)
+ * cqlsh: add missing table options to DESCRIBE output (CASSANDRA-5749)
+
+2.0.0-beta1
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+   (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619, 5667)
+ * Leveled compaction performs size-tiered compactions in L0
+   (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+   need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+   (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+   has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+   (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+   out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+ * Track max/min column names in sstables to be able to optimize slice
+   queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
+ * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
+ * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
+ * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+ * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
+ * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
+ * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+   (CASSANDRA-5149)
+ * Streaming 2.0 (CASSANDRA-5286, 5699)
+ * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737)
+ * more pre-table creation property validation (CASSANDRA-5693)
+ * Redesign repair messages (CASSANDRA-5426)
+ * Fix ALTER RENAME post-5125 (CASSANDRA-5702)
+ * Disallow renaming a 2ndary indexed column (CASSANDRA-5705)
+ * Rename Table to Keyspace (CASSANDRA-5613)
+ * Ensure changing column_index_size_in_kb on different nodes doesn't corrupt the
+   sstable (CASSANDRA-5454)
+ * Move resultset type information into prepare, not execute (CASSANDRA-5649)
+ * Auto paging in binary protocol (CASSANDRA-4415, 5714)
+ * Don't tie client side use of AbstractType to JDBC (CASSANDRA-4495)
+ * Adds new TimestampType to replace DateType (CASSANDRA-5723, CASSANDRA-5729)
+
+ 1.2.7
+  * add cassandra.unsafetruncate property (CASSANDRA-5704)
  * (Hadoop) quote identifiers in CqlPagingRecordReader (CASSANDRA-5763)
  * Add replace_node functionality for vnodes (CASSANDRA-5337)
  * Add timeout events to query traces (CASSANDRA-5520)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ef78c68/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index ba8f63a,0000000..135df26
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1,832 -1,0 +1,833 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Function;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.thrift.cassandraConstants;
+import org.apache.cassandra.utils.*;
+
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
+public class SystemKeyspace
+{
+    private static final Logger logger = LoggerFactory.getLogger(SystemKeyspace.class);
+
+    // see CFMetaData for schema definitions
+    public static final String PEERS_CF = "peers";
+    public static final String PEER_EVENTS_CF = "peer_events";
+    public static final String LOCAL_CF = "local";
+    public static final String INDEX_CF = "IndexInfo";
+    public static final String COUNTER_ID_CF = "NodeIdInfo";
+    public static final String HINTS_CF = "hints";
+    public static final String RANGE_XFERS_CF = "range_xfers";
+    public static final String BATCHLOG_CF = "batchlog";
+    // see layout description in the DefsTables class header
+    public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
+    public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
+    public static final String SCHEMA_COLUMNS_CF = "schema_columns";
+    public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
+    public static final String COMPACTION_LOG = "compactions_in_progress";
+    public static final String PAXOS_CF = "paxos";
+
+    private static final String LOCAL_KEY = "local";
+    private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
+
+    public enum BootstrapState
+    {
+        NEEDS_BOOTSTRAP,
+        COMPLETED,
+        IN_PROGRESS
+    }
+
+    private static DecoratedKey decorate(ByteBuffer key)
+    {
+        return StorageService.getPartitioner().decorateKey(key);
+    }
+
+    public static void finishStartup()
+    {
+        setupVersion();
+
+        // add entries to system schema columnfamilies for the hardcoded system definitions
+        for (String ksname : Schema.systemKeyspaceNames)
+        {
+            KSMetaData ksmd = Schema.instance.getKSMetaData(ksname);
+
+            // delete old, possibly obsolete entries in schema columnfamilies
+            for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
+            {
+                String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
+                processInternal(req);
+            }
+
+            // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
+            ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
+        }
+    }
+
+    private static void setupVersion()
+    {
+        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s')";
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        processInternal(String.format(req, LOCAL_CF,
+                                      LOCAL_KEY,
+                                      FBUtilities.getReleaseVersionString(),
+                                      QueryProcessor.CQL_VERSION.toString(),
+                                      cassandraConstants.VERSION,
+                                      snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
+                                      snitch.getRack(FBUtilities.getBroadcastAddress()),
+                                      DatabaseDescriptor.getPartitioner().getClass().getName()));
+    }
+
+    /**
+     * Write the compaction log, except for columnfamilies in the system keyspace.
+     *
+     * @param cfs the columnfamily store whose sstables are being compacted
+     * @param toCompact sstables to compact
+     * @return compaction task id, or null if cfs belongs to the system keyspace
+     */
+    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
+    {
+        if (Keyspace.SYSTEM_KS.equals(cfs.keyspace.getName()))
+            return null;
+
+        UUID compactionId = UUIDGen.getTimeUUID();
+        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
+        {
+            public Integer apply(SSTableReader sstable)
+            {
+                return sstable.descriptor.generation;
+            }
+        });
+        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+        forceBlockingFlush(COMPACTION_LOG);
+        return compactionId;
+    }
+
+    public static void finishCompaction(UUID taskId)
+    {
+        assert taskId != null;
+
+        String req = "DELETE FROM system.%s WHERE id = %s";
+        processInternal(String.format(req, COMPACTION_LOG, taskId));
+        forceBlockingFlush(COMPACTION_LOG);
+    }
+
+    /**
+     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+     */
+    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+    {
+        String req = "SELECT * FROM system.%s";
+        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+
+        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+        for (UntypedResultSet.Row row : resultSet)
+        {
+            String keyspace = row.getString("keyspace_name");
+            String columnfamily = row.getString("columnfamily_name");
+            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+
+            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+        }
+        return unfinishedCompactions;
+    }
+
+    public static void discardCompactionsInProgress()
+    {
+        ColumnFamilyStore compactionLog = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+        compactionLog.truncateBlocking();
+    }
+
+    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    {
+        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
+        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
-         forceBlockingFlush(LOCAL_CF);
++        if (!Boolean.getBoolean("cassandra.unsafetruncate"))
++            forceBlockingFlush(LOCAL_CF);
+    }
+
+    /**
+     * This method is used to remove information about truncation time for the specified column family
+     */
+    public static void removeTruncationRecord(UUID cfId)
+    {
+        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
+        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        try
+        {
+            ReplayPosition.serializer.serialize(position, out);
+            out.writeLong(truncatedAt);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return String.format("{%s: 0x%s}",
+                             cfs.metadata.cfId,
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+    }
+
+    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
+    {
+        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
+        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        if (rows.isEmpty())
+            return Collections.emptyMap();
+
+        UntypedResultSet.Row row = rows.one();
+        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
+        if (rawMap == null)
+            return Collections.emptyMap();
+
+        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
+        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
+            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
+        return positions;
+    }
+
+    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
+    {
+        try
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Record tokens being used by another node
+     */
+    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        {
+            removeEndpoint(ep);
+            return;
+        }
+
+        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
+        forceBlockingFlush(PEERS_CF);
+    }
+
+    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
+    {
+        String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
+        forceBlockingFlush(PEERS_CF);
+    }
+
+    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
+            return;
+
+        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
+        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+    }
+
+    public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
+    {
+        // with 30 day TTL
+        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
+        processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
+    }
+
+    public static synchronized void updateSchemaVersion(UUID version)
+    {
+        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
+    }
+
+    private static String tokensAsSet(Collection<Token> tokens)
+    {
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        StringBuilder sb = new StringBuilder();
+        sb.append("{");
+        Iterator<Token> iter = tokens.iterator();
+        while (iter.hasNext())
+        {
+            sb.append("'").append(factory.toString(iter.next())).append("'");
+            if (iter.hasNext())
+                sb.append(",");
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
+    {
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        List<Token> tokens = new ArrayList<Token>(tokensStrings.size());
+        for (String tk : tokensStrings)
+            tokens.add(factory.fromString(tk));
+        return tokens;
+    }
+
+    /**
+     * Remove stored tokens being used by another node
+     */
+    public static synchronized void removeEndpoint(InetAddress ep)
+    {
+        String req = "DELETE FROM system.%s WHERE peer = '%s'";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+        forceBlockingFlush(PEERS_CF);
+    }
+
+    /**
+     * This method is used to update the System Keyspace with the new tokens for this node
+     */
+    public static synchronized void updateTokens(Collection<Token> tokens)
+    {
+        assert !tokens.isEmpty() : "removeEndpoint should be used instead";
+        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    /**
+     * Convenience method to update the list of tokens in the local system keyspace.
+     *
+     * @param addTokens tokens to add
+     * @param rmTokens tokens to remove
+     * @return the collection of persisted tokens
+     */
+    public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
+    {
+        Collection<Token> tokens = getSavedTokens();
+        tokens.removeAll(rmTokens);
+        tokens.addAll(addTokens);
+        updateTokens(tokens);
+        return tokens;
+    }
+
+    private static void forceBlockingFlush(String cfname)
+    {
+        Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfname).forceBlockingFlush();
+    }
+
+    /**
+     * Return a map of IP addresses to their stored tokens
+     *
+     */
+    public static SetMultimap<InetAddress, Token> loadTokens()
+    {
+        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("tokens"))
+                tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
+        }
+
+        return tokenMap;
+    }
+
+    /**
+     * Return a map of IP addresses to their stored host_ids
+     *
+     */
+    public static Map<InetAddress, UUID> loadHostIds()
+    {
+        Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("host_id"))
+            {
+                hostIdMap.put(peer, row.getUUID("host_id"));
+            }
+        }
+        return hostIdMap;
+    }
+
+    public static InetAddress getPreferredIP(InetAddress ep)
+    {
+        String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
+        UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+        if (!result.isEmpty() && result.one().has("preferred_ip"))
+            return result.one().getInetAddress("preferred_ip");
+        return null;
+    }
+
+    /**
+     * Return a map of IP addresses containing a map of dc and rack info
+     */
+    public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
+    {
+        Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("data_center") && row.has("rack"))
+            {
+                Map<String, String> dcRack = new HashMap<String, String>();
+                dcRack.put("data_center", row.getString("data_center"));
+                dcRack.put("rack", row.getString("rack"));
+                result.put(peer, dcRack);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * One of three things will happen if you try to read the system keyspace:
+     * 1. files are present and you can read them: great
+     * 2. no files are there: great (new node is assumed)
+     * 3. files are present but you can't read them: bad
+     * @throws ConfigurationException
+     */
+    public static void checkHealth() throws ConfigurationException
+    {
+        Keyspace keyspace;
+        try
+        {
+            keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+        }
+        catch (AssertionError err)
+        {
+            // this happens when a user switches from OPP to RP.
+            ConfigurationException ex = new ConfigurationException("Could not read system keyspace!");
+            ex.initCause(err);
+            throw ex;
+        }
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
+
+        String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        if (result.isEmpty() || !result.one().has("cluster_name"))
+        {
+            // this is a brand new node
+            if (!cfs.getSSTables().isEmpty())
+                throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
+
+            // no system files. this is a new node.
+            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
+            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
+            return;
+        }
+
+        String savedClusterName = result.one().getString("cluster_name");
+        if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
+            throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
+    }
+
+    public static Collection<Token> getSavedTokens()
+    {
+        String req = "SELECT tokens FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        return result.isEmpty() || !result.one().has("tokens")
+             ? Collections.<Token>emptyList()
+             : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
+    }
+
+    public static int incrementAndGetGeneration()
+    {
+        String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        int generation;
+        if (result.isEmpty() || !result.one().has("gossip_generation"))
+        {
+            // seconds-since-epoch isn't a foolproof new generation
+            // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
+            // but it's as close as sanely possible
+            generation = (int) (System.currentTimeMillis() / 1000);
+        }
+        else
+        {
+            // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
+            final int storedGeneration = result.one().getInt("gossip_generation") + 1;
+            final int now = (int) (System.currentTimeMillis() / 1000);
+            if (storedGeneration >= now)
+            {
+                logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}.  See CASSANDRA-3654 if you experience problems",
+                            storedGeneration, now);
+                generation = storedGeneration;
+            }
+            else
+            {
+                generation = now;
+            }
+        }
+
+        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
+        forceBlockingFlush(LOCAL_CF);
+
+        return generation;
+    }
+
+    public static BootstrapState getBootstrapState()
+    {
+        String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        if (result.isEmpty() || !result.one().has("bootstrapped"))
+            return BootstrapState.NEEDS_BOOTSTRAP;
+
+        return BootstrapState.valueOf(result.one().getString("bootstrapped"));
+    }
+
+    public static boolean bootstrapComplete()
+    {
+        return getBootstrapState() == BootstrapState.COMPLETED;
+    }
+
+    public static boolean bootstrapInProgress()
+    {
+        return getBootstrapState() == BootstrapState.IN_PROGRESS;
+    }
+
+    public static void setBootstrapState(BootstrapState state)
+    {
+        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    public static boolean isIndexBuilt(String keyspaceName, String indexName)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
+        QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
+                                                        INDEX_CF,
+                                                        ByteBufferUtil.bytes(indexName),
+                                                        System.currentTimeMillis());
+        return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
+    }
+
+    public static void setIndexBuilt(String keyspaceName, String indexName)
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
+        cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
+        rm.apply();
+        forceBlockingFlush(INDEX_CF);
+    }
+
+    public static void setIndexRemoved(String keyspaceName, String indexName)
+    {
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
+        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
+        rm.apply();
+        forceBlockingFlush(INDEX_CF);
+    }
+
+    /**
+     * Read the host ID from the system keyspace, creating (and storing) one if
+     * none exists.
+     */
+    public static UUID getLocalHostId()
+    {
+        UUID hostId = null;
+
+        String req = "SELECT host_id FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        // Look up the Host UUID (return it if found)
+        if (!result.isEmpty() && result.one().has("host_id"))
+        {
+            return result.one().getUUID("host_id");
+        }
+
+        // ID not found, generate a new one, persist, and then return it.
+        hostId = UUID.randomUUID();
+        logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
+
+        req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
+        return hostId;
+    }
+
+    /**
+     * Read the current local node id from the system keyspace, or null if no
+     * such node id is recorded.
+     */
+    public static CounterId getCurrentLocalCounterId()
+    {
+        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+
+        // Get the last CounterId (CounterIds are timeuuids and are thus ordered from oldest to newest)
+        QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
+                                                        COUNTER_ID_CF,
+                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                        true,
+                                                        1,
+                                                        System.currentTimeMillis());
+        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
+        if (cf != null && cf.getColumnCount() != 0)
+            return CounterId.wrap(cf.iterator().next().name());
+        else
+            return null;
+    }
+
+    /**
+     * Write a new current local node id to the system keyspace.
+     *
+     * @param oldCounterId the previous local node id (that {@code newCounterId}
+     * replaces) or null if no such node id exists (new node or removed system
+     * keyspace)
+     * @param newCounterId the new current local node id to record
+     * @param now microsecond timestamp.
+     */
+    public static void writeCurrentLocalCounterId(CounterId oldCounterId, CounterId newCounterId, long now)
+    {
+        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
+
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
+        cf.addColumn(new Column(newCounterId.bytes(), ip, now));
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
+        rm.apply();
+        forceBlockingFlush(COUNTER_ID_CF);
+    }
+
+    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
+    {
+        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
+
+        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
+        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
+
+        CounterId previous = null;
+        for (Column c : cf)
+        {
+            if (previous != null)
+                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
+
+            // this will ignore the last column on purpose since it is the
+            // current local node id
+            previous = CounterId.wrap(c.name());
+        }
+        return l;
+    }
+
+    /**
+     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+     * @return CFS responsible to hold low-level serialized schema
+     */
+    public static ColumnFamilyStore schemaCFS(String cfName)
+    {
+        return Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfName);
+    }
+
+    public static List<Row> serializedSchema()
+    {
+        List<Row> schema = new ArrayList<Row>(3);
+
+        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
+        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
+        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
+
+        return schema;
+    }
+
+    /**
+     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+     * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
+     */
+    public static List<Row> serializedSchema(String schemaCfName)
+    {
+        Token minToken = StorageService.getPartitioner().getMinimumToken();
+
+        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+                                                     null,
+                                                     new IdentityQueryFilter(),
+                                                     Integer.MAX_VALUE,
+                                                     System.currentTimeMillis());
+    }
+
+    public static Collection<RowMutation> serializeSchema()
+    {
+        Map<DecoratedKey, RowMutation> mutationMap = new HashMap<DecoratedKey, RowMutation>();
+
+        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
+        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
+        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
+
+        return mutationMap.values();
+    }
+
+    private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
+    {
+        for (Row schemaRow : serializedSchema(schemaCfName))
+        {
+            if (Schema.ignoredSchemaRow(schemaRow))
+                continue;
+
+            RowMutation mutation = mutationMap.get(schemaRow.key);
+            if (mutation == null)
+            {
+                mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
+                mutationMap.put(schemaRow.key, mutation);
+            }
+
+            mutation.add(schemaRow.cf);
+        }
+    }
+
+    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
+    {
+        Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
+
+        for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
+            schema.put(schemaEntity.key, schemaEntity.cf);
+
+        return schema;
+    }
+
+    public static ByteBuffer getSchemaKSKey(String ksName)
+    {
+        return AsciiType.instance.fromString(ksName);
+    }
+
+    public static Row readSchemaRow(String ksName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_KEYSPACES_CF);
+        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
+
+        return new Row(key, result);
+    }
+
+    public static Row readSchemaRow(String ksName, String cfName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
+        ColumnFamily result = schemaCFS.getColumnFamily(key,
+                                                        DefsTables.searchComposite(cfName, true),
+                                                        DefsTables.searchComposite(cfName, false),
+                                                        false,
+                                                        Integer.MAX_VALUE,
+                                                        System.currentTimeMillis());
+
+        return new Row(key, result);
+    }
+
+    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+    {
+        String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
+        UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
+        if (results.isEmpty())
+            return new PaxosState(key, metadata);
+        UntypedResultSet.Row row = results.one();
+        Commit inProgress = new Commit(key,
+                                       row.getUUID("in_progress_ballot"),
+                                       row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata));
+        // either most_recent_commit and most_recent_commit_at will both be set, or neither
+        Commit mostRecent = row.has("most_recent_commit")
+                          ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
+                          : Commit.emptyCommit(key, metadata);
+        return new PaxosState(inProgress, mostRecent);
+    }
+
+    public static void savePaxosPromise(Commit promise)
+    {
+        String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
+        processInternal(String.format(req,
+                                      PAXOS_CF,
+                                      UUIDGen.microsTimestamp(promise.ballot),
+                                      paxosTtl(promise.update.metadata),
+                                      promise.ballot,
+                                      ByteBufferUtil.bytesToHex(promise.key),
+                                      promise.update.id()));
+    }
+
+    public static void savePaxosProposal(Commit commit)
+    {
+        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
+                                      PAXOS_CF,
+                                      UUIDGen.microsTimestamp(commit.ballot),
+                                      paxosTtl(commit.update.metadata),
+                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
+                                      ByteBufferUtil.bytesToHex(commit.key),
+                                      commit.update.id()));
+    }
+
+    private static int paxosTtl(CFMetaData metadata)
+    {
+        // keep paxos state around for at least 3h
+        return Math.max(3 * 3600, metadata.getGcGraceSeconds());
+    }
+
+    public static void savePaxosCommit(Commit commit, UUID inProgressBallot)
+    {
+        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+        // identical except adds proposal = null
+        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+        boolean proposalAfterCommit = inProgressBallot.timestamp() > commit.ballot.timestamp();
+        processInternal(String.format(proposalAfterCommit ? preserveCql : eraseCql,
+                                      PAXOS_CF,
+                                      UUIDGen.microsTimestamp(commit.ballot),
+                                      paxosTtl(commit.update.metadata),
+                                      proposalAfterCommit ? inProgressBallot : commit.ballot,
+                                      commit.ballot,
+                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
+                                      ByteBufferUtil.bytesToHex(commit.key),
+                                      commit.update.id()));
+    }
+}
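
The substantive change carried by this merge is CASSANDRA-5704: saveTruncationRecord() now
skips its blocking flush of the system "local" table whenever the cassandra.unsafetruncate
JVM system property is set, so the safe flushing behavior remains the default. Below is a
minimal, standalone sketch of the same guard pattern; the property name comes from the patch
above, while the demo class and its messages are purely illustrative:

    // Sketch of the guard added in saveTruncationRecord(): an expensive,
    // blocking operation is skipped when a JVM system property is set.
    public class UnsafeTruncateDemo
    {
        public static void main(String[] args)
        {
            // Boolean.getBoolean(name) returns true only when the named system
            // property exists and equals "true" (case-insensitive); if the
            // property is absent it returns false, so the safe path is the default.
            if (!Boolean.getBoolean("cassandra.unsafetruncate"))
                System.out.println("flushing truncation record before returning (safe default)");
            else
                System.out.println("skipping flush; truncation record could be lost on a crash");
        }
    }

Running with -Dcassandra.unsafetruncate=true on the JVM command line exercises the unsafe
path. Note that Boolean.getBoolean() reads a system property, not an environment variable,
so in a Cassandra deployment the flag would typically be passed via the JVM options (e.g.
JVM_OPTS in cassandra-env.sh) rather than exported in the shell.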