Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d00fcbc4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d00fcbc4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d00fcbc4

Branch: refs/heads/cassandra-3.0
Commit: d00fcbc43980528990e2de4423d0fd8d8d5e8b95
Parents: fb463c7 9fc957c
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Nov 10 16:32:44 2015 +0000
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Nov 10 16:32:44 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                                   | 1 +
 src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d00fcbc4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b8a65fd,0557786..ef21f9f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,48 -1,6 +1,49 @@@
-2.2.4
+3.0.1
+ * Updated trigger example (CASSANDRA-10257)
+Merged from 2.2:
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
  * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+Merged from 2.1:
+ * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
+ * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
+
+
+3.0
+ * Fix AssertionError while flushing memtable due to materialized views
+   incorrectly inserting empty rows (CASSANDRA-10614)
+ * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650)
+ * Don't use -1 for the position of partition key in schema (CASSANDRA-10491)
+ * Fix distinct queries in mixed version cluster (CASSANDRA-10573)
+ * Skip sstable on clustering in names query (CASSANDRA-10571)
+ * Remove value skipping as it breaks read-repair (CASSANDRA-10655)
+ * Fix bootstrapping with MVs (CASSANDRA-10621)
+ * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584)
+ * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634)
+ * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600)
+ * Fix reading of legacy sstables (CASSANDRA-10590)
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
+ * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
+ * Fix bug when adding a column to thrift with the same name as a primary key (CASSANDRA-10608)
+ * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
+ * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
+ * Fix backward compatibility for counters (CASSANDRA-10470)
+ * Remove memory_allocator parameter from cassandra.yaml (CASSANDRA-10581,10628)
+ * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
+ * Fix thrift cas operations with defined columns (CASSANDRA-10576)
+ * Fix PartitionUpdate.operationCount() for updates with static column operations (CASSANDRA-10606)
+ * Fix thrift get() queries with defined columns (CASSANDRA-10586)
+ * Fix marking of indexes as built and removed (CASSANDRA-10601)
+ * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
+ * Fix batches on multiple tables (CASSANDRA-10554)
+ * Ensure compaction options are validated when updating KeyspaceMetadata
+   (CASSANDRA-10569)
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
+ * Remove token generator (CASSANDRA-5261)
+ * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
+ * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
+ * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
+ * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
+Merged from 2.2:
 * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d00fcbc4/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 1b11391,36da92d..6856175
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@@ -98,248 -78,4 +98,248 @@@ public class CqlInputFormat extends org
          return new CqlRecordReader();
      }
  
+     protected void validateConfiguration(Configuration conf)
+     {
+         if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
+         {
+             throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()");
+         }
+         if (ConfigHelper.getInputInitialAddress(conf) == null)
+             throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress");
+         if (ConfigHelper.getInputPartitioner(conf) == null)
+             throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
+     }
+
+     public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext context) throws IOException
+     {
+         Configuration conf = HadoopCompat.getConfiguration(context);
+
+         validateConfiguration(conf);
+
+         keyspace = ConfigHelper.getInputKeyspace(conf);
+         cfName = ConfigHelper.getInputColumnFamily(conf);
+         partitioner = ConfigHelper.getInputPartitioner(conf);
+         logger.trace("partitioner is {}", partitioner);
+
+         // canonical ranges and nodes holding replicas
+         Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
+
+         // canonical ranges, split into pieces, fetching the splits in parallel
+         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+         List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
+
+         try
+         {
+             List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
+             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+             Range<Token> jobRange = null;
+             if (jobKeyRange != null)
+             {
+                 if (jobKeyRange.start_key != null)
+                 {
+                     if (!partitioner.preservesOrder())
+                         throw new UnsupportedOperationException("KeyRange based on keys can only be used with an order preserving partitioner");
+                     if (jobKeyRange.start_token != null)
+                         throw new IllegalArgumentException("only start_key supported");
+                     if (jobKeyRange.end_token != null)
+                         throw new IllegalArgumentException("only start_key supported");
+                     jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
+                                            partitioner.getToken(jobKeyRange.end_key));
+                 }
+                 else if (jobKeyRange.start_token != null)
+                 {
+                     jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+                                            partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
+                 }
+                 else
+                 {
+                     logger.warn("ignoring jobKeyRange specified without start_key or start_token");
+                 }
+             }
+
+             session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+             Metadata metadata = session.getCluster().getMetadata();
+
+             for (TokenRange range : masterRangeNodes.keySet())
+             {
+                 if (jobRange == null)
+                 {
+                     // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                     splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+                 }
+                 else
+                 {
+                     TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
+                     if (range.intersects(jobTokenRange))
+                     {
+                         for (TokenRange intersection: range.intersectWith(jobTokenRange))
+                         {
+                             // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                             splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
+                         }
+                     }
+                 }
+             }
+
+             // wait until we have all the results back
+             for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures)
+             {
+                 try
+                 {
+                     splits.addAll(futureInputSplits.get());
+                 }
+                 catch (Exception e)
+                 {
+                     throw new IOException("Could not get input splits", e);
+                 }
+             }
+         }
+         finally
+         {
+             executor.shutdownNow();
+         }
+
+         assert splits.size() > 0;
+         Collections.shuffle(splits, new Random(System.nanoTime()));
+         return splits;
+     }
+
+     private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+     {
+         return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+                                       metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+     }
+
+     private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+     {
+         int splitSize = ConfigHelper.getInputSplitSize(conf);
+         try
+         {
+             return describeSplits(keyspace, cfName, range, splitSize);
+         }
+         catch (Exception e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+
+     private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
+     {
+         try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+         {
+             Map<TokenRange, Set<Host>> map = new HashMap<>();
+             Metadata metadata = session.getCluster().getMetadata();
+             for (TokenRange tokenRange : metadata.getTokenRanges())
+                 map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
+             return map;
+         }
+     }
+
+     private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
+     {
+         String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                      "FROM %s.%s " +
+                                      "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+                                      SystemKeyspace.NAME,
+                                      SystemKeyspace.SIZE_ESTIMATES);
+
+         ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+         Row row = resultSet.one();
+
+         long meanPartitionSize = 0;
+         long partitionCount = 0;
+         int splitCount = 0;
+
+         if (row != null)
+         {
+             meanPartitionSize = row.getLong("mean_partition_size");
+             partitionCount = row.getLong("partitions_count");
+
+             splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+         }
+
+         // If we have no data on this split or the size estimate is 0,
+         // return the full split i.e., do not sub-split
+         // Assume smallest granularity of partition count available from CASSANDRA-7688
+         if (splitCount == 0)
+         {
+             Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+             wrappedTokenRange.put(tokenRange, (long) 128);
+             return wrappedTokenRange;
+         }
+
+         List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+         Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+         for (TokenRange range : splitRanges)
+             rangesWithLength.put(range, partitionCount / splitCount);
+
+         return rangesWithLength;
+     }
+
+     // Old Hadoop API
+     public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
+     {
+         TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
+         List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
+         InputSplit[] oldInputSplits = new InputSplit[newInputSplits.size()];
+         for (int i = 0; i < newInputSplits.size(); i++)
+             oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
+         return oldInputSplits;
+     }
+
+     /**
+      * Gets a token range and splits it up according to the suggested
+      * size into input splits that Hadoop can use.
+      */
+     class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>>
+     {
+
+         private final TokenRange tokenRange;
+         private final Set<Host> hosts;
+         private final Configuration conf;
+
+         public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+         {
+             this.tokenRange = tr;
+             this.hosts = hosts;
+             this.conf = conf;
+         }
+
+         public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
+         {
+             ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
+             Map<TokenRange, Long> subSplits;
+             subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+             // turn the sub-ranges into InputSplits
+             String[] endpoints = new String[hosts.size()];
+
+             // hadoop needs hostname, not ip
+             int endpointIndex = 0;
+             for (Host endpoint : hosts)
+                 endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+
+             boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+
+             for (TokenRange subSplit : subSplits.keySet())
+             {
+                 List<TokenRange> ranges = subSplit.unwrap();
+                 for (TokenRange subrange : ranges)
+                 {
+                     ColumnFamilySplit split =
+                             new ColumnFamilySplit(
+                                     partitionerIsOpp ?
+                                             subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
+                                     partitionerIsOpp ?
-                                             subrange.getEnd().toString().substring(2) : subrange.getStart().toString(),
++                                            subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
+                                     subSplits.get(subSplit),
+                                     endpoints);
+
+                     logger.trace("adding {}", split);
+                     splits.add(split);
+                 }
+             }
+             return splits;
+         }
+     }
  }
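
----------------------------------------------------------------------
Editorial note on the change: the one functional edit in the CqlInputFormat.java hunk is the
corrected line near the end of SplitCallable.call(). For partitioners that are not
order-preserving (the usual Murmur3Partitioner/RandomPartitioner case), the end boundary of
each ColumnFamilySplit was previously taken from subrange.getStart(), so each generated
split appears to have described an empty [start, start] token span; the patch takes the end
boundary from subrange.getEnd() instead. The sketch below is illustrative only: SimpleRange
and SimpleSplit are stand-ins invented for this note, not the driver's TokenRange or
Cassandra's ColumnFamilySplit.

import java.util.ArrayList;
import java.util.List;

public class SplitBoundarySketch
{
    // Hypothetical stand-in for a token range with numeric start/end tokens.
    static final class SimpleRange
    {
        final long start;
        final long end;
        SimpleRange(long start, long end) { this.start = start; this.end = end; }
    }

    // Hypothetical stand-in for a Hadoop split described by its token boundaries.
    static final class SimpleSplit
    {
        final String startToken;
        final String endToken;
        SimpleSplit(String startToken, String endToken)
        {
            this.startToken = startToken;
            this.endToken = endToken;
        }
        @Override
        public String toString() { return "[" + startToken + ", " + endToken + "]"; }
    }

    public static void main(String[] args)
    {
        List<SimpleRange> subranges = new ArrayList<>();
        subranges.add(new SimpleRange(0, 100));
        subranges.add(new SimpleRange(100, 200));

        for (SimpleRange subrange : subranges)
        {
            // Before the patch: end boundary built from the start token,
            // collapsing the split to an empty [start, start] span.
            SimpleSplit buggy = new SimpleSplit(String.valueOf(subrange.start),
                                                String.valueOf(subrange.start));
            // After the patch: end boundary built from the end token.
            SimpleSplit fixed = new SimpleSplit(String.valueOf(subrange.start),
                                                String.valueOf(subrange.end));
            System.out.println("buggy: " + buggy + "   fixed: " + fixed);
        }
    }
}

Run as-is, the sketch prints one buggy/fixed pair per sub-range, making the collapsed
boundaries of the old construction visible next to the corrected ones.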