http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
 
b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
new file mode 100644
index 0000000..d79ab06
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.*;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Abstract common class for all non-thread safe Partition implementations.
+ */
+public abstract class AbstractThreadUnsafePartition implements Partition, 
Iterable<Row>
+{
+    protected final CFMetaData metadata;
+    protected final DecoratedKey key;
+
+    protected final PartitionColumns columns;
+
+    protected final List<Row> rows;
+
+    protected AbstractThreadUnsafePartition(CFMetaData metadata,
+                                            DecoratedKey key,
+                                            PartitionColumns columns,
+                                            List<Row> rows)
+    {
+        this.metadata = metadata;
+        this.key = key;
+        this.columns = columns;
+        this.rows = rows;
+    }
+
+    public CFMetaData metadata()
+    {
+        return metadata;
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return key;
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        return deletionInfo().getPartitionDeletion();
+    }
+
+    public PartitionColumns columns()
+    {
+        return columns;
+    }
+
+    public abstract Row staticRow();
+
+    protected abstract boolean canHaveShadowedData();
+
+    /**
+     * The deletion info for the partition update.
+     *
+     * Note: do not cast the result to a {@code MutableDeletionInfo} to modify 
it!
+     *
+     * @return the deletion info for the partition update for use as read-only.
+     */
+    public abstract DeletionInfo deletionInfo();
+
+    public int rowCount()
+    {
+        return rows.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return deletionInfo().isLive() && rows.isEmpty() && 
staticRow().isEmpty();
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        CFMetaData metadata = metadata();
+        sb.append(String.format("Partition[%s.%s] key=%s columns=%s%s",
+                                 metadata().ksName,
+                                 metadata().cfName,
+                                 
metadata().getKeyValidator().getString(partitionKey().getKey()),
+                                 columns(),
+                                 deletionInfo().isLive() ? "" : " " + 
deletionInfo()));
+
+        if (staticRow() != Rows.EMPTY_STATIC_ROW)
+            sb.append("\n    ").append(staticRow().toString(metadata, true));
+
+        // We use createRowIterator() directly instead of iterator() because 
that avoids
+        // sorting for PartitionUpdate (which inherit this method) and that is 
useful because
+        //  1) it can help with debugging and 2) we can't write after sorting 
but we want to
+        // be able to print an update while we build it (again for debugging)
+        for (Row row : this)
+            sb.append("\n    ").append(row.toString(metadata, true));
+
+        return sb.toString();
+    }
+
+    public Row getRow(Clustering clustering)
+    {
+        Row row = searchIterator(ColumnFilter.selection(columns()), 
false).next(clustering);
+        // Note that for statics, this will never return null, this will 
return an empty row. However,
+        // it's more consistent for this method to return null if we don't 
really have a static row.
+        return row == null || (clustering == Clustering.STATIC_CLUSTERING && 
row.isEmpty()) ? null : row;
+    }
+
+    /**
+     * Returns an iterator that iterators over the rows of this update in 
clustering order.
+     *
+     * @return an iterator over the rows of this partition.
+     */
+    public Iterator<Row> iterator()
+    {
+        return rows.iterator();
+    }
+
+    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter 
columns, boolean reversed)
+    {
+        final RowSearcher searcher = reversed ? new ReverseRowSearcher() : new 
ForwardRowSearcher();
+        return new SearchIterator<Clustering, Row>()
+        {
+            public boolean hasNext()
+            {
+                return !searcher.isDone();
+            }
+
+            public Row next(Clustering clustering)
+            {
+                if (clustering == Clustering.STATIC_CLUSTERING)
+                {
+                    Row staticRow = staticRow();
+                    return staticRow.isEmpty() || 
columns.fetchedColumns().statics.isEmpty()
+                         ? Rows.EMPTY_STATIC_ROW
+                         : staticRow.filter(columns, partitionLevelDeletion(), 
true, metadata);
+                }
+
+                Row row = searcher.search(clustering);
+                RangeTombstone rt = deletionInfo().rangeCovering(clustering);
+
+                // A search iterator only return a row, so it doesn't allow to 
directly account for deletion that should apply to to row
+                // (the partition deletion or the deletion of a range 
tombstone that covers it). So if needs be, reuse the row deletion
+                // to carry the proper deletion on the row.
+                DeletionTime activeDeletion = partitionLevelDeletion();
+                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+                    activeDeletion = rt.deletionTime();
+
+                if (row == null)
+                    return activeDeletion.isLive() ? null : 
ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+
+                return row.filter(columns, activeDeletion, true, metadata);
+            }
+        };
+    }
+
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, 
false);
+    }
+
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, 
Slices slices, boolean reversed)
+    {
+        return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, 
reversed));
+    }
+
+    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
+    {
+        return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), 
false);
+    }
+
+    protected SliceableUnfilteredRowIterator 
sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed)
+    {
+        return new SliceableIterator(this, selection, reversed);
+    }
+
+    /**
+     * Simple binary search for a given row (in the rows list).
+     *
+     * The return value has the exact same meaning that the one of 
Collections.binarySearch() but
+     * we don't use the later because we're searching for a 'Clustering' in an 
array of 'Row' (and while
+     * both are Clusterable, it's slightly faster to use the 'Clustering' 
comparison (see comment on
+     * ClusteringComparator.rowComparator())).
+     */
+    private int binarySearch(Clustering clustering, int fromIndex, int toIndex)
+    {
+        ClusteringComparator comparator = metadata().comparator;
+        int low = fromIndex;
+        int mid = toIndex;
+        int high = mid - 1;
+        int result = -1;
+        while (low <= high)
+        {
+            mid = (low + high) >> 1;
+            if ((result = comparator.compare(clustering, 
rows.get(mid).clustering())) > 0)
+                low = mid + 1;
+            else if (result == 0)
+                return mid;
+            else
+                high = mid - 1;
+        }
+        return -mid - (result < 0 ? 1 : 2);
+    }
+
+    private class SliceableIterator extends AbstractUnfilteredRowIterator 
implements SliceableUnfilteredRowIterator
+    {
+        private final ColumnFilter columns;
+        private RowSearcher searcher;
+
+        private Iterator<Unfiltered> iterator;
+
+        private SliceableIterator(AbstractThreadUnsafePartition partition, 
ColumnFilter columns, boolean isReverseOrder)
+        {
+            super(partition.metadata(),
+                  partition.partitionKey(),
+                  partition.partitionLevelDeletion(),
+                  columns.fetchedColumns(),
+                  partition.staticRow().isEmpty() ? Rows.EMPTY_STATIC_ROW : 
partition.staticRow().filter(columns, partition.partitionLevelDeletion(), 
false, partition.metadata()),
+                  isReverseOrder,
+                  partition.stats());
+            this.columns = columns;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            if (iterator == null)
+                iterator = merge(isReverseOrder ? 
Lists.reverse(rows).iterator(): iterator(), 
deletionInfo().rangeIterator(isReverseOrder()));
+
+            return iterator.hasNext() ? iterator.next() : endOfData();
+        }
+
+        public Iterator<Unfiltered> slice(Slice slice)
+        {
+            if (searcher == null)
+                searcher = isReverseOrder() ? new ReverseRowSearcher() : new 
ForwardRowSearcher();
+            return merge(searcher.slice(slice), 
deletionInfo().rangeIterator(slice, isReverseOrder()));
+        }
+
+        private Iterator<Unfiltered> merge(Iterator<Row> rows, 
Iterator<RangeTombstone> ranges)
+        {
+            return new RowAndDeletionMergeIterator(metadata,
+                                                   partitionKey,
+                                                   partitionLevelDeletion,
+                                                   columns,
+                                                   staticRow(),
+                                                   isReverseOrder(),
+                                                   stats(),
+                                                   rows,
+                                                   ranges,
+                                                   canHaveShadowedData());
+        }
+    }
+
+    /**
+     * Utility class to search for rows or slice of rows in order.
+     */
+    private abstract class RowSearcher
+    {
+        public abstract boolean isDone();
+
+        public abstract Row search(Clustering name);
+
+        public abstract Iterator<Row> slice(Slice slice);
+
+        protected int search(Clustering clustering, int from, int to)
+        {
+            return binarySearch(clustering, from, to);
+        }
+
+        protected int search(Slice.Bound bound, int from, int to)
+        {
+            return Collections.binarySearch(rows.subList(from, to), bound, 
metadata.comparator);
+        }
+    }
+
+    private class ForwardRowSearcher extends RowSearcher
+    {
+        private int nextIdx = 0;
+
+        public boolean isDone()
+        {
+            return nextIdx >= rows.size();
+        }
+
+        public Row search(Clustering name)
+        {
+            if (isDone())
+                return null;
+
+            int idx = search(name, nextIdx, rows.size());
+            if (idx < 0)
+            {
+                nextIdx = -idx - 1;
+                return null;
+            }
+            else
+            {
+                nextIdx = idx + 1;
+                return rows.get(idx);
+            }
+        }
+
+        public Iterator<Row> slice(Slice slice)
+        {
+            // Note that because a Slice.Bound can never sort equally to a 
Clustering, we know none of the search will
+            // be a match, so we save from testing for it.
+
+            final int start = -search(slice.start(), nextIdx, rows.size()) - 
1; // First index to include
+            if (start >= rows.size())
+                return Collections.emptyIterator();
+
+            final int end = -search(slice.end(), start, rows.size()) - 1; // 
First index to exclude
+
+            // Remember the end to speed up potential further slice search
+            nextIdx = end;
+
+            if (start >= end)
+                return Collections.emptyIterator();
+
+            return rows.subList(start, end).iterator();
+        }
+    }
+
+    private class ReverseRowSearcher extends RowSearcher
+    {
+        private int nextIdx = rows.size() - 1;
+
+        public boolean isDone()
+        {
+            return nextIdx < 0;
+        }
+
+        public Row search(Clustering name)
+        {
+            if (isDone())
+                return null;
+
+            int idx = search(name, 0, nextIdx);
+            if (idx < 0)
+            {
+                // The insertion point is the first element greater than name, 
so we want start from the previous one next time
+                nextIdx = -idx - 2;
+                return null;
+            }
+            else
+            {
+                nextIdx = idx - 1;
+                return rows.get(idx);
+            }
+        }
+
+        public Iterator<Row> slice(Slice slice)
+        {
+            // Note that because a Slice.Bound can never sort equally to a 
Clustering, we know none of the search will
+            // be a match, so we save from testing for it.
+
+            // The insertion point is the first element greater than 
slice.end(), so we want the previous index
+            final int start = -search(slice.end(), 0, nextIdx + 1) - 2;  // 
First index to include
+            if (start < 0)
+                return Collections.emptyIterator();
+
+            final int end = -search(slice.start(), 0, start + 1) - 2; // First 
index to exclude
+
+            // Remember the end to speed up potential further slice search
+            nextIdx = end;
+
+            if (start < end)
+                return Collections.emptyIterator();
+
+            return Lists.reverse(rows.subList(end+1, start+1)).iterator();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
 
b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..f7d7222
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * A partition iterator that allows to filter/modify the unfiltered from the
+ * underlying iterators.
+ */
+public abstract class AlteringUnfilteredPartitionIterator extends 
WrappingUnfilteredPartitionIterator
+{
+    protected AlteringUnfilteredPartitionIterator(UnfilteredPartitionIterator 
wrapped)
+    {
+        super(wrapped);
+    }
+
+    protected Row computeNextStatic(DecoratedKey partitionKey, Row row)
+    {
+        return row;
+    }
+
+    protected Row computeNext(DecoratedKey partitionKey, Row row)
+    {
+        return row;
+    }
+
+    protected RangeTombstoneMarker computeNext(DecoratedKey partitionKey, 
RangeTombstoneMarker marker)
+    {
+        return marker;
+    }
+
+    @Override
+    protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+    {
+        final DecoratedKey partitionKey = iter.partitionKey();
+        return new AlteringUnfilteredRowIterator(iter)
+        {
+            protected Row computeNextStatic(Row row)
+            {
+                return 
AlteringUnfilteredPartitionIterator.this.computeNextStatic(partitionKey, row);
+            }
+
+            protected Row computeNext(Row row)
+            {
+                return 
AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, row);
+            }
+
+            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker 
marker)
+            {
+                return 
AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, marker);
+            }
+        };
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java 
b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index bec8056..f39245b 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -18,11 +18,11 @@
 package org.apache.cassandra.db.partitions;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -33,24 +33,31 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
 {
     private final int createdAtInSec;
 
-    // Note that those fields are really immutable, but we can't easily pass 
their values to
-    // the ctor so they are not final.
-    private int cachedLiveRows;
-    private int rowsWithNonExpiringCells;
+    private final int cachedLiveRows;
+    private final int rowsWithNonExpiringCells;
 
-    private int nonTombstoneCellCount;
-    private int nonExpiringLiveCells;
+    private final int nonTombstoneCellCount;
+    private final int nonExpiringLiveCells;
 
     private ArrayBackedCachedPartition(CFMetaData metadata,
                                        DecoratedKey partitionKey,
-                                       DeletionTime deletionTime,
                                        PartitionColumns columns,
-                                       int initialRowCapacity,
-                                       boolean sortable,
-                                       int createdAtInSec)
+                                       Row staticRow,
+                                       List<Row> rows,
+                                       DeletionInfo deletionInfo,
+                                       RowStats stats,
+                                       int createdAtInSec,
+                                       int cachedLiveRows,
+                                       int rowsWithNonExpiringCells,
+                                       int nonTombstoneCellCount,
+                                       int nonExpiringLiveCells)
     {
-        super(metadata, partitionKey, deletionTime, columns, 
initialRowCapacity, sortable);
+        super(metadata, partitionKey, columns, staticRow, rows, deletionInfo, 
stats);
         this.createdAtInSec = createdAtInSec;
+        this.cachedLiveRows = cachedLiveRows;
+        this.rowsWithNonExpiringCells = rowsWithNonExpiringCells;
+        this.nonTombstoneCellCount = nonTombstoneCellCount;
+        this.nonExpiringLiveCells = nonExpiringLiveCells;
     }
 
     /**
@@ -65,7 +72,7 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
      */
     public static ArrayBackedCachedPartition create(UnfilteredRowIterator 
iterator, int nowInSec)
     {
-        return create(iterator, 4, nowInSec);
+        return create(iterator, 16, nowInSec);
     }
 
     /**
@@ -82,30 +89,76 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
      */
     public static ArrayBackedCachedPartition create(UnfilteredRowIterator 
iterator, int initialRowCapacity, int nowInSec)
     {
-        ArrayBackedCachedPartition partition = new 
ArrayBackedCachedPartition(iterator.metadata(),
-                                                                              
iterator.partitionKey(),
-                                                                              
iterator.partitionLevelDeletion(),
-                                                                              
iterator.columns(),
-                                                                              
initialRowCapacity,
-                                                                              
iterator.isReverseOrder(),
-                                                                              
nowInSec);
+        CFMetaData metadata = iterator.metadata();
+        boolean reversed = iterator.isReverseOrder();
 
-        partition.staticRow = iterator.staticRow().takeAlias();
+        List<Row> rows = new ArrayList<>(initialRowCapacity);
+        MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), 
metadata.comparator, reversed);
 
