http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 5b63ba6..e826dd8 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -38,34 +38,34 @@ import com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.*; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; +import static java.lang.String.format; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; + import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; @@ -102,187 +102,203 @@ public final class SystemKeyspace public static final String BUILT_VIEWS = "built_views"; public static final String PREPARED_STATEMENTS = "prepared_statements"; - public static final CFMetaData Batches = - compile(BATCHES, - "batches awaiting replay", - "CREATE TABLE %s (" - + "id timeuuid," - + "mutations list<blob>," - + "version int," - + "PRIMARY KEY ((id)))") - .copy(new LocalPartitioner(TimeUUIDType.instance)) - .compaction(CompactionParams.scts(singletonMap("min_threshold", "2"))) - .gcGraceSeconds(0); - - private static final CFMetaData Paxos = - compile(PAXOS, - "in-progress paxos proposals", - "CREATE TABLE %s (" - + "row_key blob," - + "cf_id UUID," - + "in_progress_ballot timeuuid," - + "most_recent_commit blob," - + "most_recent_commit_at timeuuid," - + "most_recent_commit_version int," - + "proposal blob," - + "proposal_ballot timeuuid," - + "proposal_version int," - + "PRIMARY KEY ((row_key), cf_id))") - .compaction(CompactionParams.lcs(emptyMap())); - - private static final CFMetaData BuiltIndexes = - compile(BUILT_INDEXES, - "built column indexes", - "CREATE TABLE \"%s\" (" - + "table_name text," // table_name here is the name of the keyspace - don't be fooled - + "index_name text," - + "PRIMARY KEY ((table_name), index_name)) " - + "WITH COMPACT STORAGE"); - - private static final CFMetaData Local = - compile(LOCAL, - "information about the local node", - "CREATE TABLE %s (" - + "key text," - + "bootstrapped text," - + "broadcast_address inet," - + "cluster_name text," - + "cql_version text," - + "data_center text," - + "gossip_generation int," - + "host_id uuid," - + "listen_address inet," - + "native_protocol_version text," - + "partitioner text," - + "rack text," - + "release_version text," - + "rpc_address inet," - + "schema_version uuid," - + "tokens set<varchar>," - + "truncated_at map<uuid, blob>," - + "PRIMARY KEY ((key)))" - ).recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance); - - private static final CFMetaData Peers = - compile(PEERS, - "information about known peers in the cluster", - "CREATE TABLE %s (" - + "peer inet," - + "data_center text," - + "host_id uuid," - + "preferred_ip inet," - + "rack text," - + "release_version text," - + "rpc_address inet," - + "schema_version uuid," - + "tokens set<varchar>," - + "PRIMARY KEY ((peer)))"); - - private static final CFMetaData PeerEvents = - compile(PEER_EVENTS, - "events related to peers", - "CREATE TABLE %s (" - + "peer inet," - + "hints_dropped map<uuid, int>," - + "PRIMARY KEY ((peer)))"); - - private static final CFMetaData RangeXfers = - compile(RANGE_XFERS, - "ranges requested for transfer", - "CREATE TABLE %s (" - + "token_bytes blob," - + "requested_at timestamp," - + "PRIMARY KEY ((token_bytes)))"); - - private static final CFMetaData CompactionHistory = - compile(COMPACTION_HISTORY, - "week-long compaction history", - "CREATE TABLE %s (" - + "id uuid," - + "bytes_in bigint," - + "bytes_out bigint," - + "columnfamily_name text," - + "compacted_at timestamp," - + "keyspace_name text," - + "rows_merged map<int, bigint>," - + "PRIMARY KEY ((id)))") - .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)); - - private static final CFMetaData SSTableActivity = - compile(SSTABLE_ACTIVITY, - "historic sstable read rates", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "generation int," - + "rate_120m double," - + "rate_15m double," - + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))"); - - private static final CFMetaData SizeEstimates = - compile(SIZE_ESTIMATES, - "per-table primary range size estimates", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "range_start text," - + "range_end text," - + "mean_partition_size bigint," - + "partitions_count bigint," - + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") - .gcGraceSeconds(0); - - private static final CFMetaData AvailableRanges = - compile(AVAILABLE_RANGES, - "available keyspace/ranges during bootstrap/replace that are ready to be served", - "CREATE TABLE %s (" - + "keyspace_name text," - + "ranges set<blob>," - + "PRIMARY KEY ((keyspace_name)))"); - - private static final CFMetaData TransferredRanges = - compile(TRANSFERRED_RANGES, - "record of transferred ranges for streaming operation", - "CREATE TABLE %s (" - + "operation text," - + "peer inet," - + "keyspace_name text," - + "ranges set<blob>," - + "PRIMARY KEY ((operation, keyspace_name), peer))"); - - private static final CFMetaData ViewsBuildsInProgress = - compile(VIEWS_BUILDS_IN_PROGRESS, - "views builds current progress", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "last_token varchar," - + "generation_number int," - + "PRIMARY KEY ((keyspace_name), view_name))"); - - private static final CFMetaData BuiltViews = - compile(BUILT_VIEWS, - "built views", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "status_replicated boolean," - + "PRIMARY KEY ((keyspace_name), view_name))"); - - private static final CFMetaData PreparedStatements = - compile(PREPARED_STATEMENTS, - "prepared statements", - "CREATE TABLE %s (" - + "prepared_id blob," - + "logged_keyspace text," - + "query_string text," - + "PRIMARY KEY ((prepared_id)))"); - - - private static CFMetaData compile(String name, String description, String schema) - { - return CFMetaData.compile(String.format(schema, name), SchemaConstants.SYSTEM_KEYSPACE_NAME) - .comment(description); + public static final TableMetadata Batches = + parse(BATCHES, + "batches awaiting replay", + "CREATE TABLE %s (" + + "id timeuuid," + + "mutations list<blob>," + + "version int," + + "PRIMARY KEY ((id)))") + .partitioner(new LocalPartitioner(TimeUUIDType.instance)) + .compaction(CompactionParams.scts(singletonMap("min_threshold", "2"))) + .build(); + + private static final TableMetadata Paxos = + parse(PAXOS, + "in-progress paxos proposals", + "CREATE TABLE %s (" + + "row_key blob," + + "cf_id UUID," + + "in_progress_ballot timeuuid," + + "most_recent_commit blob," + + "most_recent_commit_at timeuuid," + + "most_recent_commit_version int," + + "proposal blob," + + "proposal_ballot timeuuid," + + "proposal_version int," + + "PRIMARY KEY ((row_key), cf_id))") + .compaction(CompactionParams.lcs(emptyMap())) + .build(); + + private static final TableMetadata BuiltIndexes = + parse(BUILT_INDEXES, + "built column indexes", + "CREATE TABLE \"%s\" (" + + "table_name text," // table_name here is the name of the keyspace - don't be fooled + + "index_name text," + + "PRIMARY KEY ((table_name), index_name)) " + + "WITH COMPACT STORAGE") + .build(); + + private static final TableMetadata Local = + parse(LOCAL, + "information about the local node", + "CREATE TABLE %s (" + + "key text," + + "bootstrapped text," + + "broadcast_address inet," + + "cluster_name text," + + "cql_version text," + + "data_center text," + + "gossip_generation int," + + "host_id uuid," + + "listen_address inet," + + "native_protocol_version text," + + "partitioner text," + + "rack text," + + "release_version text," + + "rpc_address inet," + + "schema_version uuid," + + "tokens set<varchar>," + + "truncated_at map<uuid, blob>," + + "PRIMARY KEY ((key)))") + .recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance) + .build(); + + private static final TableMetadata Peers = + parse(PEERS, + "information about known peers in the cluster", + "CREATE TABLE %s (" + + "peer inet," + + "data_center text," + + "host_id uuid," + + "preferred_ip inet," + + "rack text," + + "release_version text," + + "rpc_address inet," + + "schema_version uuid," + + "tokens set<varchar>," + + "PRIMARY KEY ((peer)))") + .build(); + + private static final TableMetadata PeerEvents = + parse(PEER_EVENTS, + "events related to peers", + "CREATE TABLE %s (" + + "peer inet," + + "hints_dropped map<uuid, int>," + + "PRIMARY KEY ((peer)))") + .build(); + + private static final TableMetadata RangeXfers = + parse(RANGE_XFERS, + "ranges requested for transfer", + "CREATE TABLE %s (" + + "token_bytes blob," + + "requested_at timestamp," + + "PRIMARY KEY ((token_bytes)))") + .build(); + + private static final TableMetadata CompactionHistory = + parse(COMPACTION_HISTORY, + "week-long compaction history", + "CREATE TABLE %s (" + + "id uuid," + + "bytes_in bigint," + + "bytes_out bigint," + + "columnfamily_name text," + + "compacted_at timestamp," + + "keyspace_name text," + + "rows_merged map<int, bigint>," + + "PRIMARY KEY ((id)))") + .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)) + .build(); + + private static final TableMetadata SSTableActivity = + parse(SSTABLE_ACTIVITY, + "historic sstable read rates", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "generation int," + + "rate_120m double," + + "rate_15m double," + + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))") + .build(); + + private static final TableMetadata SizeEstimates = + parse(SIZE_ESTIMATES, + "per-table primary range size estimates", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "range_start text," + + "range_end text," + + "mean_partition_size bigint," + + "partitions_count bigint," + + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") + .build(); + + private static final TableMetadata AvailableRanges = + parse(AVAILABLE_RANGES, + "available keyspace/ranges during bootstrap/replace that are ready to be served", + "CREATE TABLE %s (" + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((keyspace_name)))") + .build(); + + private static final TableMetadata TransferredRanges = + parse(TRANSFERRED_RANGES, + "record of transferred ranges for streaming operation", + "CREATE TABLE %s (" + + "operation text," + + "peer inet," + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((operation, keyspace_name), peer))") + .build(); + + private static final TableMetadata ViewsBuildsInProgress = + parse(VIEWS_BUILDS_IN_PROGRESS, + "views builds current progress", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "last_token varchar," + + "generation_number int," + + "PRIMARY KEY ((keyspace_name), view_name))") + .build(); + + private static final TableMetadata BuiltViews = + parse(BUILT_VIEWS, + "built views", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "status_replicated boolean," + + "PRIMARY KEY ((keyspace_name), view_name))") + .build(); + + private static final TableMetadata PreparedStatements = + parse(PREPARED_STATEMENTS, + "prepared statements", + "CREATE TABLE %s (" + + "prepared_id blob," + + "logged_keyspace text," + + "query_string text," + + "PRIMARY KEY ((prepared_id)))") + .build(); + + private static TableMetadata.Builder parse(String table, String description, String cql) + { + return CreateTableStatement.parse(format(cql, table), SchemaConstants.SYSTEM_KEYSPACE_NAME) + .id(TableId.forSystemTable(SchemaConstants.SYSTEM_KEYSPACE_NAME, table)) + .dcLocalReadRepairChance(0.0) + .gcGraceSeconds(0) + .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1)) + .comment(description); } public static KeyspaceMetadata metadata() @@ -321,7 +337,7 @@ public final class SystemKeyspace .build(); } - private static volatile Map<UUID, Pair<CommitLogPosition, Long>> truncationRecords; + private static volatile Map<TableId, Pair<CommitLogPosition, Long>> truncationRecords; public enum BootstrapState { @@ -352,7 +368,7 @@ public final class SystemKeyspace "listen_address" + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - executeOnceInternal(String.format(req, LOCAL), + executeOnceInternal(format(req, LOCAL), LOCAL, DatabaseDescriptor.getClusterName(), FBUtilities.getReleaseVersionString(), @@ -377,7 +393,7 @@ public final class SystemKeyspace if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY)) return; String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTION_HISTORY), + executeInternal(format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, @@ -389,21 +405,21 @@ public final class SystemKeyspace public static TabularData getCompactionHistory() throws OpenDataException { - UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); + UntypedResultSet queryResultSet = executeInternal(format("SELECT * from system.%s", COMPACTION_HISTORY)); return CompactionHistoryTabularData.from(queryResultSet); } public static boolean isViewBuilt(String keyspaceName, String viewName) { String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?"; - UntypedResultSet result = executeInternal(String.format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName); + UntypedResultSet result = executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName); return !result.isEmpty(); } public static boolean isViewStatusReplicated(String keyspaceName, String viewName) { String req = "SELECT status_replicated FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?"; - UntypedResultSet result = executeInternal(String.format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName); + UntypedResultSet result = executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName); if (result.isEmpty()) return false; @@ -417,7 +433,7 @@ public final class SystemKeyspace return; String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name, status_replicated) VALUES (?, ?, ?)"; - executeInternal(String.format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName, replicated); + executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName, replicated); forceBlockingFlush(BUILT_VIEWS); } @@ -434,7 +450,7 @@ public final class SystemKeyspace public static void beginViewBuild(String ksname, String viewName, int generationNumber) { - executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS), + executeInternal(format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, generationNumber); @@ -461,13 +477,13 @@ public final class SystemKeyspace { String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)"; Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory(); - executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); + executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); } public static Pair<Integer, Token> getViewBuildStatus(String ksname, String viewName) { String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?"; - UntypedResultSet queryResultSet = executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName); + UntypedResultSet queryResultSet = executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName); if (queryResultSet == null || queryResultSet.isEmpty()) return null; @@ -489,7 +505,7 @@ public final class SystemKeyspace public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); + executeInternal(format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); truncationRecords = null; forceBlockingFlush(LOCAL); } @@ -497,14 +513,14 @@ public final class SystemKeyspace /** * This method is used to remove information about truncation time for specified column family */ - public static synchronized void removeTruncationRecord(UUID cfId) + public static synchronized void removeTruncationRecord(TableId id) { - Pair<CommitLogPosition, Long> truncationRecord = getTruncationRecord(cfId); + Pair<CommitLogPosition, Long> truncationRecord = getTruncationRecord(id); if (truncationRecord == null) return; String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), cfId); + executeInternal(format(req, LOCAL, LOCAL), id.asUUID()); truncationRecords = null; forceBlockingFlush(LOCAL); } @@ -515,7 +531,7 @@ public final class SystemKeyspace { CommitLogPosition.serializer.serialize(position, out); out.writeLong(truncatedAt); - return singletonMap(cfs.metadata.cfId, out.asNewBuffer()); + return singletonMap(cfs.metadata.id.asUUID(), out.asNewBuffer()); } catch (IOException e) { @@ -523,36 +539,36 @@ public final class SystemKeyspace } } - public static CommitLogPosition getTruncatedPosition(UUID cfId) + public static CommitLogPosition getTruncatedPosition(TableId id) { - Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId); + Pair<CommitLogPosition, Long> record = getTruncationRecord(id); return record == null ? null : record.left; } - public static long getTruncatedAt(UUID cfId) + public static long getTruncatedAt(TableId id) { - Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId); + Pair<CommitLogPosition, Long> record = getTruncationRecord(id); return record == null ? Long.MIN_VALUE : record.right; } - private static synchronized Pair<CommitLogPosition, Long> getTruncationRecord(UUID cfId) + private static synchronized Pair<CommitLogPosition, Long> getTruncationRecord(TableId id) { if (truncationRecords == null) truncationRecords = readTruncationRecords(); - return truncationRecords.get(cfId); + return truncationRecords.get(id); } - private static Map<UUID, Pair<CommitLogPosition, Long>> readTruncationRecords() + private static Map<TableId, Pair<CommitLogPosition, Long>> readTruncationRecords() { - UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); + UntypedResultSet rows = executeInternal(format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - Map<UUID, Pair<CommitLogPosition, Long>> records = new HashMap<>(); + Map<TableId, Pair<CommitLogPosition, Long>> records = new HashMap<>(); if (!rows.isEmpty() && rows.one().has("truncated_at")) { Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet()) - records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); + records.put(TableId.fromUUID(entry.getKey()), truncationRecordFromBlob(entry.getValue())); } return records; @@ -579,7 +595,7 @@ public final class SystemKeyspace return; String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)); + executeInternal(format(req, PEERS), ep, tokensAsSet(tokens)); } public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) @@ -588,7 +604,7 @@ public final class SystemKeyspace return; String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS), ep, preferred_ip); + executeInternal(format(req, PEERS), ep, preferred_ip); forceBlockingFlush(PEERS); } @@ -598,20 +614,20 @@ public final class SystemKeyspace return; String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS, columnName), ep, value); + executeInternal(format(req, PEERS, columnName), ep, value); } public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) { // with 30 day TTL String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?"; - executeInternal(String.format(req, PEER_EVENTS), timePeriod, value, ep); + executeInternal(format(req, PEER_EVENTS), timePeriod, value, ep); } public static synchronized void updateSchemaVersion(UUID version) { String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), version); + executeInternal(format(req, LOCAL, LOCAL), version); } private static Set<String> tokensAsSet(Collection<Token> tokens) @@ -640,7 +656,7 @@ public final class SystemKeyspace public static synchronized void removeEndpoint(InetAddress ep) { String req = "DELETE FROM system.%s WHERE peer = ?"; - executeInternal(String.format(req, PEERS), ep); + executeInternal(format(req, PEERS), ep); forceBlockingFlush(PEERS); } @@ -656,7 +672,7 @@ public final class SystemKeyspace return; String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens)); + executeInternal(format(req, LOCAL, LOCAL), tokensAsSet(tokens)); forceBlockingFlush(LOCAL); } @@ -710,7 +726,7 @@ public final class SystemKeyspace public static InetAddress getPreferredIP(InetAddress ep) { String req = "SELECT preferred_ip FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); + UntypedResultSet result = executeInternal(format(req, PEERS), ep); if (!result.isEmpty() && result.one().has("preferred_ip")) return result.one().getInetAddress("preferred_ip"); return ep; @@ -752,7 +768,7 @@ public final class SystemKeyspace return new CassandraVersion(FBUtilities.getReleaseVersionString()); } String req = "SELECT release_version FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); + UntypedResultSet result = executeInternal(format(req, PEERS), ep); if (result != null && result.one().has("release_version")) { return new CassandraVersion(result.one().getString("release_version")); @@ -791,7 +807,7 @@ public final class SystemKeyspace ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL); String req = "SELECT cluster_name FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); if (result.isEmpty() || !result.one().has("cluster_name")) { @@ -811,7 +827,7 @@ public final class SystemKeyspace public static Collection<Token> getSavedTokens() { String req = "SELECT tokens FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); return result.isEmpty() || !result.one().has("tokens") ? Collections.<Token>emptyList() : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); @@ -820,7 +836,7 @@ public final class SystemKeyspace public static int incrementAndGetGeneration() { String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); int generation; if (result.isEmpty() || !result.one().has("gossip_generation")) @@ -848,7 +864,7 @@ public final class SystemKeyspace } req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), generation); + executeInternal(format(req, LOCAL, LOCAL), generation); forceBlockingFlush(LOCAL); return generation; @@ -857,7 +873,7 @@ public final class SystemKeyspace public static BootstrapState getBootstrapState() { String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); if (result.isEmpty() || !result.one().has("bootstrapped")) return BootstrapState.NEEDS_BOOTSTRAP; @@ -886,14 +902,14 @@ public final class SystemKeyspace return; String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), state.name()); + executeInternal(format(req, LOCAL, LOCAL), state.name()); forceBlockingFlush(LOCAL); } public static boolean isIndexBuilt(String keyspaceName, String indexName) { String req = "SELECT index_name FROM %s.\"%s\" WHERE table_name=? AND index_name=?"; - UntypedResultSet result = executeInternal(String.format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, indexName); + UntypedResultSet result = executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, indexName); return !result.isEmpty(); } @@ -915,7 +931,7 @@ public final class SystemKeyspace { List<String> names = new ArrayList<>(indexNames); String req = "SELECT index_name from %s.\"%s\" WHERE table_name=? AND index_name IN ?"; - UntypedResultSet results = executeInternal(String.format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, names); + UntypedResultSet results = executeInternal(format(req, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_INDEXES), keyspaceName, names); return StreamSupport.stream(results.spliterator(), false) .map(r -> r.getString("index_name")) .collect(Collectors.toList()); @@ -928,7 +944,7 @@ public final class SystemKeyspace public static UUID getLocalHostId() { String req = "SELECT host_id FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); // Look up the Host UUID (return it if found) if (!result.isEmpty() && result.one().has("host_id")) @@ -946,7 +962,7 @@ public final class SystemKeyspace public static UUID setLocalHostId(UUID hostId) { String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL, LOCAL), hostId); + executeInternal(format(req, LOCAL, LOCAL), hostId); return hostId; } @@ -956,7 +972,7 @@ public final class SystemKeyspace public static String getRack() { String req = "SELECT rack FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); // Look up the Rack (return it if found) if (!result.isEmpty() && result.one().has("rack")) @@ -971,7 +987,7 @@ public final class SystemKeyspace public static String getDatacenter() { String req = "SELECT data_center FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); + UntypedResultSet result = executeInternal(format(req, LOCAL, LOCAL)); // Look up the Data center (return it if found) if (!result.isEmpty() && result.one().has("data_center")) @@ -980,16 +996,16 @@ public final class SystemKeyspace return null; } - public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec) + public static PaxosState loadPaxosState(DecoratedKey key, TableMetadata metadata, int nowInSec) { String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), String.format(req, PAXOS), key.getKey(), metadata.cfId); + UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), format(req, PAXOS), key.getKey(), metadata.id.asUUID()); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); Commit promised = row.has("in_progress_ballot") - ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.partitionColumns(), 1)) + ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.regularAndStaticColumns(), 1)) : Commit.emptyCommit(key, metadata); // either we have both a recently accepted ballot and update or we have neither Commit accepted = row.has("proposal_version") && row.has("proposal") @@ -1007,27 +1023,27 @@ public final class SystemKeyspace public static void savePaxosPromise(Commit promise) { String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(req, PAXOS), + executeInternal(format(req, PAXOS), UUIDGen.microsTimestamp(promise.ballot), paxosTtlSec(promise.update.metadata()), promise.ballot, promise.update.partitionKey().getKey(), - promise.update.metadata().cfId); + promise.update.metadata().id.asUUID()); } public static void savePaxosProposal(Commit proposal) { - executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS), + executeInternal(format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS), UUIDGen.microsTimestamp(proposal.ballot), paxosTtlSec(proposal.update.metadata()), proposal.ballot, PartitionUpdate.toBytes(proposal.update, MessagingService.current_version), MessagingService.current_version, proposal.update.partitionKey().getKey(), - proposal.update.metadata().cfId); + proposal.update.metadata().id.asUUID()); } - public static int paxosTtlSec(CFMetaData metadata) + public static int paxosTtlSec(TableMetadata metadata) { // keep paxos state around for at least 3h return Math.max(3 * 3600, metadata.params.gcGraceSeconds); @@ -1038,14 +1054,14 @@ public final class SystemKeyspace // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(cql, PAXOS), + executeInternal(format(cql, PAXOS), UUIDGen.microsTimestamp(commit.ballot), paxosTtlSec(commit.update.metadata()), commit.ballot, PartitionUpdate.toBytes(commit.update, MessagingService.current_version), MessagingService.current_version, commit.update.partitionKey().getKey(), - commit.update.metadata().cfId); + commit.update.metadata().id.asUUID()); } /** @@ -1058,7 +1074,7 @@ public final class SystemKeyspace public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation) { String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?"; - UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); + UntypedResultSet results = executeInternal(format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); if (results.isEmpty()) return new RestorableMeter(); @@ -1076,7 +1092,7 @@ public final class SystemKeyspace { // Store values with a one-day TTL to handle corner cases where cleanup might not occur String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY), + executeInternal(format(cql, SSTABLE_ACTIVITY), keyspace, table, generation, @@ -1090,7 +1106,7 @@ public final class SystemKeyspace public static void clearSSTableReadMeter(String keyspace, String table, int generation) { String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); + executeInternal(format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); } /** @@ -1099,7 +1115,7 @@ public final class SystemKeyspace public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates) { long timestamp = FBUtilities.timestampMicros(); - PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.partitionColumns(), estimates.size()); + PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size()); Mutation mutation = new Mutation(update); // delete all previous values with a single range tombstone. @@ -1126,7 +1142,7 @@ public final class SystemKeyspace */ public static void clearSizeEstimates(String keyspace, String table) { - String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES); + String cql = format("DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?", SizeEstimates.toString()); executeInternal(cql, keyspace, table); } @@ -1138,14 +1154,14 @@ public final class SystemKeyspace { rangesToUpdate.add(rangeToBytes(range)); } - executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace); + executeInternal(format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace); } public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) { Set<Range<Token>> result = new HashSet<>(); String query = "SELECT * FROM system.%s WHERE keyspace_name=?"; - UntypedResultSet rs = executeInternal(String.format(query, AVAILABLE_RANGES), keyspace); + UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES), keyspace); for (UntypedResultSet.Row row : rs) { Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); @@ -1174,14 +1190,14 @@ public final class SystemKeyspace { rangesToUpdate.add(rangeToBytes(range)); } - executeInternal(String.format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace); + executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace); } public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner) { Map<InetAddress, Set<Range<Token>>> result = new HashMap<>(); String query = "SELECT * FROM system.%s WHERE operation = ? AND keyspace_name = ?"; - UntypedResultSet rs = executeInternal(String.format(query, TRANSFERRED_RANGES), description, keyspace); + UntypedResultSet rs = executeInternal(format(query, TRANSFERRED_RANGES), description, keyspace); for (UntypedResultSet.Row row : rs) { InetAddress peer = row.getInetAddress("peer"); @@ -1213,9 +1229,9 @@ public final class SystemKeyspace { logger.info("Detected version upgrade from {} to {}, snapshotting system keyspace", previous, next); - String snapshotName = Keyspace.getTimestampedSnapshotName(String.format("upgrade-%s-%s", - previous, - next)); + String snapshotName = Keyspace.getTimestampedSnapshotName(format("upgrade-%s-%s", + previous, + next)); Keyspace systemKs = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); systemKs.snapshot(snapshotName, null); return true; @@ -1238,7 +1254,7 @@ public final class SystemKeyspace private static String getPreviousVersionString() { String req = "SELECT release_version FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL)); + UntypedResultSet result = executeInternal(format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL)); if (result.isEmpty() || !result.one().has("release_version")) { // it isn't inconceivable that one might try to upgrade a node straight from <= 1.1 to whatever @@ -1298,24 +1314,21 @@ public final class SystemKeyspace public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql) { - executeInternal(String.format("INSERT INTO %s.%s" - + " (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)", - SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS), + executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)", + PreparedStatements.toString()), loggedKeyspace, key.byteBuffer(), cql); logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql); } public static void removePreparedStatement(MD5Digest key) { - executeInternal(String.format("DELETE FROM %s.%s" - + " WHERE prepared_id = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS), + executeInternal(format("DELETE FROM %s WHERE prepared_id = ?", PreparedStatements.toString()), key.byteBuffer()); } public static List<Pair<String, String>> loadPreparedStatements() { - String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS); + String query = format("SELECT logged_keyspace, query_string FROM %s", PreparedStatements.toString()); UntypedResultSet resultSet = executeOnceInternal(query); List<Pair<String, String>> r = new ArrayList<>(); for (UntypedResultSet.Row row : resultSet)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index c32a642..d361bac 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -18,19 +18,13 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.io.IOError; -import java.util.*; - -import com.google.common.collect.Iterables; -import com.google.common.collect.PeekingIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; + +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.net.MessagingService; /** * Helper class to deserialize Unfiltered object from disk efficiently. @@ -43,7 +37,7 @@ public class UnfilteredDeserializer { private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class); - protected final CFMetaData metadata; + protected final TableMetadata metadata; protected final DataInputPlus in; protected final SerializationHelper helper; @@ -57,7 +51,7 @@ public class UnfilteredDeserializer private final Row.Builder builder; - private UnfilteredDeserializer(CFMetaData metadata, + private UnfilteredDeserializer(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper) @@ -70,7 +64,7 @@ public class UnfilteredDeserializer this.builder = BTreeRow.sortedBuilder(); } - public static UnfilteredDeserializer create(CFMetaData metadata, + public static UnfilteredDeserializer create(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/UnknownColumnException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java deleted file mode 100644 index 55dc453..0000000 --- a/src/java/org/apache/cassandra/db/UnknownColumnException.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * Exception thrown when we read a column internally that is unknown. Note that - * this is an internal exception and is not meant to be user facing. - */ -public class UnknownColumnException extends Exception -{ - public final ByteBuffer columnName; - - public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName) - { - super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName)); - this.columnName = columnName; - } - - private static String stringify(ByteBuffer name) - { - try - { - return UTF8Type.instance.getString(name); - } - catch (Exception e) - { - return ByteBufferUtil.bytesToHex(name); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java b/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java deleted file mode 100644 index c43b50a..0000000 --- a/src/java/org/apache/cassandra/db/UnknownColumnFamilyException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.IOException; -import java.util.UUID; - - -public class UnknownColumnFamilyException extends IOException -{ - public final UUID cfId; - - public UnknownColumnFamilyException(String msg, UUID cfId) - { - super(msg); - this.cfId = cfId; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index 71f8c43..3936ce4 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -22,7 +22,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.NoSuchElementException; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; @@ -37,6 +37,9 @@ import org.apache.cassandra.utils.ByteBufferUtil; public abstract class AbstractSSTableIterator implements UnfilteredRowIterator { protected final SSTableReader sstable; + // We could use sstable.metadata(), but that can change during execution so it's good hygiene to grab an immutable instance + protected final TableMetadata metadata; + protected final DecoratedKey key; protected final DeletionTime partitionLevelDeletion; protected final ColumnFilter columns; @@ -62,11 +65,12 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator FileHandle ifile) { this.sstable = sstable; + this.metadata = sstable.metadata(); this.ifile = ifile; this.key = key; this.columns = columnFilter; this.slices = slices; - this.helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter); + this.helper = new SerializationHelper(metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter); if (indexEntry == null) { @@ -178,12 +182,12 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator : createReaderInternal(indexEntry, file, shouldCloseFile); }; - public CFMetaData metadata() + public TableMetadata metadata() { - return sstable.metadata; + return metadata; } - public PartitionColumns columns() + public RegularAndStaticColumns columns() { return columns.fetchedColumns(); } @@ -306,7 +310,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator private void createDeserializer() { assert file != null && deserializer == null; - deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper); + deserializer = UnfilteredDeserializer.create(metadata, file, sstable.header, helper); } protected void seekToPosition(long position) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java index 5d9ca37..3e6cc27 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java @@ -199,7 +199,7 @@ public class SSTableIterator extends AbstractSSTableIterator private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) { super(file, shouldCloseFile); - this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false, ifile); + this.indexState = new IndexState(this, metadata.comparator, indexEntry, false, ifile); this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index 4de234c..6443a01 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -20,7 +20,6 @@ package org.apache.cassandra.db.columniterator; import java.io.IOException; import java.util.*; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.ImmutableBTreePartition; @@ -28,6 +27,7 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.btree.BTree; /** @@ -88,7 +88,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator protected ReusablePartitionData createBuffer(int blocksCount) { int estimatedRowCount = 16; - int columnCount = metadata().partitionColumns().regulars.size(); + int columnCount = metadata().regularColumns().size(); if (columnCount == 0 || metadata().clusteringColumns().isEmpty()) { estimatedRowCount = 1; @@ -222,7 +222,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile) { super(file, shouldCloseFile); - this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true, ifile); + this.indexState = new IndexState(this, metadata.comparator, indexEntry, true, ifile); } @Override @@ -321,18 +321,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator private class ReusablePartitionData { - private final CFMetaData metadata; + private final TableMetadata metadata; private final DecoratedKey partitionKey; - private final PartitionColumns columns; + private final RegularAndStaticColumns columns; private MutableDeletionInfo.Builder deletionBuilder; private MutableDeletionInfo deletionInfo; private BTree.Builder<Row> rowBuilder; private ImmutableBTreePartition built; - private ReusablePartitionData(CFMetaData metadata, + private ReusablePartitionData(TableMetadata metadata, DecoratedKey partitionKey, - PartitionColumns columns, + RegularAndStaticColumns columns, int initialRowCapacity) { this.metadata = metadata; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 0ab941b..7e94911 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -32,8 +32,10 @@ import org.slf4j.LoggerFactory; import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -272,7 +274,7 @@ public abstract class AbstractCommitLogSegmentManager * * Flushes any dirty CFs for this segment and any older segments, and then discards the segments */ - void forceRecycleAll(Iterable<UUID> droppedCfs) + void forceRecycleAll(Iterable<TableId> droppedTables) { List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); @@ -292,8 +294,8 @@ public abstract class AbstractCommitLogSegmentManager future.get(); for (CommitLogSegment segment : activeSegments) - for (UUID cfId : droppedCfs) - segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + for (TableId tableId : droppedTables) + segment.markClean(tableId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() // if the previous active segment was the only one to recycle (since an active segment isn't @@ -367,27 +369,26 @@ public abstract class AbstractCommitLogSegmentManager final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); // a map of CfId -> forceFlush() to ensure we only queue one flush per cf - final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); + final Map<TableId, ListenableFuture<?>> flushes = new LinkedHashMap<>(); for (CommitLogSegment segment : segments) { - for (UUID dirtyCFId : segment.getDirtyCFIDs()) + for (TableId dirtyTableId : segment.getDirtyTableIds()) { - Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); - if (pair == null) + TableMetadata metadata = Schema.instance.getTableMetadata(dirtyTableId); + if (metadata == null) { // even though we remove the schema entry before a final flush when dropping a CF, // it's still possible for a writer to race and finish his append after the flush. - logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); - segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyTableId); + segment.markClean(dirtyTableId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); } - else if (!flushes.containsKey(dirtyCFId)) + else if (!flushes.containsKey(dirtyTableId)) { - String keyspace = pair.left; - final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + final ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(dirtyTableId); // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, // no deadlock possibility since switchLock removal - flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); + flushes.put(dirtyTableId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 750fabc..e93a131 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.security.EncryptionContext; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; @@ -205,9 +206,9 @@ public class CommitLog implements CommitLogMBean /** * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining */ - public void forceRecycleAllSegments(Iterable<UUID> droppedCfs) + public void forceRecycleAllSegments(Iterable<TableId> droppedTables) { - segmentManager.forceRecycleAll(droppedCfs); + segmentManager.forceRecycleAll(droppedTables); } /** @@ -215,7 +216,7 @@ public class CommitLog implements CommitLogMBean */ public void forceRecycleAllSegments() { - segmentManager.forceRecycleAll(Collections.<UUID>emptyList()); + segmentManager.forceRecycleAll(Collections.emptyList()); } /** @@ -295,13 +296,13 @@ public class CommitLog implements CommitLogMBean * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position * given. Discards any commit log segments that are no longer used. * - * @param cfId the column family ID that was flushed + * @param id the table that was flushed * @param lowerBound the lowest covered replay position of the flush * @param lowerBound the highest covered replay position of the flush */ - public void discardCompletedSegments(final UUID cfId, final CommitLogPosition lowerBound, final CommitLogPosition upperBound) + public void discardCompletedSegments(final TableId id, final CommitLogPosition lowerBound, final CommitLogPosition upperBound) { - logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId); + logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, id); // Go thru the active segment files, which are ordered oldest to newest, marking the // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed @@ -310,7 +311,7 @@ public class CommitLog implements CommitLogMBean for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();) { CommitLogSegment segment = iter.next(); - segment.markClean(cfId, lowerBound, upperBound); + segment.markClean(id, lowerBound, upperBound); if (segment.isUnused()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index d25609a..1da0cee 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -29,15 +29,16 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason; import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.exceptions.UnknownTableException; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.RebufferingInputStream; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.JVMStabilityInspector; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@ -51,7 +52,7 @@ public class CommitLogReader @VisibleForTesting public static final int ALL_MUTATIONS = -1; private final CRC32 checksum; - private final Map<UUID, AtomicInteger> invalidMutations; + private final Map<TableId, AtomicInteger> invalidMutations; private byte[] buffer; @@ -62,7 +63,7 @@ public class CommitLogReader buffer = new byte[4096]; } - public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations() + public Set<Map.Entry<TableId, AtomicInteger>> getInvalidMutations() { return invalidMutations.entrySet(); } @@ -367,15 +368,15 @@ public class CommitLogReader for (PartitionUpdate upd : mutation.getPartitionUpdates()) upd.validate(); } - catch (UnknownColumnFamilyException ex) + catch (UnknownTableException ex) { - if (ex.cfId == null) + if (ex.id == null) return; - AtomicInteger i = invalidMutations.get(ex.cfId); + AtomicInteger i = invalidMutations.get(ex.id); if (i == null) { i = new AtomicInteger(1); - invalidMutations.put(ex.cfId, i); + invalidMutations.put(ex.id, i); } else i.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 4d2971f..961107c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -21,15 +21,12 @@ package org.apache.cassandra.db.commitlog; import java.io.File; import java.io.IOException; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.*; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; @@ -37,10 +34,11 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; @@ -64,7 +62,7 @@ public class CommitLogReplayer implements CommitLogReadHandler private final Queue<Future<Integer>> futures; private final AtomicInteger replayedCount; - private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted; + private final Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted; private final CommitLogPosition globalPosition; // Used to throttle speed of replay of mutations if we pass the max outstanding count @@ -78,11 +76,11 @@ public class CommitLogReplayer implements CommitLogReadHandler CommitLogReplayer(CommitLog commitLog, CommitLogPosition globalPosition, - Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted, + Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted, ReplayFilter replayFilter) { - this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>(); - this.futures = new ArrayDeque<Future<Integer>>(); + this.keyspacesReplayed = new NonBlockingHashSet<>(); + this.futures = new ArrayDeque<>(); // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. this.replayedCount = new AtomicInteger(); this.cfPersisted = cfPersisted; @@ -95,35 +93,35 @@ public class CommitLogReplayer implements CommitLogReadHandler public static CommitLogReplayer construct(CommitLog commitLog) { // compute per-CF and global replay intervals - Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted = new HashMap<>(); + Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted = new HashMap<>(); ReplayFilter replayFilter = ReplayFilter.create(); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // but, if we've truncated the cf in question, then we need to need to start replay after the truncation - CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId); + CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id); if (truncatedAt != null) { // Point in time restore is taken to mean that the tables need to be replayed even if they were // deleted at a later point in time. Any truncation record after that point must thus be cleared prior // to replay (CASSANDRA-9195). long restoreTime = commitLog.archiver.restorePointInTime; - long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId); + long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.id); if (truncatedTime > restoreTime) { if (replayFilter.includes(cfs.metadata)) { logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.", - cfs.metadata.ksName, - cfs.metadata.cfName); - SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId); + cfs.metadata.keyspace, + cfs.metadata.name); + SystemKeyspace.removeTruncationRecord(cfs.metadata.id); truncatedAt = null; } } } IntervalSet<CommitLogPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt); - cfPersisted.put(cfs.metadata.cfId, filter); + cfPersisted.put(cfs.metadata.id, filter); } CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values()); logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted)); @@ -146,7 +144,7 @@ public class CommitLogReplayer implements CommitLogReadHandler */ public int blockForWrites() { - for (Map.Entry<UUID, AtomicInteger> entry : commitLogReader.getInvalidMutations()) + for (Map.Entry<TableId, AtomicInteger> entry : commitLogReader.getInvalidMutations()) logger.warn("Skipped {} mutations from unknown (probably removed) CF with id {}", entry.getValue(), entry.getKey()); // wait for all the writes to finish on the mutation stage @@ -192,7 +190,7 @@ public class CommitLogReplayer implements CommitLogReadHandler { public void runMayThrow() { - if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + if (Schema.instance.getKeyspaceMetadata(mutation.getKeyspaceName()) == null) return; if (commitLogReplayer.pointInTimeExceeded(mutation)) return; @@ -207,12 +205,12 @@ public class CommitLogReplayer implements CommitLogReadHandler Mutation newMutation = null; for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation)) { - if (Schema.instance.getCF(update.metadata().cfId) == null) + if (Schema.instance.getTableMetadata(update.metadata().id) == null) continue; // dropped // replay if current segment is newer than last flushed one or, // if it is the last known segment, if we are after the commit log segment position - if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation))) + if (commitLogReplayer.shouldReplay(update.metadata().id, new CommitLogPosition(segmentId, entryLocation))) { if (newMutation == null) newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); @@ -272,7 +270,7 @@ public class CommitLogReplayer implements CommitLogReadHandler { public abstract Iterable<PartitionUpdate> filter(Mutation mutation); - public abstract boolean includes(CFMetaData metadata); + public abstract boolean includes(TableMetadataRef metadata); public static ReplayFilter create() { @@ -307,7 +305,7 @@ public class CommitLogReplayer implements CommitLogReadHandler return mutation.getPartitionUpdates(); } - public boolean includes(CFMetaData metadata) + public boolean includes(TableMetadataRef metadata) { return true; } @@ -332,14 +330,14 @@ public class CommitLogReplayer implements CommitLogReadHandler { public boolean apply(PartitionUpdate upd) { - return cfNames.contains(upd.metadata().cfName); + return cfNames.contains(upd.metadata().name); } }); } - public boolean includes(CFMetaData metadata) + public boolean includes(TableMetadataRef metadata) { - return toReplay.containsEntry(metadata.ksName, metadata.cfName); + return toReplay.containsEntry(metadata.keyspace, metadata.name); } } @@ -349,9 +347,9 @@ public class CommitLogReplayer implements CommitLogReadHandler * * @return true iff replay is necessary */ - private boolean shouldReplay(UUID cfId, CommitLogPosition position) + private boolean shouldReplay(TableId tableId, CommitLogPosition position) { - return !cfPersisted.get(cfId).contains(position); + return !cfPersisted.get(tableId).contains(position); } protected boolean pointInTimeExceeded(Mutation fm)