Repository: cassandra
Updated Branches:
  refs/heads/trunk 2c0edce09 -> dc9ed4634


Improve write path performance

Patch by tjake; reviewed by Stefania Alborghetti for CASSANDRA-12269


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc9ed463
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc9ed463
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc9ed463

Branch: refs/heads/trunk
Commit: dc9ed463417aa8028e77e91718e4f3d6ea563210
Parents: 2c0edce
Author: T Jake Luciani <j...@apache.org>
Authored: Tue Jun 21 21:53:43 2016 -0400
Committer: T Jake Luciani <j...@apache.org>
Committed: Tue Jul 26 14:55:54 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/jvm.options                                |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |  18 ++-
 src/java/org/apache/cassandra/db/Columns.java   |  11 ++
 .../cassandra/db/SerializationHeader.java       |  12 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../cassandra/db/commitlog/CommitLog.java       |  68 +++++----
 .../db/partitions/AbstractBTreePartition.java   |   3 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  49 +++---
 src/java/org/apache/cassandra/db/rows/Row.java  |  13 ++
 src/java/org/apache/cassandra/db/rows/Rows.java |  24 +--
 .../rows/UnfilteredRowIteratorSerializer.java   |   3 +
 .../cassandra/db/rows/UnfilteredSerializer.java |  75 +++++++---
 .../org/apache/cassandra/hints/HintsWriter.java |   2 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   3 +-
 .../cassandra/io/util/DataOutputBuffer.java     |  42 +++---
 .../io/util/DataOutputBufferFixed.java          |   4 +-
 .../cassandra/io/util/SafeMemoryWriter.java     |   2 +-
 .../cassandra/net/IncomingTcpConnection.java    |   3 +-
 .../org/apache/cassandra/net/MessageIn.java     |   2 +-
 .../org/apache/cassandra/net/MessageOut.java    |  24 ++-
 .../apache/cassandra/net/MessagingService.java  |   2 +
 .../cassandra/net/OutboundTcpConnection.java    |   3 +-
 .../apache/cassandra/service/ClientWarn.java    |   3 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   3 +-
 .../apache/cassandra/utils/ChecksumType.java    |  13 +-
 .../org/apache/cassandra/utils/Wrapped.java     |  48 ++++++
 .../apache/cassandra/utils/WrappedBoolean.java  |  42 ++++++
 .../cassandra/utils/WrappedException.java       |  30 ++++
 .../org/apache/cassandra/utils/WrappedInt.java  |  52 +++++++
 .../org/apache/cassandra/utils/btree/BTree.java | 131 +++++++++++++++-
 .../db/commitlog/CommitLogStressTest.java       |   4 +-
 .../test/microbench/FastThreadExecutor.java     |  96 ++++++++++++
 .../test/microbench/MutationBench.java          | 148 +++++++++++++++++++
 .../org/apache/cassandra/utils/BTreeTest.java   |  25 ++++
 35 files changed, 826 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c586d10..efbbb4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Faster write path (CASSANDRA-12269)
  * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
  * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
  * Expose metrics for successful/failed authentication attempts 
(CASSANDRA-10635)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/conf/jvm.options
----------------------------------------------------------------------
diff --git a/conf/jvm.options b/conf/jvm.options
index 692d06b..9e13e0e 100644
--- a/conf/jvm.options
+++ b/conf/jvm.options
@@ -118,6 +118,7 @@
 # resize them at runtime.
 -XX:+UseTLAB
 -XX:+ResizeTLAB
+-XX:+UseNUMA
 
 # http://www.evanjones.ca/jvm-mmap-pause.html
 -XX:+PerfDisableSharedMem

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index b175ef1c..beb9d1a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -122,6 +123,9 @@ public final class CFMetaData
 
     public final DataResource resource;
 
+    //For hot path serialization it's often easier to store this info here
+    private volatile ColumnFilter allColumnFilter;
+
     /*
      * All of these methods will go away once CFMetaData becomes completely 
immutable.
      */
@@ -294,9 +298,12 @@ public final class CFMetaData
         this.clusteringColumns = clusteringColumns;
         this.partitionColumns = partitionColumns;
 
-        this.serializers = new Serializers(this);
-        this.resource = DataResource.table(ksName, cfName);
+        //This needs to happen before serializers are set
+        //because they use comparator.subtypes()
         rebuild();
+
+        this.resource = DataResource.table(ksName, cfName);
+        this.serializers = new Serializers(this);
     }
 
     // This rebuild informations that are intrinsically duplicate of the table 
definition but
@@ -323,6 +330,8 @@ public final class CFMetaData
 
         if (isCompactTable())
             this.compactValueColumn = 
CompactTables.getCompactValueColumn(partitionColumns, isSuper());
+
+        this.allColumnFilter = ColumnFilter.all(this);
     }
 
     public Indexes getIndexes()
@@ -330,6 +339,11 @@ public final class CFMetaData
         return indexes;
     }
 
+    public ColumnFilter getAllColumnFilter()
+    {
+        return allColumnFilter;
+    }
+
     public static CFMetaData create(String ksName,
                                     String name,
                                     UUID cfId,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java 
b/src/java/org/apache/cassandra/db/Columns.java
index e3c30fa..e9e3abf 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
@@ -366,6 +367,16 @@ public class Columns extends 
AbstractCollection<ColumnDefinition> implements Col
             digest.update(c.name.bytes.duplicate());
     }
 
