http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index be4e5aa..8227239 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.26.0";
+  public static final String VERSION = "19.27.0";
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 46b767a..e315ced 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,6 +30,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.ConfigurationException;
@@ -41,7 +44,6 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -61,7 +63,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
-    private int batchRowCount; // fetch this many per batch
+    private int batchSize; // fetch this many per batch
     private String cfName;
     private String keyspace;
     private TSocket socket;
@@ -69,6 +71,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     private ConsistencyLevel consistencyLevel;
     private int keyBufferSize = 8192;
     private List<IndexExpression> filter;
+    private boolean widerows;
 
     public ColumnFamilyRecordReader()
     {
@@ -103,6 +106,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     
     public float getProgress()
     {
+        // TODO this is totally broken for wide rows
         // the progress is likely to be reported slightly off the actual but close enough
         return ((float)iter.rowsRead()) / totalRowCount;
     }
@@ -135,9 +139,10 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
         filter = jobRange == null ? null : jobRange.row_filter;
         predicate = ConfigHelper.getInputSlicePredicate(conf);
+        widerows = ConfigHelper.getInputIsWide(conf);
         isEmptyPredicate = isEmptyPredicate(predicate);
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
-        batchRowCount = ConfigHelper.getRangeBatchSize(conf);
+        batchSize = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
         consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
         
@@ -173,7 +178,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             throw new RuntimeException(e);
         }
 
-        iter = new RowIterator();
+        iter = widerows ? new WideRowIterator() : new StaticRowIterator();
     }
     
     public boolean nextKeyValue() throws IOException
@@ -222,15 +227,15 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return split.getLocations()[0];
     }
 
-    private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
     {
-        private List<KeySlice> rows;
-        private String startToken;
-        private int totalRead = 0;
-        private int i = 0;
-        private final AbstractType<?> comparator;
-        private final AbstractType<?> subComparator;
-        private final IPartitioner partitioner;
+        protected List<KeySlice> rows;
+        protected KeySlice lastRow;
+        protected int totalRead = 0;
+        protected int i = 0;
+        protected final AbstractType<?> comparator;
+        protected final AbstractType<?> subComparator;
+        protected final IPartitioner partitioner;
 
         private RowIterator()
         {
@@ -264,64 +269,115 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
+        /**
+         * @return total number of rows read by this record reader
+         */
+        public int rowsRead()
+        {
+            return totalRead;
+        }
+
+        protected IColumn unthriftify(ColumnOrSuperColumn cosc)
+        {
+            if (cosc.counter_column != null)
+                return unthriftifyCounter(cosc.counter_column);
+            if (cosc.counter_super_column != null)
+                return unthriftifySuperCounter(cosc.counter_super_column);
+            if (cosc.super_column != null)
+                return unthriftifySuper(cosc.super_column);
+            assert cosc.column != null;
+            return unthriftifySimple(cosc.column);
+        }
+
+        private IColumn unthriftifySuper(SuperColumn super_column)
+        {
+            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
+            for (Column column : super_column.columns)
+            {
+                sc.addColumn(unthriftifySimple(column));
+            }
+            return sc;
+        }
+
+        private IColumn unthriftifySimple(Column column)
+        {
+            return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+        }
+
+        private IColumn unthriftifyCounter(CounterColumn column)
+        {
+            //CounterColumns read the nodeID from the System table, so need the StorageService running and access
+            //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
+            return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+        }
+
+        private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+        {
+            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
+            for (CounterColumn column : superColumn.columns)
+                sc.addColumn(unthriftifyCounter(column));
+            return sc;
+        }
+    }
+
+    private class StaticRowIterator extends RowIterator
+    {
         private void maybeInit()
         {
-            // check if we need another batch 
+            // check if we need another batch
             if (rows != null && i >= rows.size())
                 rows = null;
-            
+
             if (rows != null)
                 return;
 
-            if (startToken == null)
+            String startToken;
+            if (lastRow == null)
             {
                 startToken = split.getStartToken();
-            } 
-            else if (startToken.equals(split.getEndToken()))
+            }
+            else
             {
-                // reached end of the split
-                rows = null;
-                return;
+                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+                if (startToken.equals(split.getEndToken()))
+                {
+                    // reached end of the split
+                    rows = null;
+                    return;
+                }
             }
-            
-            KeyRange keyRange = new KeyRange(batchRowCount)
+
+            KeyRange keyRange = new KeyRange(batchSize)
                                 .setStart_token(startToken)
                                 .setEnd_token(split.getEndToken())
                                 .setRow_filter(filter);
             try
             {
-                rows = client.get_range_slices(new ColumnParent(cfName),
-                                               predicate,
-                                               keyRange,
-                                               consistencyLevel);
-                  
+                rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
+
                 // nothing new? reached the end
                 if (rows.isEmpty())
                 {
                     rows = null;
                     return;
                 }
-                
+
                 // prepare for the next slice to be read
-                KeySlice lastRow = rows.get(rows.size() - 1);
-                ByteBuffer rowkey = lastRow.key;
-                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey));
-                
+                lastRow = Iterables.getLast(rows);
+
                 // remove ghosts when fetching all columns
                 if (isEmptyPredicate)
                 {
                     Iterator<KeySlice> it = rows.iterator();
-                    
-                    while(it.hasNext())
+                    while (it.hasNext())
                     {
                         KeySlice ks = it.next();
-                        
                         if (ks.getColumnsSize() == 0)
                         {
                            it.remove();
                         }
                     }
-                
+
                     // all ghosts, spooky
                     if (rows.isEmpty())
                     {
@@ -329,9 +385,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                         return;
                     }
                 }
-                
+
                 // reset to iterate through this new batch
-                i = 0;             
+                i = 0;
             }
             catch (Exception e)
             {
@@ -339,20 +395,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        /**
-         * @return total number of rows read by this record reader
-         */
-        public int rowsRead()
-        {
-            return totalRead;
-        }
-
         protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
         {
             maybeInit();
             if (rows == null)
                 return endOfData();
-            
+
             totalRead++;
             KeySlice ks = rows.get(i++);
             SortedMap<ByteBuffer, IColumn> map = new TreeMap<ByteBuffer, IColumn>(comparator);
@@ -363,51 +411,88 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
             return new Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>(ks.key, map);
         }
+    }
 
-        private IColumn unthriftify(ColumnOrSuperColumn cosc)
-        {
-            if (cosc.counter_column != null)
-                return unthriftifyCounter(cosc.counter_column);
-            if (cosc.counter_super_column != null)
-                return unthriftifySuperCounter(cosc.counter_super_column);
-            if (cosc.super_column != null)
-                return unthriftifySuper(cosc.super_column);
-            assert cosc.column != null;
-            return unthriftifySimple(cosc.column);
-        }
+    private class WideRowIterator extends RowIterator
+    {
+        private Iterator<ColumnOrSuperColumn> wideColumns;
 
-        private IColumn unthriftifySuper(SuperColumn super_column)
+        private void maybeInit()
         {
-            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
-            for (Column column : super_column.columns)
+            if (wideColumns != null && wideColumns.hasNext())
+                return;
+
+            // check if we need another batch
+            if (rows != null && ++i >= rows.size())
+                rows = null;
+
+            if (rows != null)
             {
-                sc.addColumn(unthriftifySimple(column));
+                wideColumns = rows.get(i).columns.iterator();
+                return;
             }
-            return sc;
-        }
 
-        private IColumn unthriftifySimple(Column column)
-        {
-            return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
-        }
+            String startToken;
+            ByteBuffer startColumn;
+            if (lastRow == null)
+            {
+                startToken = split.getStartToken();
+                startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            }
+            else
+            {
+                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key));
+                startColumn = Iterables.getLast(lastRow.columns).column.name;
+            }
 
-        private IColumn unthriftifyCounter(CounterColumn column)
-        {
-            //CounterColumns read the nodeID from the System table, so need the StorageService running and access
-            //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
-            return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+            KeyRange keyRange = new KeyRange(batchSize)
+                                .setStart_token(startToken)
+                                .setEnd_token(split.getEndToken())
+                                .setRow_filter(filter);
+            try
+            {
+                rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel);
+
+                // nothing found?
+                if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty())
+                {
+                    rows = null;
+                    return;
+                }
+                    
+                // nothing new? reached the end
+                if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.equals(startColumn)))
+                {
+                    rows = null;
+                    return;
+                }
+
+                // prepare for the next slice to be read
+                lastRow = Iterables.getLast(rows);
+
+                // reset to iterate through this new batch
+                i = 0;
+                wideColumns = rows.get(i).columns.iterator();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
         }
 
-        private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
         {
-            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
-            for (CounterColumn column : superColumn.columns)
-                sc.addColumn(unthriftifyCounter(column));
-            return sc;
+            maybeInit();
+            if (rows == null)
+                return endOfData();
+
+            totalRead++;
+            ColumnOrSuperColumn cosc = wideColumns.next();
+            ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(cosc.column.name, unthriftify(cosc));
+            return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(rows.get(i).key, map);
         }
     }
 
-
     // Because the old Hadoop API wants us to write to the key and value
     // and the new asks for them, we need to copy the output of the new API
     // to the old. Thus, expect a small performance hit.

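With ConfigHelper.getInputIsWide() returning true, the WideRowIterator above hands the
mapper a single column per key/value pair instead of a whole row, still keyed by the row
key. A minimal mapper sketch under that assumption (the class name and the choice to emit
(column name, 1) pairs are purely illustrative, not part of this commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper: with widerows enabled the SortedMap holds a single
    // column per call, so we just emit its name with a count of 1.
    public class WideRowColumnCounter extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
    {
        @Override
        protected void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
                throws IOException, InterruptedException
        {
            for (IColumn column : columns.values())
                context.write(new Text(ByteBufferUtil.string(column.name())), new LongWritable(1));
        }
    }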
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 810ac80..c83fbc5 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -19,10 +19,14 @@ package org.apache.cassandra.hadoop;
  * under the License.
  * 
  */
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -37,8 +41,6 @@ import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class ConfigHelper
@@ -56,6 +58,7 @@ public class ConfigHelper
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
     private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
+    private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
     private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
     private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
@@ -71,13 +74,13 @@ public class ConfigHelper
 
     /**
      * Set the keyspace and column family for the input of this job.
-     * Comparator and Partitioner types will be read from storage-conf.xml.
      *
      * @param conf         Job configuration you are about to run
      * @param keyspace
      * @param columnFamily
+     * @param widerows
      */
-    public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+    public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily, boolean widerows)
     {
         if (keyspace == null)
         {
@@ -90,6 +93,19 @@ public class ConfigHelper
 
         conf.set(INPUT_KEYSPACE_CONFIG, keyspace);
         conf.set(INPUT_COLUMNFAMILY_CONFIG, columnFamily);
+        conf.set(INPUT_WIDEROWS_CONFIG, String.valueOf(widerows));
+    }
+
+    /**
+     * Set the keyspace and column family for the input of this job.
+     *
+     * @param conf         Job configuration you are about to run
+     * @param keyspace
+     * @param columnFamily
+     */
+    public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+    {
+        setInputColumnFamily(conf, keyspace, columnFamily, false);
     }
 
     /**
@@ -175,7 +191,7 @@ public class ConfigHelper
     {
         return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
     }
-
+    
     public static String getRawInputSlicePredicate(Configuration conf)
     {
         return conf.get(INPUT_PREDICATE_CONFIG);
@@ -299,6 +315,11 @@ public class ConfigHelper
     {
         return conf.get(INPUT_COLUMNFAMILY_CONFIG);
     }
+
+    public static boolean getInputIsWide(Configuration conf)
+    {
+        return Boolean.valueOf(conf.get(INPUT_WIDEROWS_CONFIG));
+    }
     
     public static String getOutputColumnFamily(Configuration conf)
     {

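The new four-argument setInputColumnFamily() is how a job driver opts in to the wide-row
path; the three-argument form keeps the old behaviour by delegating with widerows = false.
A rough driver-side sketch, assuming the existing ColumnFamilyInputFormat and
setRangeBatchSize() helpers; the keyspace, column family and job name are placeholders,
and the usual connection settings (initial address, rpc port, partitioner) are omitted:

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class WideRowJobSetup
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "wide-row-scan");
            job.setJarByClass(WideRowJobSetup.class);
            job.setInputFormatClass(ColumnFamilyInputFormat.class);

            // Placeholder keyspace/column family; the trailing 'true' selects the
            // paged WideRowIterator in ColumnFamilyRecordReader.
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), "Keyspace1", "WideCF", true);

            // The range batch size caps how much each get_paged_slice() page fetches.
            ConfigHelper.setRangeBatchSize(job.getConfiguration(), 1000);
        }
    }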
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index b84c3ce..8aacea1 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -711,6 +711,61 @@ public class CassandraServer implements Cassandra.Iface
         return thriftifyKeySlices(rows, column_parent, predicate);
     }
 
+    public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        logger.debug("get_paged_slice");
+
+        String keyspace = state().getKeyspace();
+        state().hasColumnFamilyAccess(column_family, Permission.READ);
+
+        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
+        ThriftValidation.validateKeyRange(metadata, null, range);
+        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+        SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
+
+        IPartitioner p = StorageService.getPartitioner();
+        AbstractBounds<RowPosition> bounds;
+        if (range.start_key == null)
+        {
+            Token.TokenFactory tokenFactory = p.getTokenFactory();
+            Token left = tokenFactory.fromString(range.start_token);
+            Token right = tokenFactory.fromString(range.end_token);
+            bounds = Range.makeRowRange(left, right, p);
+        }
+        else
+        {
+            bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
+        }
+
+        List<Row> rows;
+        try
+        {
+            schedule(DatabaseDescriptor.getRpcTimeout());
+            try
+            {
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, predicate, bounds, range.row_filter, range.count, true), consistency_level);
+            }
+            finally
+            {
+                release();
+            }
+            assert rows != null;
+        }
+        catch (TimeoutException e)
+        {
+            logger.debug("... timed out");
+               throw new TimedOutException();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
+    }
+
     private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
     {
         List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());

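The record reader drives get_paged_slice() by restarting each page at the token of the
last row it received and at the name of the last column it saw, so a page can resume in
the middle of a wide row. Below is a standalone sketch of a single call through the
generated Thrift client, assuming the regenerated Cassandra.Client exposes the same
signature as the server method above; host, keyspace, column family and the page size
of 1000 are placeholders, and start_token equal to end_token covers the whole ring:

    import java.util.List;

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.cassandra.thrift.KeyRange;
    import org.apache.cassandra.thrift.KeySlice;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;

    public class PagedSliceExample
    {
        public static void main(String[] args) throws Exception
        {
            TFramedTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9160));
            transport.open();
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            client.set_keyspace("Keyspace1");

            // First page: scan the whole ring, no resume column yet.
            KeyRange range = new KeyRange(1000)
                             .setStart_token("0")
                             .setEnd_token("0");
            List<KeySlice> page = client.get_paged_slice("WideCF", range, ByteBufferUtil.EMPTY_BYTE_BUFFER, ConsistencyLevel.ONE);
            System.out.println("rows in first page: " + page.size());

            transport.close();
        }
    }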