Merge commit '017ec3e99e704db5e1a36ad153af08d6e7eca523' into cassandra-2.2 * commit '017ec3e99e704db5e1a36ad153af08d6e7eca523': Avoid stalling Paxos when the paxos state expires
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6555a87b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6555a87b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6555a87b Branch: refs/heads/trunk Commit: 6555a87bde4daeb8bd5d9558595a367ec6bc061d Parents: 3b448b3 017ec3e Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Jun 28 15:17:40 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Jun 28 15:18:27 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/QueryProcessor.java | 28 +++++++++++++++++++- .../cql3/statements/SelectStatement.java | 6 ++++- .../org/apache/cassandra/db/SystemKeyspace.java | 6 ++--- .../apache/cassandra/service/StorageProxy.java | 2 +- .../cassandra/service/paxos/PaxosState.java | 11 ++++++-- .../service/paxos/PrepareCallback.java | 18 ++++++++++++- 7 files changed, 63 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 8d2062d,feeaded..9f42d98 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,35 -1,5 +1,36 @@@ -2.1.15 +2.2.7 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755) + * Validate bloom_filter_fp_chance against lowest supported + value when the table is created (CASSANDRA-11920) + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Persist local metadata earlier in startup sequence (CASSANDRA-11742) + * Run CommitLog tests with different compression settings (CASSANDRA-9039) + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: + * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043) * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854) * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) * Prevent select statements with clustering key > 64k (CASSANDRA-11882) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java index fa82fa7,4340d42..c702679 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@@ -331,15 -354,52 +331,41 @@@ public class QueryProcessor implements */ public static UntypedResultSet executeOnceInternal(String query, Object... values) { - try - { - ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState()); - prepared.statement.validate(internalQueryState().getClientState()); - ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values)); - if (result instanceof ResultMessage.Rows) - return UntypedResultSet.create(((ResultMessage.Rows)result).result); - else - return null; - } - catch (RequestExecutionException e) - { - throw new RuntimeException(e); - } - catch (RequestValidationException e) - { - throw new RuntimeException("Error validating query " + query, e); - } + ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState()); + prepared.statement.validate(internalQueryState().getClientState()); + ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values)); + if (result instanceof ResultMessage.Rows) + return UntypedResultSet.create(((ResultMessage.Rows)result).result); + else + return null; } + /** + * A special version of executeInternal that takes the time used as "now" for the query in argument. + * Note that this only make sense for Selects so this only accept SELECT statements and is only useful in rare + * cases. + */ + public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values) + { + try + { + ParsedStatement.Prepared prepared = prepareInternal(query); + assert prepared.statement instanceof SelectStatement; + SelectStatement select = (SelectStatement)prepared.statement; + ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now); + assert result instanceof ResultMessage.Rows; + return UntypedResultSet.create(((ResultMessage.Rows)result).result); + } + catch (RequestExecutionException e) + { + throw new RuntimeException(e); + } + catch (RequestValidationException e) + { + throw new RuntimeException("Error validating query " + query, e); + } + } + public static UntypedResultSet resultify(String query, Row row) { return resultify(query, Collections.singletonList(row)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 20fe982,6351bb5..8820ff7 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@@ -291,10 -312,17 +291,14 @@@ public class SelectStatement implement public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { + return executeInternal(state, options, System.currentTimeMillis()); + } + + public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException + { int limit = getLimit(options); - long now = System.currentTimeMillis(); Pageable command = getPageableCommand(options, limit, now); - - int pageSize = options.getPageSize(); - if (parameters.isCount && pageSize <= 0) - pageSize = DEFAULT_COUNT_PAGE_SIZE; + int pageSize = getPageSize(options); if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 74a3c7b,f8cf1ab..e0d5f66 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -880,24 -796,110 +880,24 @@@ public final class SystemKeyspac } /** - * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns) - * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily) + * Gets the stored data center for the local node, or null if none have been set yet. */ - public static List<Row> serializedSchema(String schemaCfName) - { - Token minToken = StorageService.getPartitioner().getMinimumToken(); - - return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()), - null, - new IdentityQueryFilter(), - Integer.MAX_VALUE, - System.currentTimeMillis()); - } - - public static Collection<Mutation> serializeSchema() + public static String getDatacenter() { - Map<DecoratedKey, Mutation> mutationMap = new HashMap<>(); + String req = "SELECT data_center FROM system.%s WHERE key='%s'"; + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); - for (String cf : allSchemaCfs) - serializeSchema(mutationMap, cf); + // Look up the Data center (return it if found) + if (!result.isEmpty() && result.one().has("data_center")) + return result.one().getString("data_center"); - return mutationMap.values(); - } - - private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName) - { - for (Row schemaRow : serializedSchema(schemaCfName)) - { - if (Schema.ignoredSchemaRow(schemaRow)) - continue; - - Mutation mutation = mutationMap.get(schemaRow.key); - if (mutation == null) - { - mutation = new Mutation(Keyspace.SYSTEM_KS, schemaRow.key.getKey()); - mutationMap.put(schemaRow.key, mutation); - } - - mutation.add(schemaRow.cf); - } - } - - public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces) - { - Map<DecoratedKey, ColumnFamily> schema = new HashMap<>(); - - for (String keyspace : keyspaces) - { - Row schemaEntity = readSchemaRow(schemaCfName, keyspace); - if (schemaEntity.cf != null) - schema.put(schemaEntity.key, schemaEntity.cf); - } - - return schema; - } - - public static ByteBuffer getSchemaKSKey(String ksName) - { - return AsciiType.instance.fromString(ksName); - } - - /** - * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace. - * - * @param schemaCfName the schema table to get the data from (schema_keyspaces, schema_columnfamilies, schema_columns or schema_triggers) - * @param ksName the keyspace of the tables we are interested in - * @return a Row containing the schema data of a particular type for the keyspace - */ - public static Row readSchemaRow(String schemaCfName, String ksName) - { - DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName)); - - ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName); - ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, schemaCfName, System.currentTimeMillis())); - - return new Row(key, result); - } - - /** - * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair. - * - * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers) - * @param ksName the keyspace of the table we are interested in - * @param cfName the table we are interested in - * @return a Row containing the schema data of a particular type for the table - */ - public static Row readSchemaRow(String schemaCfName, String ksName, String cfName) - { - DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName)); - ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName); - Composite prefix = schemaCFS.getComparator().make(cfName); - ColumnFamily cf = schemaCFS.getColumnFamily(key, - prefix, - prefix.end(), - false, - Integer.MAX_VALUE, - System.currentTimeMillis()); - return new Row(key, cf); + return null; } - public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata) + public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now) { String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId); - UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS_CF), key, metadata.cfId); ++ UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS), key, metadata.cfId); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ----------------------------------------------------------------------