-        Writer writer = partition.new Writer(nowInSec);
-        RangeTombstoneCollector markerCollector = partition.new 
RangeTombstoneCollector(iterator.isReverseOrder());
+        int cachedLiveRows = 0;
+        int rowsWithNonExpiringCells = 0;
 
-        copyAll(iterator, writer, markerCollector, partition);
+        int nonTombstoneCellCount = 0;
+        int nonExpiringLiveCells = 0;
 
-        return partition;
+        while (iterator.hasNext())
+        {
+            Unfiltered unfiltered = iterator.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+            {
+                Row row = (Row)unfiltered;
+                rows.add(row);
+
+                // Collect stats
+                if (row.hasLiveData(nowInSec))
+                    ++cachedLiveRows;
+
+                boolean hasNonExpiringCell = false;
+                for (Cell cell : row.cells())
+                {
+                    if (!cell.isTombstone())
+                    {
+                        ++nonTombstoneCellCount;
+                        if (!cell.isExpiring())
+                        {
+                            hasNonExpiringCell = true;
+                            ++nonExpiringLiveCells;
+                        }
+                    }
+                }
+
+                if (hasNonExpiringCell)
+                    ++rowsWithNonExpiringCells;
+            }
+            else
+            {
+                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+            }
+        }
+
+        if (reversed)
+            Collections.reverse(rows);
+
+        return new ArrayBackedCachedPartition(metadata,
+                                              iterator.partitionKey(),
+                                              iterator.columns(),
+                                              iterator.staticRow(),
+                                              rows,
+                                              deletionBuilder.build(),
+                                              iterator.stats(),
+                                              nowInSec,
+                                              cachedLiveRows,
+                                              rowsWithNonExpiringCells,
+                                              nonTombstoneCellCount,
+                                              nonExpiringLiveCells);
     }
 
     public Row lastRow()
     {
-        if (rows == 0)
+        if (rows.isEmpty())
             return null;
 
-        return new InternalReusableRow().setTo(rows - 1);
+        return rows.get(rows.size() - 1);
     }
 
     /**
@@ -146,62 +199,6 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
         return nonExpiringLiveCells;
     }
 
-    // Writers that collect the values for 'cachedLiveRows', 
'rowsWithNonExpiringCells', 'nonTombstoneCellCount'
-    // and 'nonExpiringLiveCells'.
-    protected class Writer extends AbstractPartitionData.Writer
-    {
-        private final int nowInSec;
-
-        private boolean hasLiveData;
-        private boolean hasNonExpiringCell;
-
-        protected Writer(int nowInSec)
-        {
-            super(true);
-            this.nowInSec = nowInSec;
-        }
-
-        @Override
-        public void writePartitionKeyLivenessInfo(LivenessInfo info)
-        {
-            super.writePartitionKeyLivenessInfo(info);
-            if (info.isLive(nowInSec))
-                hasLiveData = true;
-        }
-
-        @Override
-        public void writeCell(ColumnDefinition column, boolean isCounter, 
ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            super.writeCell(column, isCounter, value, info, path);
-
-            if (info.isLive(nowInSec))
-            {
-                hasLiveData = true;
-                if (!info.hasTTL())
-                {
-                    hasNonExpiringCell = true;
-                    ++ArrayBackedCachedPartition.this.nonExpiringLiveCells;
-                }
-            }
-
-            if (!info.hasLocalDeletionTime() || info.hasTTL())
-                ++ArrayBackedCachedPartition.this.nonTombstoneCellCount;
-        }
-
-        @Override
-        public void endOfRow()
-        {
-            super.endOfRow();
-            if (hasLiveData)
-                ++ArrayBackedCachedPartition.this.cachedLiveRows;
-            if (hasNonExpiringCell)
-                ++ArrayBackedCachedPartition.this.rowsWithNonExpiringCells;
-
-            hasLiveData = false;
-            hasNonExpiringCell = false;
-        }
-    }
-
     static class Serializer implements ISerializer<CachedPartition>
     {
         public void serialize(CachedPartition partition, DataOutputPlus out) 
throws IOException
@@ -210,9 +207,13 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
             ArrayBackedCachedPartition p = 
(ArrayBackedCachedPartition)partition;
 
             out.writeInt(p.createdAtInSec);
+            out.writeInt(p.cachedLiveRows);
+            out.writeInt(p.rowsWithNonExpiringCells);
+            out.writeInt(p.nonTombstoneCellCount);
+            out.writeInt(p.nonExpiringLiveCells);
             try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
             {
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
out, MessagingService.current_version, p.rows);
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
out, MessagingService.current_version, p.rowCount());
             }
         }
 
@@ -226,18 +227,42 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
             //      is slightly faster.
 
             int createdAtInSec = in.readInt();
+            int cachedLiveRows = in.readInt();
+            int rowsWithNonExpiringCells = in.readInt();
+            int nonTombstoneCellCount = in.readInt();
+            int nonExpiringLiveCells = in.readInt();
+
+            UnfilteredRowIteratorSerializer.Header header = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, 
MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+            assert !header.isReversed && header.rowEstimate >= 0;
 
-            UnfilteredRowIteratorSerializer.Header h = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, 
MessagingService.current_version, SerializationHelper.Flag.LOCAL);
-            assert !h.isReversed && h.rowEstimate >= 0;
+            MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(header.partitionDeletion, 
header.metadata.comparator, false);
+            List<Row> rows = new ArrayList<>(header.rowEstimate);
 
-            ArrayBackedCachedPartition partition = new 
ArrayBackedCachedPartition(h.metadata, h.key, h.partitionDeletion, 
h.sHeader.columns(), h.rowEstimate, false, createdAtInSec);
-            partition.staticRow = h.staticRow;
+            try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, 
MessagingService.current_version, SerializationHelper.Flag.LOCAL, header))
+            {
+                while (partition.hasNext())
+                {
+                    Unfiltered unfiltered = partition.next();
+                    if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                        rows.add((Row)unfiltered);
+                    else
+                        deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+                }
+            }
 
-            Writer writer = partition.new Writer(createdAtInSec);
-            RangeTombstoneMarker.Writer markerWriter = partition.new 
RangeTombstoneCollector(false);
+            return new ArrayBackedCachedPartition(header.metadata,
+                                                  header.key,
+                                                  header.sHeader.columns(),
+                                                  header.staticRow,
+                                                  rows,
+                                                  deletionBuilder.build(),
+                                                  header.sHeader.stats(),
+                                                  createdAtInSec,
+                                                  cachedLiveRows,
+                                                  rowsWithNonExpiringCells,
+                                                  nonTombstoneCellCount,
+                                                  nonExpiringLiveCells);
 
-            UnfilteredRowIteratorSerializer.serializer.deserialize(in, new 
SerializationHelper(MessagingService.current_version, 
SerializationHelper.Flag.LOCAL), h.sHeader, writer, markerWriter);
-            return partition;
         }
 
         public long serializedSize(CachedPartition partition)
@@ -248,7 +273,11 @@ public class ArrayBackedCachedPartition extends 
ArrayBackedPartition implements
             try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
             {
                 return TypeSizes.sizeof(p.createdAtInSec)
-                     + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, 
MessagingService.current_version, p.rows);
+                     + TypeSizes.sizeof(p.cachedLiveRows)
+                     + TypeSizes.sizeof(p.rowsWithNonExpiringCells)
+                     + TypeSizes.sizeof(p.nonTombstoneCellCount)
+                     + TypeSizes.sizeof(p.nonExpiringLiveCells)
+                     + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, 
MessagingService.current_version, p.rowCount());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java 
b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
index d7f3a88..4485117 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
@@ -17,28 +17,30 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
 
-public class ArrayBackedPartition extends AbstractPartitionData
+public class ArrayBackedPartition extends AbstractThreadUnsafePartition
 {
+    private final Row staticRow;
+    private final DeletionInfo deletionInfo;
+    private final RowStats stats;
+
     protected ArrayBackedPartition(CFMetaData metadata,
                                    DecoratedKey partitionKey,
-                                   DeletionTime deletionTime,
                                    PartitionColumns columns,
-                                   int initialRowCapacity,
-                                   boolean sortable)
+                                   Row staticRow,
+                                   List<Row> rows,
+                                   DeletionInfo deletionInfo,
+                                   RowStats stats)
     {
-        super(metadata, partitionKey, deletionTime, columns, 
initialRowCapacity, sortable);
+        super(metadata, partitionKey, columns, rows);
+        this.staticRow = staticRow;
+        this.deletionInfo = deletionInfo;
+        this.stats = stats;
     }
 
     /**
@@ -52,7 +54,7 @@ public class ArrayBackedPartition extends 
AbstractPartitionData
      */
     public static ArrayBackedPartition create(UnfilteredRowIterator iterator)
     {
-        return create(iterator, 4);
+        return create(iterator, 16);
     }
 
     /**
@@ -68,37 +70,45 @@ public class ArrayBackedPartition extends 
AbstractPartitionData
      */
     public static ArrayBackedPartition create(UnfilteredRowIterator iterator, 
int initialRowCapacity)
     {
-        ArrayBackedPartition partition = new 
ArrayBackedPartition(iterator.metadata(),
-                                                                  
iterator.partitionKey(),
-                                                                  
iterator.partitionLevelDeletion(),
-                                                                  
iterator.columns(),
-                                                                  
initialRowCapacity,
-                                                                  
iterator.isReverseOrder());
-
-        partition.staticRow = iterator.staticRow().takeAlias();
-
-        Writer writer = partition.new Writer(true);
-        RangeTombstoneCollector markerCollector = partition.new 
RangeTombstoneCollector(iterator.isReverseOrder());
+        CFMetaData metadata = iterator.metadata();
+        boolean reversed = iterator.isReverseOrder();
 
-        copyAll(iterator, writer, markerCollector, partition);
-
-        return partition;
-    }
+        List<Row> rows = new ArrayList<>(initialRowCapacity);
+        MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), 
metadata.comparator, reversed);
 