+    /**
+     * Apply a function to each column definition in forwards or reversed 
order.
+     * @param function
+     * @param reversed
+     */
+    public void apply(Consumer<ColumnDefinition> function, boolean reversed)
+    {
+        BTree.apply(columns, function, reversed);
+    }
+
     @Override
     public boolean equals(Object other)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java 
b/src/java/org/apache/cassandra/db/SerializationHeader.java
index af2d434..9c443c7 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -21,9 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -108,17 +105,12 @@ public class SerializationHeader
     {
         this(isForSSTable,
              metadata.getKeyValidator(),
-             typesOf(metadata.clusteringColumns()),
+             metadata.comparator.subtypes(),
              columns,
              stats,
              null);
     }
 
-    private static List<AbstractType<?>> typesOf(List<ColumnDefinition> 
columns)
-    {
-        return ImmutableList.copyOf(Lists.transform(columns, column -> 
column.type));
-    }
-
     public PartitionColumns columns()
     {
         return columns;
@@ -398,7 +390,7 @@ public class SerializationHeader
             EncodingStats stats = EncodingStats.serializer.deserialize(in);
 
             AbstractType<?> keyType = metadata.getKeyValidator();
-            List<AbstractType<?>> clusteringTypes = 
typesOf(metadata.clusteringColumns());
+            List<AbstractType<?>> clusteringTypes = 
metadata.comparator.subtypes();
 
             Columns statics, regulars;
             if (selection == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 120125f..6451c83 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -652,7 +652,7 @@ public final class SystemKeyspace
     private static Map<UUID, ByteBuffer> 
truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition 
position)
     {
         DataOutputBuffer out = null;
-        try (DataOutputBuffer ignored = out = DataOutputBuffer.RECYCLER.get())
+        try (DataOutputBuffer ignored = out = 
DataOutputBuffer.scratchBuffer.get())
         {
             CommitLogPosition.serializer.serialize(position, out);
             out.writeLong(truncatedAt);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index d76b9cb..0bb913d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CommitLogMetrics;
@@ -243,43 +244,56 @@ public class CommitLog implements CommitLogMBean
     {
         assert mutation != null;
 
-        int size = (int) Mutation.serializer.serializedSize(mutation, 
MessagingService.current_version);
-
-        int totalSize = size + ENTRY_OVERHEAD_SIZE;
-        if (totalSize > MAX_MUTATION_SIZE)
+        DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get();
+        try
         {
-            throw new IllegalArgumentException(String.format("Mutation of %s 
is too large for the maximum size of %s",
-                                                             
FBUtilities.prettyPrintMemory(totalSize),
-                                                             
FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
-        }
+            Mutation.serializer.serialize(mutation, dob, 
MessagingService.current_version);
+            int size = dob.getLength();
+
+            int totalSize = size + ENTRY_OVERHEAD_SIZE;
+            if (totalSize > MAX_MUTATION_SIZE)
+            {
+                throw new IllegalArgumentException(String.format("Mutation of 
%s is too large for the maximum size of %s",
+                                                                 
FBUtilities.prettyPrintMemory(totalSize),
+                                                                 
FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
+            }
 
-        Allocation alloc = segmentManager.allocate(mutation, totalSize);
+            Allocation alloc = segmentManager.allocate(mutation, totalSize);
 
-        CRC32 checksum = new CRC32();
-        final ByteBuffer buffer = alloc.getBuffer();
-        try (BufferedDataOutputStreamPlus dos = new 
DataOutputBufferFixed(buffer))
-        {
-            // checksummed length
-            dos.writeInt(size);
-            updateChecksumInt(checksum, size);
-            buffer.putInt((int) checksum.getValue());
-
-            // checksummed mutation
-            Mutation.serializer.serialize(mutation, dos, 
MessagingService.current_version);
-            updateChecksum(checksum, buffer, buffer.position() - size, size);
-            buffer.putInt((int) checksum.getValue());
+            CRC32 checksum = new CRC32();
+            final ByteBuffer buffer = alloc.getBuffer();
+            try (BufferedDataOutputStreamPlus dos = new 
DataOutputBufferFixed(buffer))
+            {
+                // checksummed length
+                dos.writeInt(size);
+                updateChecksumInt(checksum, size);
+                buffer.putInt((int) checksum.getValue());
+
+                // checksummed mutation
+                dos.write(dob.getData(), 0, size);
+                updateChecksum(checksum, buffer, buffer.position() - size, 
size);
+                buffer.putInt((int) checksum.getValue());
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, alloc.getSegment().getPath());
+            }
+            finally
+            {
+                alloc.markWritten();
+            }
+
+            executor.finishWriteFor(alloc);
+            return alloc.getCommitLogPosition();
         }
         catch (IOException e)
         {
-            throw new FSWriteError(e, alloc.getSegment().getPath());
+            throw new FSWriteError(e, 
segmentManager.allocatingFrom().getPath());
         }
         finally
         {
-            alloc.markWritten();
+            dob.recycle();
         }
-
-        executor.finishWriteFor(alloc);
-        return alloc.getCommitLogPosition();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 0400402..9549c0d 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -169,7 +169,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
 
     public UnfilteredRowIterator unfilteredIterator()
     {
-        return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, 
false);
+        return unfilteredIterator(metadata().getAllColumnFilter(), Slices.ALL, 
false);
     }
 
     public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, 
Slices slices, boolean reversed)
@@ -197,7 +197,6 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
         ClusteringBound end = slice.end() == ClusteringBound.TOP ? null : 
slice.end();
         Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, 
start, true, end, true, desc(reversed));
         Iterator<RangeTombstone> deleteIter = 
current.deletionInfo.rangeIterator(slice, reversed);
-
         return merge(rowIter, deleteIter, selection, reversed, current, 
staticRow);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index c699634..0eccb6e 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import com.google.common.base.Function;
@@ -32,9 +33,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -168,16 +167,23 @@ public class BTreeRow extends AbstractRow
         return cd.column().isSimple() ? minDeletionTime((Cell) cd) : 
minDeletionTime((ComplexColumnData)cd);
     }
 
+    public void apply(Consumer<ColumnData> function, boolean reversed)
+    {
+        BTree.apply(btree, function, reversed);
+    }
+
+    public void apply(Consumer<ColumnData> funtion, 
com.google.common.base.Predicate<ColumnData> stopCondition, boolean reversed)
+    {
+        BTree.apply(btree, funtion, stopCondition, reversed);
+    }
+
     private static int minDeletionTime(Object[] btree, LivenessInfo info, 
DeletionTime rowDeletion)
     {
-        int min = Math.min(minDeletionTime(info), 
minDeletionTime(rowDeletion));
-        for (ColumnData cd : BTree.<ColumnData>iterable(btree))
-        {
-            min = Math.min(min, minDeletionTime(cd));
-            if (min == Integer.MIN_VALUE)
-                break;
-        }
-        return min;
+        //we have to wrap this for the lambda
+        final WrappedInt min = new WrappedInt(Math.min(minDeletionTime(info), 
minDeletionTime(rowDeletion)));
+
+        BTree.<ColumnData>apply(btree, cd -> min.set( Math.min(min.get(), 
minDeletionTime(cd)) ), cd -> min.get() == Integer.MIN_VALUE, false);
+        return min.get();
     }
 
     public Clustering clustering()
@@ -324,17 +330,26 @@ public class BTreeRow extends AbstractRow
 
     public boolean hasComplexDeletion()
     {
+        final WrappedBoolean result = new WrappedBoolean(false);
+
         // We start by the end cause we know complex columns sort before 
simple ones
-        for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
-        {
-            if (cd.column().isSimple())
-                return false;
+        apply(c -> {}, cd -> {
+            if (cd.column.isSimple())
+            {
+                result.set(false);
+                return true;
+            }
 
             if (!((ComplexColumnData) cd).complexDeletion().isLive())
+            {
+                result.set(true);
                 return true;
-        }
+            }
+
+            return false;
+        }, true);
 
-        return false;
+        return result.get();
     }
 
     public Row markCounterLocalToBeCleared()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java 
b/src/java/org/apache/cassandra/db/rows/Row.java
index 53b0eb3..51cf435 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.db.rows;
 
 import java.util.*;
 import java.security.MessageDigest;
+import java.util.function.Consumer;
+
+import com.google.common.base.Predicate;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -251,6 +254,16 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
     public String toString(CFMetaData metadata, boolean fullDetails);
 
     /**
+     * Apply a function to every column in a row
+     */
+    public void apply(Consumer<ColumnData> function, boolean reverse);
+
+    /**
+     * Apply a funtion to every column in a row until a stop condition is 
reached
+     */
+    public void apply(Consumer<ColumnData> function, Predicate<ColumnData> 
stopCondition, boolean reverse);
+
+    /**
      * A row deletion/tombstone.
      * <p>
      * A row deletion mostly consists of the time of said deletion, but there 
is 2 variants: shadowable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java 
b/src/java/org/apache/cassandra/db/rows/Rows.java
index e325091..4f6c8d2 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.WrappedInt;
 
 /**
  * Static utilities to work on Row objects.
@@ -72,14 +73,15 @@ public abstract class Rows
         collector.update(row.primaryKeyLivenessInfo());
         collector.update(row.deletion().time());
 
-        int columnCount = 0;
-        int cellCount = 0;
-        for (ColumnData cd : row)
-        {
+        //we have to wrap these for the lambda
+        final WrappedInt columnCount = new WrappedInt(0);
+        final WrappedInt cellCount = new WrappedInt(0);
+
+        row.apply(cd -> {
             if (cd.column().isSimple())
             {
-                ++columnCount;
-                ++cellCount;
+                columnCount.increment();
+                cellCount.increment();
                 Cells.collectStats((Cell) cd, collector);
             }
             else
@@ -88,18 +90,18 @@ public abstract class Rows
                 collector.update(complexData.complexDeletion());
                 if (complexData.hasCells())
                 {
-                    ++columnCount;
+                    columnCount.increment();
                     for (Cell cell : complexData)
                     {
-                        ++cellCount;
+                        cellCount.increment();
                         Cells.collectStats(cell, collector);
                     }
                 }
             }
+        }, false);
 
-        }
-        collector.updateColumnSetPerRow(columnCount);
-        return cellCount;
+        collector.updateColumnSetPerRow(columnCount.get());
+        return cellCount.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 542f0a2..45c026f 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -80,12 +80,15 @@ public class UnfilteredRowIteratorSerializer
     }
 
     // Should only be used for the on-wire format.
+
     public void serialize(UnfilteredRowIterator iterator, ColumnFilter 
selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
+
         SerializationHeader header = new SerializationHeader(false,
                                                              
iterator.metadata(),
                                                              
iterator.columns(),
                                                              iterator.stats());
+
         serialize(iterator, header, selection, out, version, rowEstimate);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 5ca7e03..ed6bd12 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.WrappedException;
 
 /**
  * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@ -181,7 +182,7 @@ public class UnfilteredSerializer
 
         if (header.isForSSTable())
         {
-            DataOutputBuffer dob = DataOutputBuffer.RECYCLER.get();
+            DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get();
             try
             {
                 serializeRowBody(row, flags, header, dob);
@@ -190,7 +191,7 @@ public class UnfilteredSerializer
                 // We write the size of the previous unfiltered to make 
reverse queries more efficient (and simpler).
                 // This is currently not used however and using it is tbd.
                 out.writeUnsignedVInt(previousUnfilteredSize);
-                out.write(dob.buffer());
+                out.write(dob.getData(), 0, dob.getLength());
             }
             finally
             {
@@ -227,20 +228,37 @@ public class UnfilteredSerializer
             Columns.serializer.serializeSubset(Collections2.transform(row, 
ColumnData::column), headerColumns, out);
 
         SearchIterator<ColumnDefinition, ColumnDefinition> si = 
headerColumns.iterator();
-        for (ColumnData data : row)
+
+        try
         {
-            // We can obtain the column for data directly from data.column(). 
However, if the cell/complex data
-            // originates from a sstable, the column we'll get will have the 
type used when the sstable was serialized,
-            // and if that type have been recently altered, that may not be 
the type we want to serialize the column
-            // with. So we use the ColumnDefinition from the "header" which is 
"current". Also see #11810 for what
-            // happens if we don't do that.
-            ColumnDefinition column = si.next(data.column());
-            assert column != null;
+            row.apply(cd -> {
+                // We can obtain the column for data directly from 
data.column(). However, if the cell/complex data
+                // originates from a sstable, the column we'll get will have 
the type used when the sstable was serialized,
+                // and if that type have been recently altered, that may not 
be the type we want to serialize the column
+                // with. So we use the ColumnDefinition from the "header" 
which is "current". Also see #11810 for what
+                // happens if we don't do that.
+                ColumnDefinition column = si.next(cd.column());
+                assert column != null : cd.column.toString();
+
+                try
+                {
+                    if (cd.column.isSimple())
+                        Cell.serializer.serialize((Cell) cd, column, out, 
pkLiveness, header);
+                    else
+                        writeComplexColumn((ComplexColumnData) cd, column, 
(flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
+                }
+                catch (IOException e)
+                {
+                    throw new WrappedException(e);
+                }
+            }, false);
+        }
+        catch (WrappedException e)
+        {
+            if (e.getCause() instanceof IOException)
+                throw (IOException) e.getCause();
 
-            if (data.column.isSimple())
-                Cell.serializer.serialize((Cell) data, column, out, 
pkLiveness, header);
-            else
-                writeComplexColumn((ComplexColumnData) data, column, (flags & 
HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
+            throw e;
         }
     }
 
@@ -496,12 +514,31 @@ public class UnfilteredSerializer
             builder.addRowDeletion(hasDeletion ? new 
Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : 
Row.Deletion.LIVE);
 
             Columns columns = hasAllColumns ? headerColumns : 
Columns.serializer.deserializeSubset(headerColumns, in);
-            for (ColumnDefinition column : columns)
+
+            final LivenessInfo livenessInfo = rowLiveness;
+
+            try
             {
-                if (column.isSimple())
-                    readSimpleColumn(column, in, header, helper, builder, 
rowLiveness);
-                else
-                    readComplexColumn(column, in, header, helper, 
hasComplexDeletion, builder, rowLiveness);
+                columns.apply(column -> {
+                    try
+                    {
+                        if (column.isSimple())
+                            readSimpleColumn(column, in, header, helper, 
builder, livenessInfo);
+                        else
+                            readComplexColumn(column, in, header, helper, 
hasComplexDeletion, builder, livenessInfo);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new WrappedException(e);
+                    }
+                }, false);
+            }
+            catch (WrappedException e)
+            {
+                if (e.getCause() instanceof IOException)
+                    throw (IOException) e.getCause();
+
+                throw e;
             }
 
             return builder.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java 
b/src/java/org/apache/cassandra/hints/HintsWriter.java
index ae9e05a..a081451 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -75,7 +75,7 @@ class HintsWriter implements AutoCloseable
         CRC32 crc = new CRC32();
 
         DataOutputBuffer dob = null;
-        try (DataOutputBuffer ignored = dob = DataOutputBuffer.RECYCLER.get())
+        try (DataOutputBuffer ignored = dob = 
DataOutputBuffer.scratchBuffer.get())
         {
             // write the descriptor
             descriptor.serialize(dob);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 6d3a714..fa88817 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Throwables;
 
+import com.datastax.shaded.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Row;
@@ -189,7 +190,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
     //// typedef
     static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {}
 
-    private class DiskWriter extends Thread
+    private class DiskWriter extends FastThreadLocalThread
     {
         volatile Throwable exception = null;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java 
b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index cc42c66..0d2423c 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -24,7 +24,7 @@ import java.nio.channels.WritableByteChannel;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import io.netty.util.Recycler;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.config.Config;
 
 /**
@@ -43,25 +43,22 @@ public class DataOutputBuffer extends 
BufferedDataOutputStreamPlus
     /*
      * Only recycle OutputBuffers up to 1Mb. Larger buffers will be trimmed 
back to this size.
      */
-    private static final int MAX_RECYCLE_BUFFER_SIZE = 1024 * 1024;
+    private static final int MAX_RECYCLE_BUFFER_SIZE = 
Integer.getInteger(Config.PROPERTY_PREFIX + "dob_max_recycle_bytes", 1024 * 
1024);
 
     private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
 
-    public static final Recycler<DataOutputBuffer> RECYCLER = new 
Recycler<DataOutputBuffer>()
+    /**
+     * Scratch buffers used mostly for serializing in memory. It's important 
to call #recycle() when finished
+     * to keep the memory overhead from being too large in the system.
+     */
+    public static final FastThreadLocal<DataOutputBuffer> scratchBuffer = new 
FastThreadLocal<DataOutputBuffer>()
     {
-        protected DataOutputBuffer newObject(Handle handle)
+        protected DataOutputBuffer initialValue() throws Exception
         {
-            return new DataOutputBuffer(handle);
+            return new DataOutputBuffer();
         }
     };
 
-    private final Recycler.Handle handle;
-
-    private DataOutputBuffer(Recycler.Handle handle)
-    {
-        this(DEFAULT_INITIAL_BUFFER_SIZE, handle);
-    }
-
     public DataOutputBuffer()
     {
         this(DEFAULT_INITIAL_BUFFER_SIZE);
@@ -69,28 +66,23 @@ public class DataOutputBuffer extends 
BufferedDataOutputStreamPlus
 
     public DataOutputBuffer(int size)
     {
-        this(size, null);
-    }
-
-    protected DataOutputBuffer(int size, Recycler.Handle handle)
-    {
-        this(ByteBuffer.allocate(size), handle);
+        super(ByteBuffer.allocate(size));
     }
 
-    protected DataOutputBuffer(ByteBuffer buffer, Recycler.Handle handle)
+    public DataOutputBuffer(ByteBuffer buffer)
     {
         super(buffer);
-        this.handle = handle;
     }
 
     public void recycle()
     {
-        assert handle != null;
-
-        if (buffer().capacity() <= MAX_RECYCLE_BUFFER_SIZE)
+        if (buffer.capacity() <= MAX_RECYCLE_BUFFER_SIZE)
+        {
+            buffer.clear();
+        }
+        else
         {
-            buffer.rewind();
-            RECYCLER.recycle(this, handle);
+            buffer = ByteBuffer.allocate(DEFAULT_INITIAL_BUFFER_SIZE);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java 
b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
index 5193401..8beb7a9 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -38,12 +38,12 @@ public class DataOutputBufferFixed extends DataOutputBuffer
 
     public DataOutputBufferFixed(int size)
     {
-        super(size, null);
+        super(size);
     }
 
     public DataOutputBufferFixed(ByteBuffer buffer)
     {
-        super(buffer, null);
+        super(buffer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java 
b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 88912f9..24eb93c 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -33,7 +33,7 @@ public class SafeMemoryWriter extends DataOutputBuffer
 
     private SafeMemoryWriter(SafeMemory memory)
     {
-        super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN), null);
+        super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN));
         this.memory = memory;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 9e8e2e1..02147ef 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocalThread;
 import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import net.jpountz.lz4.LZ4Factory;
@@ -42,7 +43,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.NIODataInputStream;
 
-public class IncomingTcpConnection extends Thread implements Closeable
+public class IncomingTcpConnection extends FastThreadLocalThread implements 
Closeable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(IncomingTcpConnection.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java 
b/src/java/org/apache/cassandra/net/MessageIn.java
index fe85595..df1b4e1 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -73,7 +73,7 @@ public class MessageIn<T>
     {
         InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 
-        MessagingService.Verb verb = 
MessagingService.Verb.values()[in.readInt()];
+        MessagingService.Verb verb = MessagingService.verbValues[in.readInt()];
         int parameterCount = in.readInt();
         Map<String, byte[]> parameters;
         if (parameterCount == 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java 
b/src/java/org/apache/cassandra/net/MessageOut.java
index bc5c41b..08d858b 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
@@ -113,11 +114,26 @@ public class MessageOut<T>
             out.write(entry.getValue());
         }
 
-        long longSize = payloadSize(version);
-        assert longSize <= Integer.MAX_VALUE; // larger values are supported 
in sstables but not messages
-        out.writeInt((int) longSize);
         if (payload != null)
-            serializer.serialize(payload, out, version);
+        {
+            DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get();
+            try
+            {
+                serializer.serialize(payload, dob, version);
+
+                int size = dob.getLength();
+                out.writeInt(size);
+                out.write(dob.getData(), 0, size);
+            }
+            finally
+            {
+                dob.recycle();
+            }
+        }
+        else
+        {
+            out.writeInt(0);
+        }
     }
 
     public int serializedSize(int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 08c08f3..b1f88ee 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -153,6 +153,8 @@ public final class MessagingService implements 
MessagingServiceMBean
         ;
     }
 
+    public static final Verb[] verbValues = Verb.values();
+
     public static final EnumMap<MessagingService.Verb, Stage> verbStages = new 
EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
     {{
         put(Verb.MUTATION, Stage.MUTATION);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index c2d10fd..76b2854 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLHandshakeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocalThread;
 import net.jpountz.lz4.LZ4BlockOutputStream;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
@@ -63,7 +64,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 
-public class OutboundTcpConnection extends Thread
+public class OutboundTcpConnection extends FastThreadLocalThread
 {
     private static final Logger logger = 
LoggerFactory.getLogger(OutboundTcpConnection.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/service/ClientWarn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java 
b/src/java/org/apache/cassandra/service/ClientWarn.java
index ddad197..878b5e9 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.service;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.concurrent.ExecutorLocal;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class ClientWarn implements ExecutorLocal<ClientWarn.State>
 {
     private static final String TRUNCATED = " [truncated]";
-    private static final ThreadLocal<ClientWarn.State> warnLocal = new 
ThreadLocal<>();
+    private static final FastThreadLocal<State> warnLocal = new 
FastThreadLocal<>();
     public static ClientWarn instance = new ClientWarn();
 
     private ClientWarn()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java 
b/src/java/org/apache/cassandra/tracing/Tracing.java
index e69645f..adf5ed9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.concurrent.ExecutorLocal;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
@@ -84,7 +85,7 @@ public abstract class Tracing implements 
ExecutorLocal<TraceState>
 
     private final InetAddress localAddress = FBUtilities.getLocalAddress();
 
-    private final ThreadLocal<TraceState> state = new ThreadLocal<>();
+    private final FastThreadLocal<TraceState> state = new FastThreadLocal<>();
 
     protected final ConcurrentMap<UUID, TraceState> sessions = new 
ConcurrentHashMap<>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/ChecksumType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java 
b/src/java/org/apache/cassandra/utils/ChecksumType.java
index 3fa245b..413a171 100644
--- a/src/java/org/apache/cassandra/utils/ChecksumType.java
+++ b/src/java/org/apache/cassandra/utils/ChecksumType.java
@@ -22,6 +22,8 @@ import java.util.zip.Checksum;
 import java.util.zip.CRC32;
 import java.util.zip.Adler32;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 public enum ChecksumType
 {
     Adler32
@@ -60,12 +62,13 @@ public enum ChecksumType
     public abstract Checksum newInstance();
     public abstract void update(Checksum checksum, ByteBuffer buf);
 
-    private ThreadLocal<Checksum> instances = 
ThreadLocal.withInitial(this::newInstance);
-
-    public Checksum threadLocalInstance()
+    private FastThreadLocal<Checksum> instances = new 
FastThreadLocal<Checksum>()
     {
-        return instances.get();
-    }
+        protected Checksum initialValue() throws Exception
+        {
+            return newInstance();
+        }
+    };
 
     public long of(ByteBuffer buf)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/Wrapped.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Wrapped.java 
b/src/java/org/apache/cassandra/utils/Wrapped.java
new file mode 100644
index 0000000..1996a86
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Wrapped.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Simple wrapper class to be used when a lambda function
+ * needs to modify a variable outside it's scope.
+ */
+public class Wrapped<T>
+{
+    private T value;
+
+    public static <V> Wrapped<V> create(V initial)
+    {
+        return new Wrapped<>(initial);
+    }
+
+    private Wrapped(T initial)
+    {
+        this.value = initial;
+    }
+
+    public T get()
+    {
+        return value;
+    }
+
+    public void set(T value)
+    {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/WrappedBoolean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/WrappedBoolean.java 
b/src/java/org/apache/cassandra/utils/WrappedBoolean.java
new file mode 100644
index 0000000..4b1443e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/WrappedBoolean.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Simple wrapper for native boolean type
+ */
+public class WrappedBoolean
+{
+    private boolean value;
+
+    public WrappedBoolean(boolean initial)
+    {
+        this.value = initial;
+    }
+
+    public boolean get()
+    {
+        return value;
+    }
+
+    public void set(boolean value)
+    {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/WrappedException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/WrappedException.java 
b/src/java/org/apache/cassandra/utils/WrappedException.java
new file mode 100644
index 0000000..3cf56bc
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/WrappedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Wrapped runtime exception for lambda functions
+ */
+public class WrappedException extends RuntimeException
+{
+    public WrappedException(Exception cause)
+    {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/WrappedInt.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java 
b/src/java/org/apache/cassandra/utils/WrappedInt.java
new file mode 100644
index 0000000..a106575
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/WrappedInt.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Simple wrapper for native int type
+ */
+public class WrappedInt
+{
+    private int value;
+
+    public WrappedInt(int initial)
+    {
+        this.value = initial;
+    }
+
+    public int get()
+    {
+        return value;
+    }
+
+    public void set(int value)
+    {
+        this.value = value;
+    }
+
+    public void increment()
+    {
+        ++value;
+    }
+
+    public void decrement()
+    {
+        --value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java 
b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 33f4152..5a6fffe 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -19,8 +19,10 @@
 package org.apache.cassandra.utils.btree;
 
 import java.util.*;
+import java.util.function.Consumer;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 
@@ -136,7 +138,8 @@ public class BTree
                 for (K k : source)
                     values[i++] = updateF.apply(k);
             }
-            updateF.allocated(ObjectSizes.sizeOfArray(values));
+            if (updateF != UpdateFunction.noOp())
+                updateF.allocated(ObjectSizes.sizeOfArray(values));
             return values;
         }
 
@@ -188,7 +191,7 @@ public class BTree
             tree1 = tree2;
             tree2 = tmp;
         }
-        return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), 
updateF);
+        return update(tree1, comparator, new BTreeSet<>(tree2, comparator), 
updateF);
     }
 
     public static <V> Iterator<V> iterator(Object[] btree)
@@ -1159,4 +1162,128 @@ public class BTree
         }
         return compare(cmp, previous, max) < 0;
     }
+
+    /**
+     * Simple method to walk the btree forwards or reversed and apply a 
function to each element
+     *
+     * Public method
+     *
+     */
+    public static <V> void apply(Object[] btree, Consumer<V> function, boolean 
reversed)
+    {
+        if (reversed)
+            applyReverse(btree, function, null);
+        else
+            applyForwards(btree, function, null);
+    }
+
+    /**
+     * Simple method to walk the btree forwards or reversed and apply a 
function till a stop condition is reached
+     *
+     * Public method
+     *
+     */
+    public static <V> void apply(Object[] btree, Consumer<V> function, 
Predicate<V> stopCondition, boolean reversed)
+    {
+        if (reversed)
+            applyReverse(btree, function, stopCondition);
+        else
+            applyForwards(btree, function, stopCondition);
+    }
+
+
+
+
+    /**
+     * Simple method to walk the btree forwards and apply a function till a 
stop condition is reached
+     *
+     * Private method
+     *
+     * @param btree
+     * @param function
+     * @param stopCondition
+     */
+    private static <V> boolean applyForwards(Object[] btree, Consumer<V> 
function, Predicate<V> stopCondition)
+    {
+        boolean isLeaf = isLeaf(btree);
+        int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree);
+        int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1;
+        for (int i = 0 ; i < limit ; i++)
+        {
+            // we want to visit in iteration order, so we visit our key nodes 
inbetween our children
+            int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0);
+            Object current = btree[idx];
+            if (idx < childOffset)
+            {
+                V castedCurrent = (V) current;
+                if (stopCondition != null && 
stopCondition.apply(castedCurrent))
+                    return true;
+
+                function.accept(castedCurrent);
+            }
+            else
+            {
+                if (applyForwards((Object[]) current, function, stopCondition))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Simple method to walk the btree in reverse and apply a function till a 
stop condition is reached
+     *
+     * Private method
+     *
+     * @param btree
+     * @param function
+     * @param stopCondition
+     */
+    private static <V> boolean applyReverse(Object[] btree, Consumer<V> 
function, Predicate<V> stopCondition)
+    {
+        boolean isLeaf = isLeaf(btree);
+        int childOffset = isLeaf ? 0 : getChildStart(btree);
+        int limit = isLeaf ? getLeafKeyEnd(btree)  : btree.length - 1;
+        for (int i = limit - 1, visited = 0; i >= 0 ; i--, visited++)
+        {
+            int idx = i;
+
+            // we want to visit in reverse iteration order, so we visit our 
children nodes inbetween our keys
+            if (!isLeaf)
+            {
+                int typeOffset = visited / 2;
+
+                if (i % 2 == 0)
+                {
+                    // This is a child branch. Since children are in the 
second half of the array, we must
+                    // adjust for the key's we've visited along the way
+                    idx += typeOffset;
+                }
+                else
+                {
+                    // This is a key. Since the keys are in the first half of 
the array and we are iterating
+                    // in reverse we subtract the childOffset and adjust for 
children we've walked so far
+                    idx = i - childOffset + typeOffset;
+                }
+            }
+
+            Object current = btree[idx];
+            if (isLeaf || idx < childOffset)
+            {
+                V castedCurrent = (V) current;
+                if (stopCondition != null && 
stopCondition.apply(castedCurrent))
+                    return true;
+
+                function.accept(castedCurrent);
+            }
+            else
+            {
+                if (applyReverse((Object[]) current, function, stopCondition))
+                    return true;
+            }
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java 
b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index b86a15b..babd571 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -35,6 +35,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
@@ -408,7 +409,8 @@ public class CommitLogStressTest
         return slice;
     }
 
-    public class CommitlogThread extends Thread {
+    public class CommitlogThread extends FastThreadLocalThread
+    {
         final AtomicLong counter = new AtomicLong();
         int hash = 0;
         int cells = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java 
b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
new file mode 100644
index 0000000..d0b4442
--- /dev/null
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.util.concurrent.FastThreadLocalThread;
+
+/**
+ * Created to test perf of FastThreadLocal
+ *
+ * Used in MutationBench via:
+ * jvmArgsAppend = {"-Djmh.executor=CUSTOM", 
"-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}
+ */
+public class FastThreadExecutor extends AbstractExecutorService
+{
+    final FastThreadLocalThread thread;
+    final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    final CountDownLatch shutdown = new CountDownLatch(1);
+
+    public FastThreadExecutor(int size, String name)
+    {
+        assert size == 1;
+
+        thread = new FastThreadLocalThread(() -> {
+            Runnable work = null;
+            try
+            {
+                while ((work = queue.take()) != null)
+                    work.run();
+            }
+            catch (InterruptedException e)
+            {
+                shutdown.countDown();
+            }
+        });
+
+        thread.setName(name + "-1");
+        thread.setDaemon(true);
+
+        thread.start();
+    }
+
+
+    public void shutdown()
+    {
+        thread.interrupt();
+    }
+
+    public List<Runnable> shutdownNow()
+    {
+        thread.interrupt();
+        return Collections.emptyList();
+    }
+
+    public boolean isShutdown()
+    {
+        return shutdown.getCount() == 0;
+    }
+
+    public boolean isTerminated()
+    {
+        return shutdown.getCount() == 0;
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+        return shutdown.await(timeout, unit);
+    }
+
+    public void execute(Runnable command)
+    {
+        while(!queue.add(command));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
----------------------------------------------------------------------
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
new file mode 100644
index 0000000..89dcb0f
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.profile.StackProfiler;
+import org.openjdk.jmh.results.Result;
+import org.openjdk.jmh.results.RunResult;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1
+       , jvmArgsAppend = {"-Djmh.executor=CUSTOM", 
"-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"
+       //,"-XX:+UnlockCommercialFeatures", 
"-XX:+FlightRecorder","-XX:+UnlockDiagnosticVMOptions", 
"-XX:+DebugNonSafepoints",
+       // 
"-XX:StartFlightRecording=duration=60s,filename=./profiling-data.jfr,name=profile,settings=profile",
+       // 
"-XX:FlightRecorderOptions=settings=/home/jake/workspace/cassandra/profiling-advanced.jfc,samplethreads=true"
+     }
+)
+@Threads(1)
+@State(Scope.Benchmark)
+public class MutationBench
+{
+    static
+    {
+        Config.setClientMode(true);
+        // Partitioner is not set in client mode.
+        if (DatabaseDescriptor.getPartitioner() == null)
+            
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+    static String keyspace = "keyspace1";
+
+    private Mutation mutation;
+    private MessageOut<Mutation> messageOut;
+
+    private ByteBuffer buffer;
+    private DataOutputBuffer outputBuffer;
+    private DataInputBuffer inputBuffer;
+
+
+    @State(Scope.Thread)
+    public static class ThreadState
+    {
+        MessageIn<Mutation> in;
+        int counter = 0;
+    }
+
+    @Setup
+    public void setup() throws IOException
+    {
+        Schema.instance.load(KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1)));
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+        CFMetaData metadata = CFMetaData.compile("CREATE TABLE userpics " +
+                                                   "( userid bigint," +
+                                                   "picid bigint," +
+                                                   "commentid bigint, " +
+                                                   "PRIMARY KEY(userid, 
picid))", keyspace);
+
+        Schema.instance.load(metadata);
+        
Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(metadata)));
+
+
+        mutation = (Mutation)UpdateBuilder.create(metadata, 
1L).newRow(1L).add("commentid", 32L).makeMutation();
+        messageOut = mutation.createMessage();
+        buffer = 
ByteBuffer.allocate(messageOut.serializedSize(MessagingService.current_version));
+        outputBuffer = new DataOutputBufferFixed(buffer);
+        inputBuffer = new DataInputBuffer(buffer, false);
+
+        messageOut.serialize(outputBuffer, MessagingService.current_version);
+    }
+
+    @Benchmark
+    public void serialize(ThreadState state) throws IOException
+    {
+        buffer.rewind();
+        messageOut.serialize(outputBuffer, MessagingService.current_version);
+        state.counter++;
+    }
+
+    @Benchmark
+    public void deserialize(ThreadState state) throws IOException
+    {
+        buffer.rewind();
+        state.in = MessageIn.read(inputBuffer, 
MessagingService.current_version, 0);
+        state.counter++;
+    }
+
+    public static void main(String... args) throws Exception {
+        Options opts = new OptionsBuilder()
+                       .include(".*"+MutationBench.class.getSimpleName()+".*")
+                       .jvmArgs("-server")
+                       .forks(1)
+                       .mode(Mode.Throughput)
+                       .addProfiler(StackProfiler.class)
+                       .build();
+
+        Collection<RunResult> records = new Runner(opts).run();
+        for ( RunResult result : records) {
+            Result r = result.getPrimaryResult();
+            System.out.println("API replied benchmark score: "
+                               + r.getScore() + " "
+                               + r.getScoreUnit() + " over "
+                               + r.getStatistics().getN() + " iterations");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/unit/org/apache/cassandra/utils/BTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java 
b/test/unit/org/apache/cassandra/utils/BTreeTest.java
index a01ad2e..ec4cdb8 100644
--- a/test/unit/org/apache/cassandra/utils/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -128,6 +129,30 @@ public class BTreeTest
             checkResult(i, BTree.update(BTree.build(seq(i), noOp), CMP, 
seq(i), updateF));
     }
 
+    @Test
+    public void testApplyForwards()
+    {
+        List<Integer> input = seq(71);
+        Object[] btree = BTree.build(input, noOp);
+
+        final List<Integer> result = new ArrayList<>();
+        BTree.<Integer>apply(btree, i -> result.add(i), false);
+
+        org.junit.Assert.assertArrayEquals(input.toArray(),result.toArray());
+    }
+
+    @Test
+    public void testApplyReverse()
+    {
+        List<Integer> input = seq(71);
+        Object[] btree = BTree.build(input, noOp);
+
+        final List<Integer> result = new ArrayList<>();
+        BTree.<Integer>apply(btree, i -> result.add(i), true);
+
+        
org.junit.Assert.assertArrayEquals(Lists.reverse(input).toArray(),result.toArray());
+    }
+
     /**
      * Tests that the apply method of the <code>UpdateFunction</code> is only 
called once with each key update.
      * (see CASSANDRA-8018).

Reply via email to