update get_paged_slice to allow starting with a key; fixes for WideRowIterator; patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3883
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97aa922a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97aa922a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97aa922a Branch: refs/heads/cassandra-1.1.0 Commit: 97aa922a7476dce06121ae289877abccf161afae Parents: dbc0f59 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Apr 10 16:06:39 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Apr 11 13:24:56 2012 -0500 ---------------------------------------------------------------------- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 25 ++- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 136 ++++++++------ .../apache/cassandra/thrift/CassandraServer.java | 5 +- .../apache/cassandra/thrift/ThriftValidation.java | 19 +-- 4 files changed, 108 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index ef56678..354903d 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -34,6 +34,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.collect.ImmutableList; + import org.apache.cassandra.db.IColumn; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -87,6 +89,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B private String keyspace; private String cfName; + private IPartitioner 
partitioner; private static void validateConfiguration(Configuration conf) { @@ -100,8 +103,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B } if (ConfigHelper.getInputInitialAddress(conf) == null) throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); - - // input partitioner is optional -- used only if requesting an ordered key scan + if (ConfigHelper.getInputPartitioner(conf) == null) + throw new UnsupportedOperationException("You must set the Cassandra partitioner class"); } public List<InputSplit> getSplits(JobContext context) throws IOException @@ -115,6 +118,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration()); cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration()); + partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); + logger.debug("partitioner is " + partitioner); // cannonical ranges, split into pieces, fetching the splits in parallel ExecutorService executor = Executors.newCachedThreadPool(); @@ -124,11 +129,9 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B { List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>(); KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); - IPartitioner partitioner = null; Range<Token> jobRange = null; if (jobKeyRange != null && jobKeyRange.start_token != null) { - partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) 
can only be used with a order preserving paritioner"; assert jobKeyRange.start_key == null : "only start_token supported"; assert jobKeyRange.end_key == null : "only end_token supported"; @@ -219,11 +222,19 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName(); } + Token.TokenFactory factory = partitioner.getTokenFactory(); for (int i = 1; i < tokens.size(); i++) { - ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints); - logger.debug("adding " + split); - splits.add(split); + Token left = factory.fromString(tokens.get(i - 1)); + Token right = factory.fromString(tokens.get(i)); + Range<Token> range = new Range<Token>(left, right, partitioner); + List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range); + for (Range<Token> subrange : ranges) + { + ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints); + logger.debug("adding " + split); + splits.add(split); + } } return splits; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 483c040..600cf13 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -29,10 +29,9 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Iterables; -import org.apache.commons.lang.ArrayUtils; +import 
com.google.common.collect.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.ConfigurationException; @@ -55,6 +54,8 @@ import org.apache.thrift.transport.TSocket; public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> { + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class); + public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; private ColumnFamilySplit split; @@ -179,6 +180,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap } iter = widerows ? new WideRowIterator() : new StaticRowIterator(); + logger.debug("created {}", iter); } public boolean nextKeyValue() throws IOException @@ -230,9 +232,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> { protected List<KeySlice> rows; - protected KeySlice lastRow; protected int totalRead = 0; - protected int i = 0; protected final AbstractType<?> comparator; protected final AbstractType<?> subComparator; protected final IPartitioner partitioner; @@ -299,7 +299,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap return sc; } - private IColumn unthriftifySimple(Column column) + protected IColumn unthriftifySimple(Column column) { return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp); } @@ -322,23 +322,23 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap private class StaticRowIterator extends RowIterator { + protected int i = 0; + private void maybeInit() { // check if we need another batch - if (rows != null && i >= rows.size()) - rows = null; - - if (rows != 
null) + if (rows != null && i < rows.size()) return; String startToken; - if (lastRow == null) + if (totalRead == 0) { + // first request startToken = split.getStartToken(); } else { - startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key)); + startToken = partitioner.getTokenFactory().toString(partitioner.getToken(Iterables.getLast(rows).key)); if (startToken.equals(split.getEndToken())) { // reached end of the split @@ -362,9 +362,6 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap return; } - // prepare for the next slice to be read - lastRow = Iterables.getLast(rows); - // remove ghosts when fetching all columns if (isEmptyPredicate) { @@ -415,64 +412,49 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap private class WideRowIterator extends RowIterator { - private Iterator<ColumnOrSuperColumn> wideColumns; + private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> wideColumns; private void maybeInit() { if (wideColumns != null && wideColumns.hasNext()) return; - // check if we need another batch - if (rows != null && ++i >= rows.size()) - rows = null; - - if (rows != null) - { - wideColumns = rows.get(i).columns.iterator(); - return; - } - - String startToken; + KeyRange keyRange; ByteBuffer startColumn; - if (lastRow == null) + if (totalRead == 0) { - startToken = split.getStartToken(); + String startToken = split.getStartToken(); + keyRange = new KeyRange(batchSize) + .setStart_token(startToken) + .setEnd_token(split.getEndToken()) + .setRow_filter(filter); startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; } else { - startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key)); + KeySlice lastRow = Iterables.getLast(rows); + logger.debug("Starting with last-seen row {}", lastRow.key); + keyRange = new KeyRange(batchSize) + .setStart_key(lastRow.key) + .setEnd_token(split.getEndToken()) + .setRow_filter(filter); 
startColumn = Iterables.getLast(lastRow.columns).column.name; } - KeyRange keyRange = new KeyRange(batchSize) - .setStart_token(startToken) - .setEnd_token(split.getEndToken()) - .setRow_filter(filter); try { rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel); - - // nothing found? - if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty()) - { - rows = null; - return; - } - - // nothing new? reached the end - if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.name.equals(startColumn))) - { + int n = 0; + for (KeySlice row : rows) + n += row.columns.size(); + logger.debug("read {} columns in {} rows for {} starting with {}", + new Object[]{ n, rows.size(), keyRange, startColumn }); + + wideColumns = Iterators.peekingIterator(new WideColumnIterator(rows)); + if (wideColumns.hasNext() && wideColumns.peek().right.keySet().iterator().next().equals(startColumn)) + wideColumns.next(); + if (!wideColumns.hasNext()) rows = null; - return; - } - - // prepare for the next slice to be read - lastRow = Iterables.getLast(rows); - - // reset to iterate through this new batch - i = 0; - wideColumns = rows.get(i).columns.iterator(); } catch (Exception e) { @@ -487,9 +469,47 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap return endOfData(); totalRead++; - ColumnOrSuperColumn cosc = wideColumns.next(); - ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftify(cosc)); - return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(rows.get(i).key, map); + return wideColumns.next(); + } + + private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> + { + private final Iterator<KeySlice> rows; + private Iterator<ColumnOrSuperColumn> columns; + public KeySlice currentRow; + + public WideColumnIterator(List<KeySlice> rows) + { + this.rows = rows.iterator(); + if 
(this.rows.hasNext()) + nextRow(); + else + columns = Iterators.emptyIterator(); + } + + private void nextRow() + { + currentRow = rows.next(); + columns = currentRow.columns.iterator(); + } + + protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext() + { + while (true) + { + if (columns.hasNext()) + { + ColumnOrSuperColumn cosc = columns.next(); + ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftifySimple(cosc.column)); + return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(currentRow.key, map); + } + + if (!rows.hasNext()) + return endOfData(); + + nextRow(); + } + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 7aceb0e..7cb77d7 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -733,6 +733,7 @@ public class CassandraServer implements Cassandra.Iface AbstractBounds<RowPosition> bounds; if (range.start_key == null) { + // (token, key) is unsupported, assume (token, token) Token.TokenFactory tokenFactory = p.getTokenFactory(); Token left = tokenFactory.fromString(range.start_token); Token right = tokenFactory.fromString(range.end_token); @@ -740,7 +741,9 @@ public class CassandraServer implements Cassandra.Iface } else { - bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p)); + RowPosition end = range.end_key == null ? 
p.getTokenFactory().fromString(range.end_token).maxKeyBound(p) + : RowPosition.forKey(range.end_key, p); + bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end); } List<Row> rows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/ThriftValidation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java index 25c751c..a77bfb6 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java +++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java @@ -479,20 +479,17 @@ public class ThriftValidation public static void validateKeyRange(CFMetaData metadata, ByteBuffer superColumn, KeyRange range) throws InvalidRequestException { - if ((range.start_key == null) != (range.end_key == null)) - { - throw new InvalidRequestException("start key and end key must either both be non-null, or both be null"); - } - if ((range.start_token == null) != (range.end_token == null)) - { - throw new InvalidRequestException("start token and end token must either both be non-null, or both be null"); - } - if ((range.start_key == null) == (range.start_token == null)) + if ((range.start_key == null) == (range.start_token == null) + || (range.end_key == null) == (range.end_token == null)) { throw new InvalidRequestException("exactly one of {start key, end key} or {start token, end token} must be specified"); } - if (range.start_key != null) + // (key, token) is supported (for wide-row CFRR) but not (token, key) + if (range.start_token != null && range.end_key != null) + throw new InvalidRequestException("start token + end key is not a supported key range"); + + if (range.start_key != null && range.end_key != null) { IPartitioner p = StorageService.getPartitioner(); Token startToken = p.getToken(range.start_key); @@ -510,7 +507,7 @@ public class 
ThriftValidation if (!isEmpty(range.row_filter) && superColumn != null) { - throw new InvalidRequestException("super columns are not yet supported for indexing"); + throw new InvalidRequestException("super columns are not supported for indexing"); } if (range.count <= 0)