-    protected static void copyAll(UnfilteredRowIterator iterator, Writer 
writer, RangeTombstoneCollector markerCollector, ArrayBackedPartition partition)
-    {
         while (iterator.hasNext())
         {
             Unfiltered unfiltered = iterator.next();
             if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                ((Row) unfiltered).copyTo(writer);
+                rows.add((Row)unfiltered);
             else
-                ((RangeTombstoneMarker) unfiltered).copyTo(markerCollector);
+                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
         }
 
-        // A Partition (or more precisely AbstractPartitionData) always 
assumes that its data is in clustering
-        // order. So if we've just added them in reverse clustering order, 
reverse them.
-        if (iterator.isReverseOrder())
-            partition.reverse();
+        if (reversed)
+            Collections.reverse(rows);
+
+        return new ArrayBackedPartition(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), 
iterator.stats());
+    }
+
+    protected boolean canHaveShadowedData()
+    {
+        // We only create instances from UnfilteredRowIterator that don't have 
shadowed data
+        return false;
+    }
+
+    public Row staticRow()
+    {
+        return staticRow;
+    }
+
+    public DeletionInfo deletionInfo()
+    {
+        return deletionInfo;
+    }
+
+    public RowStats stats()
+    {
+        return stats;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 6a888a6..1361422 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -26,11 +26,8 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -89,10 +86,7 @@ public class AtomicBTreePartition implements Partition
 
     private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> 
wasteTrackerUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, 
"wasteTracker");
 
