Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e42164b6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e42164b6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e42164b6 Branch: refs/heads/cassandra-3.0 Commit: e42164b63baf7c86ac64078f68e61097c4741711 Parents: 1e64a9d 177f607 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Nov 10 14:39:16 2015 +0000 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Nov 10 14:39:16 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/hadoop/cql3/CqlRecordWriter.java | 94 ++++++++++---------- 2 files changed, 50 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e42164b6/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 33adefb,81ceb25..24e42c0 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,45 -1,5 +1,47 @@@ -2.2.4 +3.0.1 ++Merged from 2.2: + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) +Merged from 2.1: + * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258) + * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605) + + +3.0 + * Fix AssertionError while flushing memtable due to materialized views + incorrectly inserting empty rows (CASSANDRA-10614) + * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650) + * Don't use -1 for the position of partition key in schema (CASSANDRA-10491) + * Fix distinct queries in mixed version cluster (CASSANDRA-10573) + * Skip sstable on clustering in names query (CASSANDRA-10571) + * Remove value skipping as it breaks read-repair (CASSANDRA-10655) + * Fix bootstrapping with MVs (CASSANDRA-10621) + * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584) + * 
Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634) + * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600) + * Fix reading of legacy sstables (CASSANDRA-10590) + * Use CQL type names in schema metadata tables (CASSANDRA-10365) + * Guard batchlog replay against integer division by zero (CASSANDRA-9223) + * Fix bug when adding a column to thrift with the same name as a primary key (CASSANDRA-10608) + * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068) + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602) + * Don't use 'names query' read path for counters (CASSANDRA-10572) + * Fix backward compatibility for counters (CASSANDRA-10470) + * Remove memory_allocator parameter from cassandra.yaml (CASSANDRA-10581,10628) + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604) + * Fix thrift cas operations with defined columns (CASSANDRA-10576) + * Fix PartitionUpdate.operationCount() for updates with static column operations (CASSANDRA-10606) + * Fix thrift get() queries with defined columns (CASSANDRA-10586) + * Fix marking of indexes as built and removed (CASSANDRA-10601) + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595) + * Fix batches on multiple tables (CASSANDRA-10554) + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569) + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975) + * Remove token generator (CASSANDRA-5261) + * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562) + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421) + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360) + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367) +Merged from 2.2: * (cqlsh) show partial trace if 
incomplete after max_trace_wait (CASSANDRA-7645) * Use most up-to-date version of schema for system tables (CASSANDRA-10652) * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e42164b6/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 6b4caa5,14e24fb..23beba3 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@@ -108,31 -109,29 +108,29 @@@ class CqlRecordWriter extends RecordWri CqlRecordWriter(Configuration conf) { this.conf = conf; - this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); - batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); + this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); + batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32); this.clients = new HashMap<>(); - try + try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf)) { String keyspace = ConfigHelper.getOutputKeyspace(conf); - try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace)) + Session client = cluster.connect(keyspace); + ringCache = new NativeRingCache(conf); + if (client != null) { - ringCache = new NativeRingCache(conf); - if (client != null) - { - TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf)); - clusterColumns = tableMetadata.getClusteringColumns(); - partitionKeyColumns = tableMetadata.getPartitionKey(); - - String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim(); - if 
(cqlQuery.toLowerCase().startsWith("insert")) - throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement"); - cql = appendKeyWhereClauses(cqlQuery); - } - else - { - throw new IllegalArgumentException("Invalid configuration specified " + conf); - } + TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf)); + clusterColumns = tableMetadata.getClusteringColumns(); + partitionKeyColumns = tableMetadata.getPartitionKey(); + + String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim(); + if (cqlQuery.toLowerCase().startsWith("insert")) + throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement"); + cql = appendKeyWhereClauses(cqlQuery); + } + else + { + throw new IllegalArgumentException("Invalid configuration specified " + conf); } } catch (Exception e) @@@ -298,34 -298,37 +297,37 @@@ while (true) { // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. 
- try + if (session != null) { - int i = 0; - PreparedStatement statement = preparedStatement(client); - while (bindVariables != null) + try { - BoundStatement boundStatement = new BoundStatement(statement); - for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + int i = 0; + PreparedStatement statement = preparedStatement(session); + while (bindVariables != null) { - boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + BoundStatement boundStatement = new BoundStatement(statement); + for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + { + boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + } + session.execute(boundStatement); + i++; + + if (i >= batchThreshold) + break; + bindVariables = queue.poll(); } - client.execute(boundStatement); - i++; - - if (i >= batchThreshold) - break; - bindVariables = queue.poll(); + break; } - break; - } - catch (Exception e) - { - closeInternal(); - if (!iter.hasNext()) + catch (Exception e) { - lastException = new IOException(e); - break outer; + closeInternal(); + if (!iter.hasNext()) + { + lastException = new IOException(e); + break outer; + } - } + } } // attempt to connect to a different endpoint