http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index be4e5aa..8227239 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.26.0";
+  public static final String VERSION = "19.27.0";
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 46b767a..e315ced 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,6 +30,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.ConfigurationException;
@@ -41,7 +44,6 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -61,7 +63,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
-    private int batchRowCount; // fetch this many per batch
+    private int batchSize; // fetch this many per batch
     private String cfName;
     private String keyspace;
     private TSocket socket;
@@ -69,6 +71,7 @@
     private ConsistencyLevel consistencyLevel;
     private int keyBufferSize = 8192;
     private List<IndexExpression> filter;
+    private boolean widerows;
 
     public ColumnFamilyRecordReader()
     {
@@ -103,6 +106,7 @@
 
     public float getProgress()
     {
+        // TODO this is totally broken for wide rows
        // the progress is likely to be reported slightly off the actual but close enough
        return ((float)iter.rowsRead()) / totalRowCount;
    }
@@ -135,9 +139,10 @@
        KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
        filter = jobRange == null ? null : jobRange.row_filter;
        predicate = ConfigHelper.getInputSlicePredicate(conf);
+       widerows = ConfigHelper.getInputIsWide(conf);
        isEmptyPredicate = isEmptyPredicate(predicate);
        totalRowCount = ConfigHelper.getInputSplitSize(conf);
-       batchRowCount = ConfigHelper.getRangeBatchSize(conf);
+       batchSize = ConfigHelper.getRangeBatchSize(conf);
        cfName = ConfigHelper.getInputColumnFamily(conf);
        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
 
@@ -173,7 +178,7 @@
            throw new RuntimeException(e);
        }
 
-       iter = new RowIterator();
+       iter = widerows ? new WideRowIterator() : new StaticRowIterator();
    }
 
    public boolean nextKeyValue() throws IOException
@@ -222,15 +227,15 @@
        return split.getLocations()[0];
    }
 
-   private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+   private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
    {
-       private List<KeySlice> rows;
-       private String startToken;
-       private int totalRead = 0;
-       private int i = 0;
-       private final AbstractType<?> comparator;
-       private final AbstractType<?> subComparator;
-       private final IPartitioner partitioner;
+       protected List<KeySlice> rows;
+       protected KeySlice lastRow;
+       protected int totalRead = 0;
+       protected int i = 0;
+       protected final AbstractType<?> comparator;
+       protected final AbstractType<?> subComparator;
+       protected final IPartitioner partitioner;
 
        private RowIterator()
        {
@@ -264,64 +269,115 @@
            }
        }
 
+       /**
+        * @return total number of rows read by this record reader
+        */
+       public int rowsRead()
+       {
+           return totalRead;
+       }
+
+       protected IColumn unthriftify(ColumnOrSuperColumn cosc)
+       {
+           if (cosc.counter_column != null)
+               return unthriftifyCounter(cosc.counter_column);
+           if (cosc.counter_super_column != null)
+               return unthriftifySuperCounter(cosc.counter_super_column);
+           if (cosc.super_column != null)
+               return unthriftifySuper(cosc.super_column);
+           assert cosc.column != null;
+           return unthriftifySimple(cosc.column);
+       }
+
+       private IColumn unthriftifySuper(SuperColumn super_column)
+       {
+           org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
+           for (Column column : super_column.columns)
+           {
+               sc.addColumn(unthriftifySimple(column));
+           }
+           return sc;
+       }
+
+       private IColumn unthriftifySimple(Column column)
+       {
+           return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+       }
+
+       private IColumn unthriftifyCounter(CounterColumn column)
+       {
+           // CounterColumns read the nodeID from the System table, so they need the StorageService
+           // running and access to cassandra.yaml. To avoid Hadoop needing access to cassandra.yaml,
+           // return a regular Column.
+           return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+       }
+
+       private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+       {
+           org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
+           for (CounterColumn column : superColumn.columns)
+               sc.addColumn(unthriftifyCounter(column));
+           return sc;
+       }
+   }
+
+   private class StaticRowIterator extends RowIterator
+   {
        private void maybeInit()
        {
-           // check if we need another batch
+           // check if we need another batch
            if (rows != null && i >= rows.size())
                rows = null;
-
+
            if (rows != null)
                return;
 
-           if (startToken == null)
+           String startToken;
+           if (lastRow == null)
            {
                startToken = split.getStartToken();
-           }
-           else if (startToken.equals(split.getEndToken()))
+           }
+           else
            {
-               // reached end of the split
-               rows = null;
-               return;
+               startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+               if (startToken.equals(split.getEndToken()))
+               {
+                   // reached end of the split
+                   rows = null;
+                   return;
+               }
            }
-
-           KeyRange keyRange = new KeyRange(batchRowCount)
+
+           KeyRange keyRange = new KeyRange(batchSize)
                                .setStart_token(startToken)
                                .setEnd_token(split.getEndToken())
                                .setRow_filter(filter);
            try
            {
-               rows = client.get_range_slices(new ColumnParent(cfName),
-                                              predicate,
-                                              keyRange,
-                                              consistencyLevel);
-
+               rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
+
                // nothing new? reached the end
                if (rows.isEmpty())
                {
                    rows = null;
                    return;
                }
-
+
                // prepare for the next slice to be read
-               KeySlice lastRow = rows.get(rows.size() - 1);
-               ByteBuffer rowkey = lastRow.key;
-               startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey));
-
+               lastRow = Iterables.getLast(rows);
+
                // remove ghosts when fetching all columns
                if (isEmptyPredicate)
                {
                    Iterator<KeySlice> it = rows.iterator();
-
-                   while(it.hasNext())
+                   while (it.hasNext())
                    {
                        KeySlice ks = it.next();
-
                        if (ks.getColumnsSize() == 0)
                        {
                            it.remove();
                        }
                    }
-
+
                    // all ghosts, spooky
                    if (rows.isEmpty())
                    {
@@ -329,9 +385,9 @@
                        return;
                    }
                }
-
+
                // reset to iterate through this new batch
-               i = 0;
+               i = 0;
            }
            catch (Exception e)
            {
@@ -339,20 +395,12 @@
                throw new RuntimeException(e);
            }
        }
 
-       /**
-        * @return total number of rows read by this record reader
-        */
-       public int rowsRead()
-       {
-           return totalRead;
-       }
-
        protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
        {
            maybeInit();
            if (rows == null)
                return endOfData();
-
+
            totalRead++;
            KeySlice ks = rows.get(i++);
            SortedMap<ByteBuffer, IColumn> map = new TreeMap<ByteBuffer, IColumn>(comparator);
@@ -363,51 +411,88 @@
            }
            return new Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>(ks.key, map);
        }
+   }
 
-       private IColumn unthriftify(ColumnOrSuperColumn cosc)
-       {
-           if (cosc.counter_column != null)
-               return unthriftifyCounter(cosc.counter_column);
-           if (cosc.counter_super_column != null)
-               return unthriftifySuperCounter(cosc.counter_super_column);
-           if (cosc.super_column != null)
-               return unthriftifySuper(cosc.super_column);
-           assert cosc.column != null;
-           return unthriftifySimple(cosc.column);
-       }
+   private class WideRowIterator extends RowIterator
+   {
+       private Iterator<ColumnOrSuperColumn> wideColumns;
 
-       private IColumn unthriftifySuper(SuperColumn super_column)
+       private void maybeInit()
        {
-           org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
-           for (Column column : super_column.columns)
+           if (wideColumns != null && wideColumns.hasNext())
+               return;
+
+           // check if we need another batch
+           if (rows != null && ++i >= rows.size())
+               rows = null;
+
+           if (rows != null)
            {
-               sc.addColumn(unthriftifySimple(column));
+               wideColumns = rows.get(i).columns.iterator();
+               return;
            }
-           return sc;
-       }
 
-       private IColumn unthriftifySimple(Column column)
-       {
-           return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
-       }
+           String startToken;
+           ByteBuffer startColumn;
+           if (lastRow == null)
+           {
+               startToken = split.getStartToken();
+               startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+           }
+           else
+           {
+               startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+               startColumn = Iterables.getLast(lastRow.columns).column.name;
+           }
 
-       private IColumn unthriftifyCounter(CounterColumn column)
-       {
-           //CounterColumns read the nodeID from the System table, so need the StorageService running and access
-           //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
-           return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+           KeyRange keyRange = new KeyRange(batchSize)
+                               .setStart_token(startToken)
+                               .setEnd_token(split.getEndToken())
+                               .setRow_filter(filter);
+           try
+           {
+               rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel);
+
+               // nothing found?
+               if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty())
+               {
+                   rows = null;
+                   return;
+               }
+
+               // nothing new? reached the end
+               if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.equals(startColumn)))
+               {
+                   rows = null;
+                   return;
+               }
+
+               // prepare for the next slice to be read
+               lastRow = Iterables.getLast(rows);
+
+               // reset to iterate through this new batch
+               i = 0;
+               wideColumns = rows.get(i).columns.iterator();
+           }
+           catch (Exception e)
+           {
+               throw new RuntimeException(e);
+           }
        }
 
-       private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+       protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
        {
-           org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
-           for (CounterColumn column : superColumn.columns)
-               sc.addColumn(unthriftifyCounter(column));
-           return sc;
+           maybeInit();
+           if (rows == null)
+               return endOfData();
+
+           totalRead++;
+           ColumnOrSuperColumn cosc = wideColumns.next();
+           ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftify(cosc));
+           return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(rows.get(i).key, map);
        }
    }
 
-
    // Because the old Hadoop API wants us to write to the key and value
    // and the new asks for them, we need to copy the output of the new API
    // to the old. Thus, expect a small performance hit.

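As a usage sketch (not part of this commit): the widerows flag the reader consumes
above via ConfigHelper.getInputIsWide() is set through the new setInputColumnFamily
overload shown in the ConfigHelper diff below. A job might opt in roughly as
follows; the class, keyspace, and column family names are hypothetical, and cluster
connection settings (rpc port, initial address, partitioner) plus mapper/output
setup are omitted:

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class WideRowJobSketch
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job();
            job.setInputFormatClass(ColumnFamilyInputFormat.class);

            Configuration conf = job.getConfiguration();
            // widerows = true selects the new WideRowIterator, which pages through
            // each row with get_paged_slice instead of get_range_slices
            ConfigHelper.setInputColumnFamily(conf, "Keyspace1", "WideCF", true);
            // same knob the reader uses as batchSize: the cap passed to KeyRange per call
            ConfigHelper.setRangeBatchSize(conf, 4096);
            // mapper, reducer, and output configuration omitted in this sketch
        }
    }

In wide-row mode the reader hands a Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, ...>
one (key, single-column map) pair per column, so the same row key recurs for
consecutive columns of one row. And because totalRead then grows per column while
totalRowCount counts rows, getProgress() over-reports, which is exactly what the
TODO above flags.
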
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 810ac80..c83fbc5 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -19,10 +19,14 @@ package org.apache.cassandra.hadoop;
  * under the License.
  *
  */
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -37,8 +41,6 @@ import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class ConfigHelper
@@ -56,6 +58,7 @@
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
     private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
+    private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
     private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
     private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
@@ -71,13 +74,13 @@
 
     /**
      * Set the keyspace and column family for the input of this job.
-     * Comparator and Partitioner types will be read from storage-conf.xml.
      *
      * @param conf Job configuration you are about to run
      * @param keyspace
      * @param columnFamily
+     * @param widerows
      */
-    public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+    public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily, boolean widerows)
     {
         if (keyspace == null)
         {
@@ -90,6 +93,19 @@
 
         conf.set(INPUT_KEYSPACE_CONFIG, keyspace);
         conf.set(INPUT_COLUMNFAMILY_CONFIG, columnFamily);
+        conf.set(INPUT_WIDEROWS_CONFIG, String.valueOf(widerows));
+    }
+
+    /**
+     * Set the keyspace and column family for the input of this job.
+     *
+     * @param conf Job configuration you are about to run
+     * @param keyspace
+     * @param columnFamily
+     */
+    public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+    {
+        setInputColumnFamily(conf, keyspace, columnFamily, false);
     }
 
     /**
@@ -175,7 +191,7 @@
     {
         return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
     }
-
+
     public static String getRawInputSlicePredicate(Configuration conf)
     {
         return conf.get(INPUT_PREDICATE_CONFIG);
@@ -299,6 +315,11 @@
     {
         return conf.get(INPUT_COLUMNFAMILY_CONFIG);
     }
+
+    public static boolean getInputIsWide(Configuration conf)
+    {
+        return Boolean.valueOf(conf.get(INPUT_WIDEROWS_CONFIG));
+    }
 
     public static String getOutputColumnFamily(Configuration conf)
     {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index b84c3ce..8aacea1 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -711,6 +711,61 @@ public class CassandraServer implements Cassandra.Iface
 
         return thriftifyKeySlices(rows, column_parent, predicate);
     }
 
+    public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        logger.debug("get_paged_slice");
+
+        String keyspace = state().getKeyspace();
+        state().hasColumnFamilyAccess(column_family, Permission.READ);
+
+        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
+        ThriftValidation.validateKeyRange(metadata, null, range);
+        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
+
+        IPartitioner p = StorageService.getPartitioner();
+        AbstractBounds<RowPosition> bounds;
+        if (range.start_key == null)
+        {
+            Token.TokenFactory tokenFactory = p.getTokenFactory();
+            Token left = tokenFactory.fromString(range.start_token);
+            Token right = tokenFactory.fromString(range.end_token);
+            bounds = Range.makeRowRange(left, right, p);
+        }
+        else
+        {
+            bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
+        }
+
+        List<Row> rows;
+        try
+        {
+            schedule(DatabaseDescriptor.getRpcTimeout());
+            try
+            {
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, predicate, bounds, range.row_filter, range.count, true), consistency_level);
+            }
+            finally
+            {
+                release();
+            }
+            assert rows != null;
+        }
+        catch (TimeoutException e)
+        {
+            logger.debug("... timed out");
+            throw new TimedOutException();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
+    }
+
     private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
     {
         List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
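
As a final illustration (not part of the commit): the server builds a SliceRange
from start_column to the end of the row and passes an extra boolean to
RangeSliceCommand to distinguish this paging path from get_range_slices, so a
caller pages by feeding back the token of the last key and the name of the last
column it saw. Below is a hedged client-side sketch of the same loop that
WideRowIterator.maybeInit() runs above; 'client' is assumed to be an open,
keyspace-bound Cassandra.Client, the partitioner must match the cluster's, and
process() is a placeholder:

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.cassandra.thrift.KeyRange;
    import org.apache.cassandra.thrift.KeySlice;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class PagedSliceSketch
    {
        public static void page(Cassandra.Client client, IPartitioner p, String cf,
                                String startToken, String endToken, int batchSize) throws Exception
        {
            ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
            KeySlice lastRow = null;

            while (true)
            {
                KeyRange keyRange = new KeyRange(batchSize)
                                    .setStart_token(startToken)
                                    .setEnd_token(endToken);
                List<KeySlice> page = client.get_paged_slice(cf, keyRange, startColumn, ConsistencyLevel.ONE);

                // no rows, or an empty first row: nothing left in the range
                if (page.isEmpty() || page.get(0).columns.isEmpty())
                    break;

                // a page that begins exactly where the last one ended made no progress
                if (lastRow != null && page.get(0).key.equals(lastRow.key)
                    && page.get(0).columns.get(0).column.name.equals(startColumn))
                    break;

                for (KeySlice ks : page)
                    process(ks); // placeholder; the first column of a page may repeat
                                 // the previous page's last column, so dedupe if needed

                // resume from the last key/column seen, as WideRowIterator does
                lastRow = page.get(page.size() - 1);
                startToken = p.getTokenFactory().toString(p.getToken(lastRow.key));
                startColumn = lastRow.columns.get(lastRow.columns.size() - 1).column.name;
            }
        }

        private static void process(KeySlice ks) { /* application logic */ }
    }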