-    private static final DeletionInfo LIVE = DeletionInfo.live();
-    // This is a small optimization: DeletionInfo is mutable, but we know that 
we will always copy it in that class,
-    // so we can safely alias one DeletionInfo.live() reference and avoid some 
allocations.
-    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, 
RowStats.NO_STATS);
+    private static final Holder EMPTY = new Holder(BTree.empty(), 
DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, RowStats.NO_STATS);
 
     private final CFMetaData metadata;
     private final DecoratedKey partitionKey;
@@ -154,146 +148,56 @@ public class AtomicBTreePartition implements Partition
         return row == null || (clustering == Clustering.STATIC_CLUSTERING && 
row.isEmpty()) ? null : row;
     }
 
+    private Row staticRow(Holder current, ColumnFilter columns, boolean 
setActiveDeletionToRow)
+    {
+        DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
+        if (columns.fetchedColumns().statics.isEmpty() || 
(current.staticRow.isEmpty() && partitionDeletion.isLive()))
+            return Rows.EMPTY_STATIC_ROW;
+
+        Row row = current.staticRow.filter(columns, partitionDeletion, 
setActiveDeletionToRow, metadata);
+        return row == null ? Rows.EMPTY_STATIC_ROW : row;
+    }
+
     public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter 
columns, final boolean reversed)
     {
         // TODO: we could optimize comparison for "NativeRow" à la #6755
         final Holder current = ref;
         return new SearchIterator<Clustering, Row>()
         {
-            private final SearchIterator<Clustering, MemtableRowData> rawIter 
= new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
-            private final MemtableRowData.ReusableRow row = 
allocator.newReusableRow();
-            private final ReusableFilteringRow filter = new 
ReusableFilteringRow(columns.fetchedColumns().regulars, columns);
-            private final long partitionDeletion = 
current.deletionInfo.getPartitionDeletion().markedForDeleteAt();
+            private final SearchIterator<Clustering, Row> rawIter = new 
BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+            private final DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
 
             public boolean hasNext()
             {
                 return rawIter.hasNext();
             }
 
-            public Row next(Clustering key)
+            public Row next(Clustering clustering)
             {
-                if (key == Clustering.STATIC_CLUSTERING)
-                    return makeStatic(columns, current, allocator);
+                if (clustering == Clustering.STATIC_CLUSTERING)
+                    return staticRow(current, columns, true);
 
-                MemtableRowData data = rawIter.next(key);
-                // We also need to find if there is a range tombstone covering 
this key
-                RangeTombstone rt = current.deletionInfo.rangeCovering(key);
+                Row row = rawIter.next(clustering);
+                RangeTombstone rt = 
current.deletionInfo.rangeCovering(clustering);
 
-                if (data == null)
-                {
-                    // If we have a range tombstone but not data, "fake" the 
RT by return a row deletion
-                    // corresponding to the tombstone.
-                    if (rt != null && rt.deletionTime().markedForDeleteAt() > 
partitionDeletion)
-                        return 
filter.setRowDeletion(rt.deletionTime()).setTo(emptyDeletedRow(key, 
rt.deletionTime()));
-                    return null;
-                }
+                // A search iterator only return a row, so it doesn't allow to 
directly account for deletion that should apply to to row
+                // (the partition deletion or the deletion of a range 
tombstone that covers it). So if needs be, reuse the row deletion
+                // to carry the proper deletion on the row.
+                DeletionTime activeDeletion = partitionDeletion;
+                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+                    activeDeletion = rt.deletionTime();
 
-                row.setTo(data);
+                if (row == null)
+                    return activeDeletion.isLive() ? null : 
ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
 
-                filter.setRowDeletion(null);
-                if (rt == null || rt.deletionTime().markedForDeleteAt() < 
partitionDeletion)
-                {
-                    filter.setDeletionTimestamp(partitionDeletion);
-                }
-                else
-                {
-                    
filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt());
-                    // If we have a range tombstone covering that row and it's 
bigger than the row deletion itself, then
-                    // we replace the row deletion by the tombstone deletion 
as a way to return the tombstone.
-                    if (rt.deletionTime().supersedes(row.deletion()))
-                        filter.setRowDeletion(rt.deletionTime());
-                }
-
-                return filter.setTo(row);
-            }
-        };
-    }
-
-    private static Row emptyDeletedRow(Clustering clustering, DeletionTime 
deletion)
-    {
-        return new AbstractRow()
-        {
-            public Columns columns()
-            {
-                return Columns.NONE;
-            }
-
-            public LivenessInfo primaryKeyLivenessInfo()
-            {
-                return LivenessInfo.NONE;
-            }
-
-            public DeletionTime deletion()
-            {
-                return deletion;
-            }
-
-            public boolean isEmpty()
-            {
-                return true;
-            }
-
-            public boolean hasComplexDeletion()
-            {
-                return false;
-            }
-
-            public Clustering clustering()
-            {
-                return clustering;
-            }
-
-            public Cell getCell(ColumnDefinition c)
-            {
-                return null;
-            }
-
-            public Cell getCell(ColumnDefinition c, CellPath path)
-            {
-                return null;
-            }
-
-            public Iterator<Cell> getCells(ColumnDefinition c)
-            {
-                return null;
-            }
-
-            public DeletionTime getDeletion(ColumnDefinition c)
-            {
-                return DeletionTime.LIVE;
-            }
-
-            public Iterator<Cell> iterator()
-            {
-                return Iterators.<Cell>emptyIterator();
-            }
-
-            public SearchIterator<ColumnDefinition, ColumnData> 
searchIterator()
-            {
-                return new SearchIterator<ColumnDefinition, ColumnData>()
-                {
-                    public boolean hasNext()
-                    {
-                        return false;
-                    }
-
-                    public ColumnData next(ColumnDefinition column)
-                    {
-                        return null;
-                    }
-                };
-            }
-
-            public Row takeAlias()
-            {
-                return this;
+                return row.filter(columns, activeDeletion, true, metadata);
             }
         };
     }
 
     public UnfilteredRowIterator unfilteredIterator()
     {
-        return unfilteredIterator(ColumnFilter.selection(columns()), 
Slices.ALL, false);
+        return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, 
false);
     }
 
     public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, 
Slices slices, boolean reversed)
@@ -309,7 +213,7 @@ public class AtomicBTreePartition implements Partition
                                                      partitionKey,
                                                      partitionDeletion,
                                                      
selection.fetchedColumns(),
-                                                     makeStatic(selection, 
current, allocator),
+                                                     staticRow(current, 
selection, false),
                                                      reversed,
                                                      current.stats)
             {
@@ -320,189 +224,51 @@ public class AtomicBTreePartition implements Partition
             };
         }
 
+        Holder current = ref;
+        Row staticRow = staticRow(current, selection, false);
         return slices.size() == 1
-             ? new SingleSliceIterator(metadata, partitionKey, ref, selection, 
slices.get(0), reversed, allocator)
-             : new SlicesIterator(metadata, partitionKey, ref, selection, 
slices, reversed, allocator);
+             ? sliceIterator(selection, slices.get(0), reversed, current, 
staticRow)
+             : new SlicesIterator(metadata, partitionKey, selection, slices, 
reversed, current, staticRow);
     }
 
-    private static Row makeStatic(ColumnFilter selection, Holder holder, 
MemtableAllocator allocator)
+    private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice 
slice, boolean reversed, Holder current, Row staticRow)
     {
-        Columns statics = selection.fetchedColumns().statics;
-        if (statics.isEmpty() || holder.staticRow == null)
-            return Rows.EMPTY_STATIC_ROW;
-
-        return new ReusableFilteringRow(statics, selection)
-               
.setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt())
-               .setTo(allocator.newReusableRow().setTo(holder.staticRow));
+        Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : 
slice.start();
+        Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
+        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, 
start, true, end, true, !reversed);
+
+        return new RowAndDeletionMergeIterator(metadata,
+                                               partitionKey,
+                                               
current.deletionInfo.getPartitionDeletion(),
+                                               selection,
+                                               staticRow,
+                                               reversed,
+                                               current.stats,
+                                               rowIter,
+                                               
current.deletionInfo.rangeIterator(slice, reversed),
+                                               true);
     }
 
-    private static class ReusableFilteringRow extends FilteringRow
+    public class SlicesIterator extends AbstractUnfilteredRowIterator
     {
-        private final Columns columns;
-        private final ColumnFilter selection;
-        private ColumnFilter.Tester tester;
-        private long deletionTimestamp;
-
-        // Used by searchIterator in case the row is covered by a tombstone.
-        private DeletionTime rowDeletion;
-
-        public ReusableFilteringRow(Columns columns, ColumnFilter selection)
-        {
-            this.columns = columns;
-            this.selection = selection;
-        }
-
-        public ReusableFilteringRow setDeletionTimestamp(long timestamp)
-        {
-            this.deletionTimestamp = timestamp;
-            return this;
-        }
-
-        public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion)
-        {
-            this.rowDeletion = rowDeletion;
-            return this;
-        }
-
-        @Override
-        public DeletionTime deletion()
-        {
-            return rowDeletion == null ? super.deletion() : rowDeletion;
-        }
-
-        @Override
-        protected boolean include(LivenessInfo info)
-        {
-            return info.timestamp() > deletionTimestamp;
-        }
-
-        @Override
-        protected boolean include(ColumnDefinition def)
-        {
-            return columns.contains(def);
-        }
-
-        @Override
-        protected boolean include(DeletionTime dt)
-        {
-            return dt.markedForDeleteAt() > deletionTimestamp;
-        }
-
-        @Override
-        protected boolean include(ColumnDefinition c, DeletionTime dt)
-        {
-            return dt.markedForDeleteAt() > deletionTimestamp;
-        }
-
-        @Override
-        protected boolean include(Cell cell)
-        {
-            return selection.includes(cell);
-        }
-    }
-
-    private static class SingleSliceIterator extends 
AbstractUnfilteredRowIterator
-    {
-        private final Iterator<Unfiltered> iterator;
-        private final ReusableFilteringRow row;
-
-        private SingleSliceIterator(CFMetaData metadata,
-                                    DecoratedKey key,
-                                    Holder holder,
-                                    ColumnFilter selection,
-                                    Slice slice,
-                                    boolean isReversed,
-                                    MemtableAllocator allocator)
-        {
-            super(metadata,
-                  key,
-                  holder.deletionInfo.getPartitionDeletion(),
-                  selection.fetchedColumns(),
-                  makeStatic(selection, holder, allocator),
-                  isReversed,
-                  holder.stats);
-
-            Iterator<Row> rowIter = rowIter(metadata,
-                                            holder,
-                                            slice,
-                                            !isReversed,
-                                            allocator);
-
-            this.iterator = new 
RowAndTombstoneMergeIterator(metadata.comparator, isReversed)
-                            .setTo(rowIter, 
holder.deletionInfo.rangeIterator(slice, isReversed));
-
-            this.row = new 
ReusableFilteringRow(selection.fetchedColumns().regulars, selection)
-                       
.setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt());
-        }
-
-        private Iterator<Row> rowIter(CFMetaData metadata,
-                                      Holder holder,
-                                      Slice slice,
-                                      boolean forwards,
-                                      final MemtableAllocator allocator)
-        {
-            Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : 
slice.start();
-            Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : 
slice.end();
-            final Iterator<MemtableRowData> dataIter = 
BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards);
-            return new AbstractIterator<Row>()
-            {
-                private final MemtableRowData.ReusableRow row = 
allocator.newReusableRow();
-
-                protected Row computeNext()
-                {
-                    return dataIter.hasNext() ? row.setTo(dataIter.next()) : 
endOfData();
-                }
-            };
-        }
-
-        protected Unfiltered computeNext()
-        {
-            while (iterator.hasNext())
-            {
-                Unfiltered next = iterator.next();
-                if (next.kind() == Unfiltered.Kind.ROW)
-                {
-                    row.setTo((Row)next);
-                    if (!row.isEmpty())
-                        return row;
-                }
-                else
-                {
-                    RangeTombstoneMarker marker = (RangeTombstoneMarker)next;
-
-                    long deletion = 
partitionLevelDeletion().markedForDeleteAt();
-                    if (marker.isOpen(isReverseOrder()))
-                        deletion = Math.max(deletion, 
marker.openDeletionTime(isReverseOrder()).markedForDeleteAt());
-                    row.setDeletionTimestamp(deletion);
-                    return marker;
-                }
-            }
-            return endOfData();
-        }
-    }
-
-    public static class SlicesIterator extends AbstractUnfilteredRowIterator
-    {
-        private final Holder holder;
-        private final MemtableAllocator allocator;
+        private final Holder current;
         private final ColumnFilter selection;
         private final Slices slices;
 
         private int idx;
-        private UnfilteredRowIterator currentSlice;
+        private Iterator<Unfiltered> currentSlice;
 
         private SlicesIterator(CFMetaData metadata,
                                DecoratedKey key,
-                               Holder holder,
                                ColumnFilter selection,
                                Slices slices,
                                boolean isReversed,
-                               MemtableAllocator allocator)
+                               Holder holder,
+                               Row staticRow)
         {
-            super(metadata, key, holder.deletionInfo.getPartitionDeletion(), 
selection.fetchedColumns(), makeStatic(selection, holder, allocator), 
isReversed, holder.stats);
-            this.holder = holder;
+            super(metadata, key, holder.deletionInfo.getPartitionDeletion(), 
selection.fetchedColumns(), staticRow, isReversed, holder.stats);
+            this.current = holder;
             this.selection = selection;
-            this.allocator = allocator;
             this.slices = slices;
         }
 
@@ -516,13 +282,7 @@ public class AtomicBTreePartition implements Partition
                         return endOfData();
 
                     int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : 
idx;
-                    currentSlice = new SingleSliceIterator(metadata,
-                                                           partitionKey,
-                                                           holder,
-                                                           selection,
-                                                           
slices.get(sliceIdx),
-                                                           isReverseOrder,
-                                                           allocator);
+                    currentSlice = sliceIterator(selection, 
slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
                     idx++;
                 }
 
@@ -565,7 +325,7 @@ public class AtomicBTreePartition implements Partition
                     if (inputDeletionInfoCopy == null)
                         inputDeletionInfoCopy = 
update.deletionInfo().copy(HeapAllocator.instance);
 
-                    deletionInfo = 
current.deletionInfo.copy().add(inputDeletionInfoCopy);
+                    deletionInfo = 
current.deletionInfo.mutableCopy().add(inputDeletionInfoCopy);
                     updater.allocated(deletionInfo.unsharedHeapSize() - 
current.deletionInfo.unsharedHeapSize());
                 }
                 else
@@ -574,9 +334,9 @@ public class AtomicBTreePartition implements Partition
                 }
 
                 Row newStatic = update.staticRow();
-                MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW
-                                          ? current.staticRow
-                                          : (current.staticRow == null ? 
updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
+                Row staticRow = newStatic.isEmpty()
+                              ? current.staticRow
+                              : (current.staticRow.isEmpty() ? 
updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
                 Object[] tree = BTree.update(current.tree, 
update.metadata().comparator, update, update.rowCount(), updater);
                 RowStats newStats = current.stats.mergeWith(update.stats());
 
@@ -661,10 +421,10 @@ public class AtomicBTreePartition implements Partition
         final DeletionInfo deletionInfo;
         // the btree of rows
         final Object[] tree;
-        final MemtableRowData staticRow;
+        final Row staticRow;
         final RowStats stats;
 
-        Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData 
staticRow, RowStats stats)
+        Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, 
RowStats stats)
         {
             this.tree = tree;
             this.deletionInfo = deletionInfo;
@@ -679,7 +439,7 @@ public class AtomicBTreePartition implements Partition
     }
 
     // the function we provide to the btree utilities to perform any column 
replacements
-    private static final class RowUpdater implements UpdateFunction<Row, 
MemtableRowData>
+    private static final class RowUpdater implements UpdateFunction<Row, Row>
     {
         final AtomicBTreePartition updating;
         final MemtableAllocator allocator;
@@ -687,13 +447,13 @@ public class AtomicBTreePartition implements Partition
         final Updater indexer;
         final int nowInSec;
         Holder ref;
+        Row.Builder regularBuilder;
         long dataSize;
         long heapSize;
         long colUpdateTimeDelta = Long.MAX_VALUE;
-        final MemtableRowData.ReusableRow row;
         final MemtableAllocator.DataReclaimer reclaimer;
-        final MemtableAllocator.RowAllocator rowAllocator;
-        List<MemtableRowData> inserted; // TODO: replace with walk of aborted 
BTree
+        List<Row> inserted; // TODO: replace with walk of aborted BTree
+
 
         private RowUpdater(AtomicBTreePartition updating, MemtableAllocator 
allocator, OpOrder.Group writeOp, Updater indexer)
         {
@@ -702,18 +462,25 @@ public class AtomicBTreePartition implements Partition
             this.writeOp = writeOp;
             this.indexer = indexer;
             this.nowInSec = FBUtilities.nowInSeconds();
-            this.row = allocator.newReusableRow();
             this.reclaimer = allocator.reclaimer();
-            this.rowAllocator = allocator.newRowAllocator(updating.metadata(), 
writeOp);
         }
 
-        public MemtableRowData apply(Row insert)
+        private Row.Builder builder(Clustering clustering)
         {
-            rowAllocator.allocateNewRow(insert.clustering().size(), 
insert.columns(), insert.isStatic());
-            insert.copyTo(rowAllocator);
-            MemtableRowData data = rowAllocator.allocatedRowData();
+            boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
+            // We know we only insert/update one static per PartitionUpdate, 
so no point in saving the builder
+            if (isStatic)
+                return allocator.rowBuilder(updating.metadata(), writeOp, 
true);
+
+            if (regularBuilder == null)
+                regularBuilder = allocator.rowBuilder(updating.metadata(), 
writeOp, false);
+            return regularBuilder;
+        }
 
-            insertIntoIndexes(insert);
+        public Row apply(Row insert)
+        {
+            Row data = Rows.copy(insert, builder(insert.clustering())).build();
+            insertIntoIndexes(data);
 
             this.dataSize += data.dataSize();
             this.heapSize += data.unsharedHeapSizeExcludingData();
@@ -723,14 +490,14 @@ public class AtomicBTreePartition implements Partition
             return data;
         }
 
-        public MemtableRowData apply(MemtableRowData existing, Row update)
+        public Row apply(Row existing, Row update)
         {
             Columns mergedColumns = 
existing.columns().mergeTo(update.columns());
-            rowAllocator.allocateNewRow(update.clustering().size(), 
mergedColumns, update.isStatic());
 
-            colUpdateTimeDelta = Math.min(colUpdateTimeDelta, 
Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, 
indexer));
+            Row.Builder builder = builder(existing.clustering());
+            colUpdateTimeDelta = Math.min(colUpdateTimeDelta, 
Rows.merge(existing, update, mergedColumns, builder, nowInSec, indexer));
 
-            MemtableRowData reconciled = rowAllocator.allocatedRowData();
+            Row reconciled = builder.build();
 
             dataSize += reconciled.dataSize() - existing.dataSize();
             heapSize += reconciled.unsharedHeapSizeExcludingData() - 
existing.unsharedHeapSizeExcludingData();
@@ -749,7 +516,7 @@ public class AtomicBTreePartition implements Partition
 
             maybeIndexPrimaryKeyColumns(toInsert);
             Clustering clustering = toInsert.clustering();
-            for (Cell cell : toInsert)
+            for (Cell cell : toInsert.cells())
                 indexer.insert(clustering, cell);
         }
 
@@ -761,15 +528,15 @@ public class AtomicBTreePartition implements Partition
             long timestamp = row.primaryKeyLivenessInfo().timestamp();
             int ttl = row.primaryKeyLivenessInfo().ttl();
 
-            for (Cell cell : row)
+            for (Cell cell : row.cells())
             {
-                long cellTimestamp = cell.livenessInfo().timestamp();
+                long cellTimestamp = cell.timestamp();
                 if (cell.isLive(nowInSec))
                 {
                     if (cellTimestamp > timestamp)
                     {
                         timestamp = cellTimestamp;
-                        ttl = cell.livenessInfo().ttl();
+                        ttl = cell.ttl();
                     }
                 }
             }
@@ -783,19 +550,19 @@ public class AtomicBTreePartition implements Partition
             this.heapSize = 0;
             if (inserted != null)
             {
-                for (MemtableRowData row : inserted)
+                for (Row row : inserted)
                     abort(row);
                 inserted.clear();
             }
             reclaimer.cancel();
         }
 
-        protected void abort(MemtableRowData abort)
+        protected void abort(Row abort)
         {
             reclaimer.reclaimImmediately(abort);
         }
 
-        protected void discard(MemtableRowData discard)
+        protected void discard(Row discard)
         {
             reclaimer.reclaim(discard);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
 
b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
index acaef5d..e5d1e75 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
@@ -49,10 +49,10 @@ public class CountingUnfilteredRowIterator extends 
WrappingUnfilteredRowIterator
     @Override
     public Unfiltered next()
     {
-        Unfiltered unfiltered = super.next();
-        if (unfiltered.kind() == Unfiltered.Kind.ROW)
-            counter.newRow((Row) unfiltered);
-        return unfiltered;
+        Unfiltered next = super.next();
+        if (next.isRow())
+            counter.newRow((Row)next);
+        return next;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java 
b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 813654d..1cac274 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -17,21 +17,24 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import java.util.Iterator;
+import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 
-public class FilteredPartition extends AbstractPartitionData implements 
Iterable<Row>
+public class FilteredPartition extends AbstractThreadUnsafePartition
 {
+    private final Row staticRow;
+
     private FilteredPartition(CFMetaData metadata,
                               DecoratedKey partitionKey,
                               PartitionColumns columns,
-                              int initialRowCapacity,
-                              boolean sortable)
+                              Row staticRow,
+                              List<Row> rows)
     {
-        super(metadata, partitionKey, DeletionTime.LIVE, columns, 
initialRowCapacity, sortable);
+        super(metadata, partitionKey, columns, rows);
+        this.staticRow = staticRow;
     }
 
     /**
@@ -42,25 +45,43 @@ public class FilteredPartition extends 
AbstractPartitionData implements Iterable
      */
     public static FilteredPartition create(RowIterator iterator)
     {
-        FilteredPartition partition = new 
FilteredPartition(iterator.metadata(),
-                                                            
iterator.partitionKey(),
-                                                            iterator.columns(),
-                                                            4,
-                                                            
iterator.isReverseOrder());
-
-        partition.staticRow = iterator.staticRow().takeAlias();
+        CFMetaData metadata = iterator.metadata();
+        boolean reversed = iterator.isReverseOrder();
 
-        Writer writer = partition.new Writer(true);
+        List<Row> rows = new ArrayList<>();
 
         while (iterator.hasNext())
-            iterator.next().copyTo(writer);
+        {
+            Unfiltered unfiltered = iterator.next();
+            if (unfiltered.isRow())
+                rows.add((Row)unfiltered);
+        }
+
+        if (reversed)
+            Collections.reverse(rows);
+
+        return new FilteredPartition(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows);
+    }
+
+    protected boolean canHaveShadowedData()
+    {
+        // We only create instances from RowIterator that don't have shadowed 
data (nor deletion info really)
+        return false;
+    }
 
-        // A Partition (or more precisely AbstractPartitionData) always 
assumes that its data is in clustering
-        // order. So if we've just added them in reverse clustering order, 
reverse them.
-        if (iterator.isReverseOrder())
-            partition.reverse();
+    public Row staticRow()
+    {
+        return staticRow;
+    }
 
-        return partition;
+    public DeletionInfo deletionInfo()
+    {
+        return DeletionInfo.LIVE;
+    }
+
+    public RowStats stats()
+    {
+        return RowStats.NO_STATS;
     }
 
     public RowIterator rowIterator()
@@ -90,7 +111,7 @@ public class FilteredPartition extends AbstractPartitionData 
implements Iterable
 
             public Row staticRow()
             {
-                return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+                return FilteredPartition.this.staticRow();
             }
 
             public boolean hasNext()
@@ -117,26 +138,20 @@ public class FilteredPartition extends 
AbstractPartitionData implements Iterable
     @Override
     public String toString()
     {
-        try (RowIterator iterator = rowIterator())
-        {
-            StringBuilder sb = new StringBuilder();
-            CFMetaData metadata = iterator.metadata();
-            PartitionColumns columns = iterator.columns();
+        StringBuilder sb = new StringBuilder();
 
-            sb.append(String.format("[%s.%s] key=%s columns=%s reversed=%b",
-                                    metadata.ksName,
-                                    metadata.cfName,
-                                    
metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
-                                    columns,
-                                    iterator.isReverseOrder()));
+        sb.append(String.format("[%s.%s] key=%s columns=%s",
+                    metadata.ksName,
+                    metadata.cfName,
+                    
metadata.getKeyValidator().getString(partitionKey().getKey()),
+                    columns));
 
-            if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
-                sb.append("\n    
").append(iterator.staticRow().toString(metadata));
+        if (staticRow() != Rows.EMPTY_STATIC_ROW)
+            sb.append("\n    ").append(staticRow().toString(metadata));
 
-            while (iterator.hasNext())
-                sb.append("\n    ").append(iterator.next().toString(metadata));
+        for (Row row : this)
+            sb.append("\n    ").append(row.toString(metadata));
 
-            return sb.toString();
-        }
+        return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java 
b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
deleted file mode 100644
index c40109b..0000000
--- 
a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * Abstract class to make it easier to write iterators that filter some
- * parts of another iterator (used for purging tombstones and removing dropped 
columns).
- */
-public abstract class FilteringPartitionIterator extends 
WrappingUnfilteredPartitionIterator
-{
-    private UnfilteredRowIterator next;
-
-    protected FilteringPartitionIterator(UnfilteredPartitionIterator iter)
-    {
-        super(iter);
-    }
-
-    // The filter to use for filtering row contents. Is null by default to 
mean no particular filtering
-    // but can be overriden by subclasses. Please see FilteringAtomIterator 
for details on how this is used.
-    protected FilteringRow makeRowFilter()
-    {
-        return null;
-    }
-
-    // Whether or not we should bother filtering the provided rows iterator. 
This
-    // exists mainly for preformance
-    protected boolean shouldFilter(UnfilteredRowIterator iterator)
-    {
-        return true;
-    }
-
-    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
-    {
-        return true;
-    }
-
-    protected boolean includePartitionDeletion(DeletionTime dt)
-    {
-        return true;
-    }
-
-    // Allows to modify the range tombstone returned. This is called *after* 
includeRangeTombstoneMarker has been called.
-    protected RangeTombstoneMarker 
filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
-    {
-        return marker;
-    }
-
-    // Called when a particular partition is skipped due to being empty post 
filtering
-    protected void onEmpty(DecoratedKey key)
-    {
-    }
-
-    public boolean hasNext()
-    {
-        while (next == null && super.hasNext())
-        {
-            UnfilteredRowIterator iterator = super.next();
-            if (shouldFilter(iterator))
-            {
-                next = new FilteringIterator(iterator);
-                if (!isForThrift() && next.isEmpty())
-                {
-                    onEmpty(iterator.partitionKey());
-                    iterator.close();
-                    next = null;
-                }
-            }
-            else
-            {
-                next = iterator;
-            }
-        }
-        return next != null;
-    }
-
-    public UnfilteredRowIterator next()
-    {
-        UnfilteredRowIterator toReturn = next;
-        next = null;
-        return toReturn;
-    }
-
-    @Override
-    public void close()
-    {
-        try
-        {
-            super.close();
-        }
-        finally
-        {
-            if (next != null)
-                next.close();
-        }
-    }
-
-    private class FilteringIterator extends FilteringRowIterator
-    {
-        private FilteringIterator(UnfilteredRowIterator iterator)
-        {
-            super(iterator);
-        }
-
-        @Override
-        protected FilteringRow makeRowFilter()
-        {
-            return FilteringPartitionIterator.this.makeRowFilter();
-        }
-
-        @Override
-        protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker 
marker)
-        {
-            return 
FilteringPartitionIterator.this.includeRangeTombstoneMarker(marker);
-        }
-
-        @Override
-        protected RangeTombstoneMarker 
filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
-        {
-            return 
FilteringPartitionIterator.this.filterRangeTombstoneMarker(marker, reversed);
-        }
-
-        @Override
-        protected boolean includePartitionDeletion(DeletionTime dt)
-        {
-            return 
FilteringPartitionIterator.this.includePartitionDeletion(dt);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java
new file mode 100644
index 0000000..510b9c8
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+
+public interface PartitionStatisticsCollector
+{
+    public void update(LivenessInfo info);
+    public void update(DeletionTime deletionTime);
+    public void update(Cell cell);
+    public void updateColumnSetPerRow(long columnSetInRow);
+    public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards);
+}

Reply via email to