Refactor MV code

patch by slebresne; reviewed by carlyeks for CASSANDRA-11475


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/86ba2274
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/86ba2274
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/86ba2274

Branch: refs/heads/cassandra-3.0
Commit: 86ba227477b9f8595eb610ecaf950cfbc29dd36b
Parents: c19066e
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Mar 11 14:19:38 2016 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri May 6 13:41:41 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/cassandra/config/CFMetaData.java |   6 +
 .../apache/cassandra/config/ViewDefinition.java |   1 -
 .../cql3/statements/CreateViewStatement.java    |   4 +-
 .../cql3/statements/SelectStatement.java        |  41 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   6 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +-
 .../db/SinglePartitionReadCommand.java          |  33 +
 src/java/org/apache/cassandra/db/Slices.java    |   7 +
 .../apache/cassandra/db/filter/RowFilter.java   |  24 +
 .../SingletonUnfilteredPartitionIterator.java   |   3 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |   5 +
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  34 +-
 .../apache/cassandra/db/rows/BufferCell.java    |   5 +
 src/java/org/apache/cassandra/db/rows/Cell.java |   2 +
 .../apache/cassandra/db/rows/ColumnData.java    |   2 +
 .../cassandra/db/rows/ComplexColumnData.java    |   8 +
 src/java/org/apache/cassandra/db/rows/Row.java  |  35 +-
 .../cassandra/db/rows/RowDiffListener.java      |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   2 +-
 .../apache/cassandra/db/view/TableViews.java    | 481 ++++++++++++++
 .../apache/cassandra/db/view/TemporalRow.java   | 610 ------------------
 src/java/org/apache/cassandra/db/view/View.java | 629 ++-----------------
 .../apache/cassandra/db/view/ViewBuilder.java   |  38 +-
 .../apache/cassandra/db/view/ViewManager.java   | 146 +----
 .../cassandra/db/view/ViewUpdateGenerator.java  | 549 ++++++++++++++++
 .../org/apache/cassandra/cql3/ViewTest.java     |  52 +-
 .../org/apache/cassandra/db/rows/RowsTest.java  |   6 +-
 28 files changed, 1399 insertions(+), 1337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0679e11..3a49f6a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+3.0.7
+ * Refactor Materialized View code (CASSANDRA-11475)
+
 3.0.6
  * Disallow creating view with a static column (CASSANDRA-11602)
  * Reduce the amount of object allocations caused by the getFunctions methods 
(CASSANDRA-11593)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index 79cd779..e263697 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -612,6 +613,11 @@ public final class CFMetaData
         };
     }
 
+    public Iterable<ColumnDefinition> primaryKeyColumns()
+    {
+        return Iterables.concat(partitionKeyColumns, clusteringColumns);
+    }
+
     public List<ColumnDefinition> partitionKeyColumns()
     {
         return partitionKeyColumns;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/config/ViewDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ViewDefinition.java 
b/src/java/org/apache/cassandra/config/ViewDefinition.java
index b29a8f9..5300f56 100644
--- a/src/java/org/apache/cassandra/config/ViewDefinition.java
+++ b/src/java/org/apache/cassandra/config/ViewDefinition.java
@@ -37,7 +37,6 @@ public class ViewDefinition
     public final UUID baseTableId;
     public final String baseTableName;
     public final boolean includeAllColumns;
-    // The order of partititon columns and clustering columns is important, so 
we cannot switch these two to sets
     public final CFMetaData metadata;
 
     public SelectStatement.RawStatement select;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 45231b7..6446602 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -227,10 +227,10 @@ public class CreateViewStatement extends 
SchemaAlteringStatement
         // This is only used as an intermediate state; this is to catch 
whether multiple non-PK columns are used
         boolean hasNonPKColumn = false;
         for (ColumnIdentifier.Raw raw : partitionKeys)
-            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetPartitionKeys, restrictions);
+            hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetPartitionKeys, restrictions);
 
         for (ColumnIdentifier.Raw raw : clusteringKeys)
-            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetClusteringColumns, restrictions);
+            hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetClusteringColumns, restrictions);
 
         // We need to include all of the primary key columns from the base 
table in order to make sure that we do not
         // overwrite values in the view. We cannot support "collapsing" the 
base table into a smaller number of rows in

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index c3e13f4..0e33475 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -151,6 +151,15 @@ public class SelectStatement implements CQLStatement
         return builder.build();
     }
 
+    /**
+     * The columns to fetch internally for this SELECT statement (which can be 
more than the ones selected by the
+     * user, as it also includes any restricted columns in particular).
+     */
+    public ColumnFilter queriedColumns()
+    {
+        return queriedColumns;
+    }
+
     // Creates a simple select based on the given selection.
     // Note that the results select statement should not be used for actual 
queries, but only for processing already
     // queried data through processColumnFamily.
@@ -473,7 +482,29 @@ public class SelectStatement implements CQLStatement
     }
 
     /**
-     * Returns a read command that can be used internally to filter individual 
rows for materialized views.
+     * Returns the slices fetched by this SELECT, assuming an internal call 
(no bound values in particular).
+     * <p>
+     * Note that if the SELECT intrinsically selects rows by names, we convert 
them into equivalent slices for
+     * the purpose of this method. This is used for MVs to restrict what needs 
to be read when we want to read
+     * everything that could be affected by a given view (and so, if the view 
SELECT statement has restrictions
+     * on the clustering columns, we can restrict what we read).
+     */
+    public Slices clusteringIndexFilterAsSlices()
+    {
+        QueryOptions options = 
QueryOptions.forInternalCalls(Collections.emptyList());
+        ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
+        if (filter instanceof ClusteringIndexSliceFilter)
+            return ((ClusteringIndexSliceFilter)filter).requestedSlices();
+
+        Slices.Builder builder = new Slices.Builder(cfm.comparator);
+        for (Clustering clustering: 
((ClusteringIndexNamesFilter)filter).requestedRows())
+            builder.add(Slice.make(clustering));
+        return builder.build();
+    }
+
+    /**
+     * Returns a read command that can be used internally to query all the 
rows queried by this SELECT for a
+     * given key (used for materialized views).
      */
     public SinglePartitionReadCommand internalReadForView(DecoratedKey key, 
int nowInSec)
     {
@@ -483,6 +514,14 @@ public class SelectStatement implements CQLStatement
         return SinglePartitionReadCommand.create(cfm, nowInSec, 
queriedColumns, rowFilter, DataLimits.NONE, key, filter);
     }
 
+    /**
+     * The {@code RowFilter} for this SELECT, assuming an internal call (no 
bound values in particular).
+     */
+    public RowFilter rowFilterForInternalCalls()
+    {
+        return 
getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList()));
+    }
+
     private ReadQuery getRangeCommand(QueryOptions options, DataLimits limit, 
int nowInSec) throws RequestValidationException
     {
         ClusteringIndexFilter clusteringIndexFilter = 
makeClusteringIndexFilter(options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 919fed6..5b8de8f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -48,7 +48,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.view.ViewManager;
+import org.apache.cassandra.db.view.TableViews;
 import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -198,7 +198,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     public final SecondaryIndexManager indexManager;
-    public final ViewManager.ForStore viewManager;
+    public final TableViews viewManager;
 
     /* These are locally held copies to be changed from the config during 
runtime */
     private volatile DefaultValue<Integer> minCompactionThreshold;
@@ -373,7 +373,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         maxCompactionThreshold = new 
DefaultValue<>(metadata.params.compaction.maxCompactionThreshold());
         crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance);
         indexManager = new SecondaryIndexManager(this);
-        viewManager = keyspace.viewManager.forTable(metadata.cfId);
+        viewManager = keyspace.viewManager.forTable(metadata);
         metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index 5783b41..273946e 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -484,7 +484,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations 
from base table replica");
-                        viewManager.pushViewReplicaUpdates(upd, !isClReplay, 
baseComplete);
+                        
viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, !isClReplay, 
baseComplete);
                     }
                     catch (Throwable t)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 14923b9..d5f2dc4 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -229,6 +229,39 @@ public class SinglePartitionReadCommand extends ReadCommand
         return create(metadata, nowInSec, metadata.decorateKey(key), slices);
     }
 
+    /**
+     * Creates a new single partition name command for the provided rows.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param names the clustering for the rows to query.
+     *
+     * @return a newly created read command that queries the {@code names} in 
{@code key}. The returned query will
+     * query every column (without limit or row filtering) and be in forward 
order.
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
+    {
+        ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(names, false);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, 
ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+    }
+
+    /**
+     * Creates a new single partition name command for the provided row.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param name the clustering for the row to query.
+     *
+     * @return a newly created read command that queries {@code name} in 
{@code key}. The returned query will
+     * query every column (without limit or row filtering).
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int 
nowInSec, DecoratedKey key, Clustering name)
+    {
+        return create(metadata, nowInSec, key, FBUtilities.singleton(name, 
metadata.comparator));
+    }
+
     public SinglePartitionReadCommand copy()
     {
         return new SinglePartitionReadCommand(isDigestQuery(), 
digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), 
rowFilter(), limits(), partitionKey(), clusteringIndexFilter());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java 
b/src/java/org/apache/cassandra/db/Slices.java
index 8fa9337..bb354a1 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -210,6 +210,13 @@ public abstract class Slices implements Iterable<Slice>
             return this;
         }
 
+        public Builder addAll(Slices slices)
+        {
+            for (Slice slice : slices)
+                add(slice);
+            return this;
+        }
+
         public int size()
         {
             return slices.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 8060f23..11cfb87 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -122,6 +122,30 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
     public abstract UnfilteredPartitionIterator 
filter(UnfilteredPartitionIterator iter, int nowInSec);
 
     /**
+     * Whether the provided row in the provided partition satisfies this 
filter.
+     *
+     * @param metadata the table metadata.
+     * @param partitionKey the partition key of the partition to test.
+     * @param row the row to test.
+     * @param nowInSec the current time in seconds (to know what is live and 
what isn't).
+     * @return {@code true} if {@code row} in partition {@code partitionKey} 
satisfies this row filter.
+     */
+    public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey 
partitionKey, Row row, int nowInSec)
+    {
+        // We purge all tombstones as the expressions' isSatisfiedBy methods 
expect it
+        Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+        if (purged == null)
+            return expressions.isEmpty();
+
+        for (Expression e : expressions)
+        {
+            if (!e.isSatisfiedBy(metadata, partitionKey, purged))
+                return false;
+        }
+        return true;
+    }
+
+    /**
      * Returns true if all of the expressions within this filter that apply to 
the partition key are satisfied by
      * the given key, false otherwise.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
 
b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
index bfa6690..1f966db 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -65,6 +65,7 @@ public class SingletonUnfilteredPartitionIterator implements 
UnfilteredPartition
 
     public void close()
     {
-        iter.close();
+        if (!returned)
+            iter.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java 
b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 00fc286..7e93c2e 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -74,6 +74,11 @@ public abstract class AbstractCell extends Cell
             column().validateCellPath(path());
     }
 
+    public long maxTimestamp()
+    {
+        return timestamp();
+    }
+
     @Override
     public boolean equals(Object other)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index e8667e0..ea1d9e0 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -322,6 +322,18 @@ public class BTreeRow extends AbstractRow
         return transformAndFilter(newInfo, newDeletion, (cd) -> 
cd.updateAllTimestamp(newTimestamp));
     }
 
+    public Row withRowDeletion(DeletionTime newDeletion)
+    {
+        // Note that:
+        //  - it is a contract with the caller that the new deletion shouldn't 
shadow anything in
+        //    the row, and so in particular it can't shadow the row deletion. 
So if there is
+        //    already a row deletion we have nothing to do.
+        //  - we set the minLocalDeletionTime to MIN_VALUE because we know the 
new deletion is not live
+        return newDeletion.isLive() || !deletion.isLive()
+             ? this
+             : new BTreeRow(clustering, primaryKeyLivenessInfo, 
Deletion.regular(newDeletion), btree, Integer.MIN_VALUE);
+    }
+
     public Row purge(DeletionPurger purger, int nowInSec)
     {
         if (!hasDeletion(nowInSec))
@@ -566,6 +578,17 @@ public class BTreeRow extends AbstractRow
                 }
 
                 List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
+                if (deletion != DeletionTime.LIVE)
+                {
+                    // Make sure we don't include any shadowed cells
+                    List<Object> filtered = new ArrayList<>(buildFrom.size());
+                    for (Object c : buildFrom)
+                    {
+                        if (((Cell)c).timestamp() >= 
deletion.markedForDeleteAt())
+                            filtered.add(c);
+                    }
+                    buildFrom = filtered;
+                }
                 Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
                 return new ComplexColumnData(column, btree, deletion);
             }
@@ -621,17 +644,26 @@ public class BTreeRow extends AbstractRow
 
         public void addPrimaryKeyLivenessInfo(LivenessInfo info)
         {
-            this.primaryKeyLivenessInfo = info;
+            // The check is only required for unsorted builders, but it's 
worth the extra safety to have it unconditional
+            if (!deletion.deletes(info))
+                this.primaryKeyLivenessInfo = info;
         }
 
         public void addRowDeletion(Deletion deletion)
         {
             this.deletion = deletion;
+            // The check is only required for unsorted builders, but it's 
worth the extra safety to have it unconditional
+            if (deletion.deletes(primaryKeyLivenessInfo))
+                this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
         }
 
         public void addCell(Cell cell)
         {
             assert cell.column().isStatic() == (clustering == 
Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = 
" + clustering;
+            // In practice, only unsorted builders have to deal with shadowed 
cells, but it doesn't cost us much to deal with it unconditionally in this case
+            if (deletion.deletes(cell))
+                return;
+
             cells.add(cell);
             hasComplex |= cell.column.isComplex();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java 
b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 8912f59..0a2c528 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -133,6 +133,11 @@ public class BufferCell extends AbstractCell
         return path;
     }
 
+    public Cell withUpdatedColumn(ColumnDefinition newColumn)
+    {
+        return new BufferCell(newColumn, timestamp, ttl, localDeletionTime, 
value, path);
+    }
+
     public Cell withUpdatedValue(ByteBuffer newValue)
     {
         return new BufferCell(column, timestamp, ttl, localDeletionTime, 
newValue, path);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 73d9e44..b10ce06 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -129,6 +129,8 @@ public abstract class Cell extends ColumnData
      */
     public abstract CellPath path();
 
+    public abstract Cell withUpdatedColumn(ColumnDefinition newColumn);
+
     public abstract Cell withUpdatedValue(ByteBuffer newValue);
 
     public abstract Cell copy(AbstractAllocator allocator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 84763e5..933da6a 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -82,4 +82,6 @@ public abstract class ColumnData
     public abstract ColumnData markCounterLocalToBeCleared();
 
     public abstract ColumnData purge(DeletionPurger purger, int nowInSec);
+
+    public abstract long maxTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index fab529b..d67d079 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -184,6 +184,14 @@ public class ComplexColumnData extends ColumnData 
implements Iterable<Cell>
         return transformAndFilter(newDeletion, (cell) -> (Cell) 
cell.updateAllTimestamp(newTimestamp));
     }
 
+    public long maxTimestamp()
+    {
+        long timestamp = complexDeletion.markedForDeleteAt();
+        for (Cell cell : this)
+            timestamp = Math.max(timestamp, cell.timestamp());
+        return timestamp;
+    }
+
     // This is the partner in crime of ArrayBackedRow.setValue. The exact 
warning apply. The short
     // version is: "don't use that method".
     void setValue(CellPath path, ByteBuffer value)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java 
b/src/java/org/apache/cassandra/db/rows/Row.java
index 5f79a66..82c07a7 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -215,6 +215,18 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
      */
     public Row updateAllTimestamp(long newTimestamp);
 
+    /**
+     * Returns a copy of this row with the new deletion as row deletion if it 
is more recent
+     * than the current row deletion.
+     * <p>
+     * WARNING: this method <b>does not</b> check that nothing in the row is 
shadowed by the provided
+     * deletion and if that is the case, the created row will be 
<b>invalid</b>. It is thus up to the
+     * caller to verify that this is not the case and the only reasonable use 
case of this is probably
+     * when the row and the deletion comes from the same {@code 
UnfilteredRowIterator} since that gives
+     * use this guarantee.
+     */
+    public Row withRowDeletion(DeletionTime deletion);
+
     public int dataSize();
 
     public long unsharedHeapSizeExcludingData();
@@ -227,12 +239,15 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
      * A row deletion mostly consists of the time of said deletion, but there 
is 2 variants: shadowable
      * and regular row deletion.
      * <p>
-     * A shadowable row deletion only exists if the row timestamp ({@code 
primaryKeyLivenessInfo().timestamp()})
-     * is lower than the deletion timestamp. That is, if a row has a 
shadowable deletion with timestamp A and an update is made
-     * to that row with a timestamp B such that B > A, then the shadowable 
deletion is 'shadowed' by that update. A concrete
-     * consequence is that if said update has cells with timestamp lower than 
A, then those cells are preserved
-     * (since the deletion is removed), and this contrarily to a normal 
(regular) deletion where the deletion is preserved
-     * and such cells are removed.
+     * A shadowable row deletion only exists if the row has no timestamp. In 
other words, the deletion is only
+     * valid as long as no newer insert is done (thus setting a row timestamp; 
note that if the row timestamp set
+     * is lower than the deletion, it is shadowed (and thus ignored) as usual).
+     * <p>
+     * That is, if a row has a shadowable deletion with timestamp A and an 
update is made to that row with a
+     * timestamp B such that B > A (and that update sets the row timestamp), 
then the shadowable deletion is 'shadowed'
+     * by that update. A concrete consequence is that if said update has cells 
with timestamp lower than A, then those
+     * cells are preserved (since the deletion is removed), and this contrarily 
to a normal (regular) deletion where the
+     * deletion is preserved and such cells are removed.
      * <p>
      * Currently, the only use of shadowable row deletions is Materialized 
Views, see CASSANDRA-10261.
      */
@@ -312,6 +327,11 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
             return time.deletes(info);
         }
 
+        public boolean deletes(Cell cell)
+        {
+            return time.deletes(cell);
+        }
+
         public void digest(MessageDigest digest)
         {
             time.digest(digest);
@@ -361,6 +381,9 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
      *      any column before {@code c} and before any call for any column 
after {@code c}.
      *   5) Calls to {@link #addCell} are further done in strictly increasing 
cell order (the one defined by
      *      {@link Cell#comparator}. That is, for a give column, cells are 
passed in {@code CellPath} order.
+     *   6) No shadowed data should be added. Concretely, this means that if a 
row deletion is added, it doesn't
+     *      delete the row timestamp or any cell added later, and similarly 
no cell added is deleted by the complex
+     *      deletion of the column this is a cell of.
      *
      * An unsorted builder will not expect those last rules however: {@link 
#addCell} and {@link #addComplexDeletion}
      * can be done in any order. And in particular unsorted builder allows 
multiple calls for the same column/cell. In

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java 
b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
index f209bfc..ec848a0 100644
--- a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
+++ b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
@@ -71,7 +71,7 @@ public interface RowDiffListener
      * @param i the input row from which {@code original} is from.
      * @param clustering the clustering for the row that is merged.
      * @param merged the cell of the merged row. Will be {@code null} if input 
{@code i} had a cell but that cell is no present
-     * in the mergd result (it has been deleted/shadowed).
+     * in the merged result (it has been deleted/shadowed).
      * @param original the cell of input {@code i}. May be {@code null} if 
input {@code i} had cell corresponding to {@code merged}.
      */
     public void onCell(int i, Clustering clustering, Cell merged, Cell 
original);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 9416896..ce5fffe 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -92,7 +92,7 @@ public abstract class UnfilteredRowIterators
     }
 
     /**
-     * Returns an empty atom iterator for a given partition.
+     * Returns an empty unfiltered iterator for a given partition.
      */
     public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, 
final DecoratedKey partitionKey, final Row staticRow, final DeletionTime 
partitionDeletion, final boolean isReverseOrder)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java 
b/src/java/org/apache/cassandra/db/view/TableViews.java
new file mode 100644
index 0000000..893bdd5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+
+/**
+ * Groups all the views for a given table.
+ */
+public class TableViews extends AbstractCollection<View>
+{
+    private final CFMetaData baseTableMetadata;
+
+    // We need this to be thread-safe, but the number of times this is changed 
(when a view is created in the keyspace)
+    // is massively exceeded by the number of times it's read (for every mutation on 
the keyspace), so a copy-on-write list is the best option.
+    private final List<View> views = new CopyOnWriteArrayList();
+
+    public TableViews(CFMetaData baseTableMetadata)
+    {
+        this.baseTableMetadata = baseTableMetadata;
+    }
+
+    public int size()
+    {
+        return views.size();
+    }
+
+    public Iterator<View> iterator()
+    {
+        return views.iterator();
+    }
+
+    public boolean contains(String viewName)
+    {
+        return Iterables.any(views, view -> view.name.equals(viewName));
+    }
+
+    public boolean add(View view)
+    {
+        // We should have validated that there is no existing view with this 
name at this point
+        assert !contains(view.name);
+        return views.add(view);
+    }
+
+    public Iterable<ColumnFamilyStore> allViewsCfs()
+    {
+        Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName);
+        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().viewName));
+    }
+
+    public void forceBlockingFlush()
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+            viewCfs.forceBlockingFlush();
+    }
+
+    public void dumpMemtables()
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+            viewCfs.dumpMemtable();
+    }
+
+    public void truncateBlocking(long truncatedAt)
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+        {
+            ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
+            SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, 
replayAfter);
+        }
+    }
+
+    public void removeByName(String viewName)
+    {
+        views.removeIf(v -> v.name.equals(viewName));
+    }
+
+    /**
+     * Calculates and pushes updates to the views replicas. The replicas are 
determined by
+     * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+     *
+     * @param update an update on the base table represented by this object.
+     * @param writeCommitLog whether we should write the commit log for the 
view updates.
+     * @param baseComplete time from epoch in ms that the local base mutation 
was (or will be) completed
+     */
+    public void pushViewReplicaUpdates(PartitionUpdate update, boolean 
writeCommitLog, AtomicLong baseComplete)
+    {
+        assert update.metadata().cfId.equals(baseTableMetadata.cfId);
+
+        Collection<View> views = updatedViews(update);
+        if (views.isEmpty())
+            return;
+
+        // Read modified rows
+        int nowInSec = FBUtilities.nowInSeconds();
+        SinglePartitionReadCommand command = readExistingRowsCommand(update, 
views, nowInSec);
+        if (command == null)
+            return;
+
+        ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata());
+        long start = System.nanoTime();
+        Collection<Mutation> mutations;
+        try (ReadOrderGroup orderGroup = command.startOrderGroup();
+             UnfilteredRowIterator existings = 
UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), 
command);
+             UnfilteredRowIterator updates = update.unfilteredIterator())
+        {
+            mutations = generateViewUpdates(views, updates, existings, 
nowInSec);
+        }
+        
Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime()
 - start, TimeUnit.NANOSECONDS);
+
+        if (!mutations.isEmpty())
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, 
writeCommitLog, baseComplete);
+    }
+
+    /**
+     * Given some updates on the base table of this object and the existing 
values for the rows affected by that update, generates the
+     * mutation to be applied to the provided views.
+     *
+     * @param views the views potentially affected by {@code updates}.
+     * @param updates the base table updates being applied.
+     * @param existings the existing values for the rows affected by {@code 
updates}. This is used to decide if a view is
+     * obsoleted by the update and should be removed, gather the values for 
columns that may not be part of the update if
+     * a new view entry needs to be created, and compute the minimal updates 
to be applied if the view entry isn't changed
+     * but has simply some updated values. This will be empty for view 
building as we want to assume anything we'll pass
+     * to {@code updates} is new.
+     * @param nowInSec the current time in seconds.
+     * @return the mutations to apply to the {@code views}. This can be empty.
+     */
+    public Collection<Mutation> generateViewUpdates(Collection<View> views, 
UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec)
+    {
+        assert updates.metadata().cfId.equals(baseTableMetadata.cfId);
+
+        List<ViewUpdateGenerator> generators = new ArrayList<>(views.size());
+        for (View view : views)
+            generators.add(new ViewUpdateGenerator(view, 
updates.partitionKey(), nowInSec));
+
+        DeletionTracker existingsDeletion = new 
DeletionTracker(existings.partitionLevelDeletion());
+        DeletionTracker updatesDeletion = new 
DeletionTracker(updates.partitionLevelDeletion());
+
+        /*
+         * We iterate through the updates and the existing rows in parallel. 
This allows us to know the consequence
+         * on the view of each update.
+         */
+        PeekingIterator<Unfiltered> existingsIter = 
Iterators.peekingIterator(existings);
+        PeekingIterator<Unfiltered> updatesIter = 
Iterators.peekingIterator(updates);
+
+        while (existingsIter.hasNext() && updatesIter.hasNext())
+        {
+            Unfiltered existing = existingsIter.peek();
+            Unfiltered update = updatesIter.peek();
+
+            Row existingRow;
+            Row updateRow;
+            int cmp = baseTableMetadata.comparator.compare(update, existing);
+            if (cmp < 0)
+            {
+                // We have an update where there was nothing before
+                if (update.isRangeTombstoneMarker())
+                {
+                    updatesDeletion.update(updatesIter.next());
+                    continue;
+                }
+
+                updateRow = 
((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
+                existingRow = emptyRow(updateRow.clustering(), 
existingsDeletion.currentDeletion());
+            }
+            else if (cmp > 0)
+            {
+                // We have something existing but no update (which will happen 
either because it's a range tombstone marker in
+                // existing, or because we've fetched the existing row due to 
some partition/range deletion in the updates)
+                if (existing.isRangeTombstoneMarker())
+                {
+                    existingsDeletion.update(existingsIter.next());
+                    continue;
+                }
+
+                existingRow = 
((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
+                updateRow = emptyRow(existingRow.clustering(), 
updatesDeletion.currentDeletion());
+
+                // The way we build the read command used for existing rows, 
we should always have updatesDeletion.currentDeletion()
+                // that is not live, since we wouldn't have read the existing 
row otherwise. And we could assert that, but if we ever
+                // change the read method so that it can slightly over-read in 
some cases, that would be an easily avoidable bug lurking,
+                // so we just handle the case.
+                if (updateRow == null)
+                    continue;
+            }
+            else
+            {
+                // We're updating a row that had pre-existing data
+                if (update.isRangeTombstoneMarker())
+                {
+                    assert existing.isRangeTombstoneMarker();
+                    updatesDeletion.update(updatesIter.next());
+                    existingsDeletion.update(existingsIter.next());
+                    continue;
+                }
+
+                assert !existing.isRangeTombstoneMarker();
+                existingRow = 
((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
+                updateRow = 
((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
+            }
+
+            addToViewUpdateGenerators(existingRow, updateRow, generators, 
nowInSec);
+        }
+
+        // We only care about more existing rows if the update deletion isn't 
live, i.e. if we had a partition deletion
+        if (!updatesDeletion.currentDeletion().isLive())
+        {
+            while (existingsIter.hasNext())
+            {
+                Unfiltered existing = existingsIter.next();
+                // If it's a range tombstone, we don't care, we're only 
looking for existing entries that get deleted by
+                // the new partition deletion
+                if (existing.isRangeTombstoneMarker())
+                    continue;
+
+                Row existingRow = (Row)existing;
+                addToViewUpdateGenerators(existingRow, 
emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), 
generators, nowInSec);
+            }
+        }
+        while (updatesIter.hasNext())
+        {
+            Unfiltered update = updatesIter.next();
+            // If it's a range tombstone, it removes nothing pre-existing, so 
we can ignore it for view updates
+            if (update.isRangeTombstoneMarker())
+                continue;
+
+            Row updateRow = (Row)update;
+            addToViewUpdateGenerators(emptyRow(updateRow.clustering(), 
DeletionTime.LIVE), updateRow, generators, nowInSec);
+        }
+
+        return buildMutations(baseTableMetadata, generators);
+    }
+
+    /**
+     * Return the views that are potentially updated by the provided updates.
+     *
+     * @param updates the updates applied to the base table.
+     * @return the views affected by {@code updates}.
+     */
+    public Collection<View> updatedViews(PartitionUpdate updates)
+    {
+        List<View> matchingViews = new ArrayList<>(views.size());
+
+        for (View view : views)
+        {
+            ReadQuery selectQuery = view.getReadQuery();
+            if (!selectQuery.selectsKey(updates.partitionKey()))
+                continue;
+
+            matchingViews.add(view);
+        }
+        return matchingViews;
+    }
+
+    /**
+     * Returns the command to use to read the existing rows required to 
generate view updates for the provided
+     * base updates.
+     *
+     * @param updates the base table updates being applied.
+     * @param views the views potentially affected by {@code updates}.
+     * @param nowInSec the current time in seconds.
+     * @return the command to use to read the base table rows required to 
generate view updates for {@code updates}.
+     */
+    private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate 
updates, Collection<View> views, int nowInSec)
+    {
+        Slices.Builder sliceBuilder = null;
+        DeletionInfo deletionInfo = updates.deletionInfo();
+        CFMetaData metadata = updates.metadata();
+        DecoratedKey key = updates.partitionKey();
+        // TODO: This is subtle: we need to gather all the slices that we have 
to fetch between partition del, range tombstones and rows.
+        if (!deletionInfo.isLive())
+        {
+            sliceBuilder = new Slices.Builder(metadata.comparator);
+            // Everything covered by a deletion might invalidate an existing 
view entry, which means we must read it to know. In practice
+            // though, the views involved might filter some base table 
clustering columns, in which case we can restrict what we read
+            // using those restrictions.
+            // If there is a partition deletion, then we can simply take the 
slices from each view's select filter. They may overlap but
+            // the Slices.Builder handles that for us. Note that in many cases 
this will just involve reading everything (as soon as any
+            // view involved has no clustering restrictions for instance).
+            // For range tombstones, we should theoretically take the 
difference between the tombstoned range and the slices selected
+            // by every view, but as we don't have an easy way to compute that 
right now, we keep it simple and just use the tombstoned
+            // range.
+            // TODO: we should improve that latter part.
+            if (!deletionInfo.getPartitionDeletion().isLive())
+            {
+                for (View view : views)
+                    
sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices());
+            }
+            else
+            {
+                assert deletionInfo.hasRanges();
+                Iterator<RangeTombstone> iter = 
deletionInfo.rangeIterator(false);
+                while (iter.hasNext())
+                    sliceBuilder.add(iter.next().deletedSlice());
+            }
+        }
+
+        // We need to read every row that is updated, unless we can prove that 
it has no impact on any view entries.
+
+        // If we had some slices from the deletions above, we'll continue 
using that. Otherwise, it's more efficient to build
+        // a names query.
+        BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? 
BTreeSet.builder(metadata.comparator) : null;
+        for (Row row : updates)
+        {
+            // Don't read the existing state if we can prove the update won't 
affect any views
+            if (!affectsAnyViews(key, row, views))
+                continue;
+
+            if (namesBuilder == null)
+                sliceBuilder.add(Slice.make(row.clustering()));
+            else
+                namesBuilder.add(row.clustering());
+        }
+
+        NavigableSet<Clustering> names = namesBuilder == null ? null : 
namesBuilder.build();
+        // If we have a slice builder, it means we had some deletions and we 
have to read. But if we had
+        // only row updates, it's possible none of them affected the views, in 
which case we have nothing
+        // to do.
+        if (names != null && names.isEmpty())
+            return null;
+
+        ClusteringIndexFilter clusteringFilter = names == null
+                                               ? new 
ClusteringIndexSliceFilter(sliceBuilder.build(), false)
+                                               : new 
ClusteringIndexNamesFilter(names, false);
+        // If we have more than one view, we should merge the columns queried 
by each view, but to keep it simple we just
+        // include everything. We could change that in the future.
+        ColumnFilter queriedColumns = views.size() == 1
+                                    ? 
Iterables.getOnlyElement(views).getSelectStatement().queriedColumns()
+                                    : ColumnFilter.all(metadata);
+        // Note that the views could have restrictions on regular columns, but 
even if that's the case we shouldn't apply those
+        // when we read, because even if an existing row doesn't match the 
view filter, the update can change that in which
+        // case we'll need to know the existing content. There is also no easy 
way to merge those RowFilter when we have multiple views.
+        // TODO: it could still make sense to special-case when there is a 
single view and a small number of updates (and
+        // no deletions). Indeed, in that case we could check whether any of 
the updates modify any of the restricted regular
+        // columns, and if that's not the case we could use the view filter. We 
keep it simple for now though.
+        RowFilter rowFilter = RowFilter.NONE;
+        return SinglePartitionReadCommand.create(metadata, nowInSec, 
queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter);
+    }
+
+    private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, 
Collection<View> views)
+    {
+        for (View view : views)
+        {
+            if (view.mayBeAffectedBy(partitionKey, update))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Given an existing base row and the update that we're going to apply to 
this row, generate the modifications
+     * to apply to MVs using the provided {@code ViewUpdateGenerator}s.
+     *
+     * @param existingBaseRow the base table row as it is before an update.
+     * @param updateBaseRow the new updates made to {@code existingBaseRow}.
+     * @param generators the view update generators to add the new changes to.
+     * @param nowInSec the current time in seconds. Used to decide if data is 
live or not.
+     */
+    private static void addToViewUpdateGenerators(Row existingBaseRow, Row 
updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec)
+    {
+        // Having existing empty is useful, it just means we'll insert a brand 
new entry for updateBaseRow,
+        // but if we have no update at all, we shouldn't get there.
+        assert !updateBaseRow.isEmpty();
+
+        // We allow existingBaseRow to be null, which we treat the same as 
being empty as a small optimization
+        // to avoid allocating empty row objects when we know there was 
nothing existing.
+        Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : 
Rows.merge(existingBaseRow, updateBaseRow, nowInSec);
+        for (ViewUpdateGenerator generator : generators)
+            generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow);
+    }
+
+    private static Row emptyRow(Clustering clustering, DeletionTime deletion)
+    {
+        // Returning null for an empty row is slightly ugly, but the case 
where there is no pre-existing row is fairly common
+        // (especially when building the view), so we want to avoid a dummy 
allocation of an empty row every time.
+        // And MultiViewUpdateBuilder knows how to deal with that.
+        return deletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, 
Row.Deletion.regular(deletion));
+    }
+
+    /**
+     * Extracts (and potentially groups) the mutations generated by the 
provided view update generators.
+     * Returns the mutations that need to be applied to the views given the base 
table updates
+     * passed to {@link ViewUpdateGenerator#addBaseTableUpdate}.
+     *
+     * @param baseTableMetadata the metadata for the base table being updated.
+     * @param generators the generators from which to extract the view 
mutations.
+     * @return the mutations created by all the generators in {@code 
generators}.
+     */
+    private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, 
List<ViewUpdateGenerator> generators)
+    {
+        // One view is probably common enough and we can optimize a bit easily
+        if (generators.size() == 1)
+        {
+            Collection<PartitionUpdate> updates = 
generators.get(0).generateViewUpdates();
+            List<Mutation> mutations = new ArrayList<>(updates.size());
+            for (PartitionUpdate update : updates)
+                mutations.add(new Mutation(update));
+            return mutations;
+        }
+
+        Map<DecoratedKey, Mutation> mutations = new HashMap<>();
+        for (ViewUpdateGenerator generator : generators)
+        {
+            for (PartitionUpdate update : generator.generateViewUpdates())
+            {
+                DecoratedKey key = update.partitionKey();
+                Mutation mutation = mutations.get(key);
+                if (mutation == null)
+                {
+                    mutation = new Mutation(baseTableMetadata.ksName, key);
+                    mutations.put(key, mutation);
+                }
+                mutation.add(update);
+            }
+        }
+        return mutations.values();
+    }
+
+    /**
+     * A simple helper that tracks for a given {@code UnfilteredRowIterator} 
what is the current deletion at any time of the
+     * iteration. It will be the currently open range tombstone deletion if 
there is one and the partition deletion otherwise.
+     */
+    private static class DeletionTracker
+    {
+        private final DeletionTime partitionDeletion;
+        private DeletionTime deletion;
+
+        public DeletionTracker(DeletionTime partitionDeletion)
+        {
+            this.partitionDeletion = partitionDeletion;
+        }
+
+        public void update(Unfiltered marker)
+        {
+            assert marker instanceof RangeTombstoneMarker;
+            RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker;
+            this.deletion = rtm.isOpen(false)
+                          ? rtm.openDeletionTime(false)
+                          : null;
+        }
+
+        public DeletionTime currentDeletion()
+        {
+            return deletion == null ? partitionDeletion : deletion;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java 
b/src/java/org/apache/cassandra/db/view/TemporalRow.java
deleted file mode 100644
index 23705b9..0000000
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ /dev/null
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import static java.util.Comparator.naturalOrder;
-import static java.util.Comparator.reverseOrder;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Conflicts;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.partitions.AbstractBTreePartition;
-import org.apache.cassandra.db.rows.BufferCell;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The
- * values are stored in timestamp order, but also indicate whether they are from the currently persisted row, allowing a
- * {@link TemporalRow.Resolver} to resolve if the value is an old value that has been updated; if it sorts after the
- * update's value, then it does not qualify.
- */
-public class TemporalRow
-{
-    private static final int NO_TTL = LivenessInfo.NO_TTL;
-    private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP;
-    private static final int NO_DELETION_TIME = 
DeletionTime.LIVE.localDeletionTime();
-
-    public interface Resolver
-    {
-        /**
-         * @param cellVersions  the existing and new versions of a single cell of a TemporalRow
-         * @return      A single TemporalCell from those versions which satisfies the resolution criteria, or null if
-         *              there is no cell which qualifies
-         */
-        TemporalCell resolve(TemporalCell.Versions cellVersions);
-    }
-
-    public static final Resolver oldValueIfUpdated = 
TemporalCell.Versions::getOldCellIfUpdated;
-    public static final Resolver earliest = 
TemporalCell.Versions::getEarliestCell;
-    public static final Resolver latest = TemporalCell.Versions::getLatestCell;
-
-    private static class TemporalCell
-    {
-        public final ByteBuffer value;
-        public final long timestamp;
-        public final int ttl;
-        public final int localDeletionTime;
-        public final boolean isNew;
-
-        private TemporalCell(ByteBuffer value, long timestamp, int ttl, int 
localDeletionTime, boolean isNew)
-        {
-            this.value = value;
-            this.timestamp = timestamp;
-            this.ttl = ttl;
-            this.localDeletionTime = localDeletionTime;
-            this.isNew = isNew;
-        }
-
-        @Override
-        public String toString()
-        {
-            return MoreObjects.toStringHelper(this)
-                    .add("value", value == null ? "null" : 
ByteBufferUtil.bytesToHex(value))
-                    .add("timestamp", timestamp)
-                    .add("ttl", ttl)
-                    .add("localDeletionTime", localDeletionTime)
-                    .add("isNew", isNew)
-                    .toString();
-        }
-
-        public TemporalCell reconcile(TemporalCell that)
-        {
-            int now = FBUtilities.nowInSeconds();
-            Conflicts.Resolution resolution = 
Conflicts.resolveRegular(that.timestamp,
-                                                                       
that.isLive(now),
-                                                                       
that.localDeletionTime,
-                                                                       
that.value,
-                                                                       
this.timestamp,
-                                                                       
this.isLive(now),
-                                                                       
this.localDeletionTime,
-                                                                       
this.value);
-            assert resolution != Conflicts.Resolution.MERGE;
-            if (resolution == Conflicts.Resolution.LEFT_WINS)
-                return that;
-            return this;
-        }
-
-        private boolean isLive(int now)
-        {
-            return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && 
now < localDeletionTime);
-        }
-
-        public Cell cell(ColumnDefinition definition, CellPath cellPath)
-        {
-            return new BufferCell(definition, timestamp, ttl, 
localDeletionTime, value, cellPath);
-        }
-
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            TemporalCell that = (TemporalCell) o;
-
-            if (timestamp != that.timestamp) return false;
-            if (ttl != that.ttl) return false;
-            if (localDeletionTime != that.localDeletionTime) return false;
-            if (isNew != that.isNew) return false;
-            return !(value != null ? !value.equals(that.value) : that.value != 
null);
-        }
-
-        public int hashCode()
-        {
-            int result = value != null ? value.hashCode() : 0;
-            result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-            result = 31 * result + ttl;
-            result = 31 * result + localDeletionTime;
-            result = 31 * result + (isNew ? 1 : 0);
-            return result;
-        }
-
-        /**
-         * Tracks the versions of a cell for a given TemporalRow.
-         * There are only two possible versions, existing and new.
-         *
-         */
-        static class Versions
-        {
-            private TemporalCell existingCell = null;
-            private TemporalCell newCell = null;
-            private int numSet = 0;
-
-
-            /**
-             * @return the cell that is earliest
-             * Or would be overwritten in the case of a timestamp conflict
-             */
-            public TemporalCell getEarliestCell()
-            {
-                assert numSet > 0;
-
-                if (numSet == 1)
-                    return existingCell == null ? newCell : existingCell;
-
-                TemporalCell latest = existingCell.reconcile(newCell);
-
-                return latest == newCell ? existingCell : newCell;
-            }
-
-            /**
-             * @return the cell that is latest
-             * Or would be the winner in the case of a timestamp conflict
-             */
-            public TemporalCell getLatestCell()
-            {
-                assert numSet > 0;
-
-                if (numSet == 1)
-                    return existingCell == null ? newCell : existingCell;
-
-                return existingCell.reconcile(newCell);
-            }
-
-            /**
-             * @return the existing cell if both versions are set and the reconciled winner's value differs from the
-             * existing cell's value; null otherwise
-             */
-            public TemporalCell getOldCellIfUpdated()
-            {
-                assert numSet > 0;
-
-                if (numSet == 1)
-                   return null;
-
-                TemporalCell value = existingCell.reconcile(newCell);
-
-                return ByteBufferUtil.compareUnsigned(existingCell.value, 
value.value) != 0 ? existingCell : null;
-            }
-
-            void setVersion(TemporalCell cell)
-            {
-                assert cell != null;
-
-                if (cell.isNew)
-                {
-                    assert newCell == null || newCell.equals(cell) : "Only one 
cell version can be marked New; newCell: " + newCell + ", cell: " + cell;
-                    newCell = cell;
-                    numSet = existingCell == null ? 1 : 2;
-                }
-                else
-                {
-                    assert existingCell == null || existingCell.equals(cell) : 
"Only one cell version can be marked Existing; existingCell: " + existingCell + 
", cell: " + cell;
-                    existingCell = cell;
-                    numSet = newCell == null ? 1 : 2;
-                }
-            }
-
-            public void addToRow(TemporalRow row, ColumnIdentifier column, 
CellPath path)
-            {
-                if (existingCell != null)
-                    row.addColumnValue(column, path, existingCell.timestamp, 
existingCell.ttl,
-                                       existingCell.localDeletionTime, 
existingCell.value, existingCell.isNew);
-
-                if (newCell != null)
-                    row.addColumnValue(column, path, newCell.timestamp, 
newCell.ttl,
-                                       newCell.localDeletionTime, 
newCell.value, newCell.isNew);
-            }
-
-            @Override
-            public String toString()
-            {
-                return MoreObjects.toStringHelper(this)
-                        .add("numSet", numSet)
-                        .add("existingCell", existingCell)
-                        .add("newCell", newCell)
-                        .toString();
-            }
-        }
-    }
-
-    private final ColumnFamilyStore baseCfs;
-    private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
-    private final ByteBuffer basePartitionKey;
-    private final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
-    private final Row startRow;
-    private final boolean startIsNew;
-
-    public final int nowInSec;
-    private final Map<ColumnIdentifier, Map<CellPath, TemporalCell.Versions>> 
columnValues = new HashMap<>();
-    private int viewClusteringTtl = NO_TTL;
-    private long viewClusteringTimestamp = NO_TIMESTAMP;
-    private int viewClusteringLocalDeletionTime = NO_DELETION_TIME;
-
-    TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> 
viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew)
-    {
-        this.baseCfs = baseCfs;
-        this.viewPrimaryKey = viewPrimaryKey;
-        this.basePartitionKey = key;
-        this.startRow = row;
-        this.startIsNew = isNew;
-        this.nowInSec = nowInSec;
-
-        LivenessInfo liveness = row.primaryKeyLivenessInfo();
-        updateLiveness(liveness.ttl(), liveness.timestamp(), 
row.deletion().time().localDeletionTime());
-
-        List<ColumnDefinition> clusteringDefs = 
baseCfs.metadata.clusteringColumns();
-        clusteringColumns = new HashMap<>();
-
-        for (int i = 0; i < clusteringDefs.size(); i++)
-        {
-            ColumnDefinition cdef = clusteringDefs.get(i);
-            clusteringColumns.put(cdef.name, row.clustering().get(i));
-
-            addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, 
NO_DELETION_TIME, row.clustering().get(i), isNew);
-        }
-    }
-
-    /*
-     * PK ts:5, ttl:1, deletion: 2
-     * Col ts:4, ttl:2, deletion: 3
-     *
-     * TTL uses min, since the row expires at the lowest of the expiration times. If we have the above values, we
-     * would want to return 1, since the base row expires in 1 second.
-     *
-     * Timestamp uses max, as this is the time that the row has been written to the view. See CASSANDRA-10910.
-     *
-     * Local Deletion Time should use max, as this deletion will cover all previous values written.
-     */
-    private void updateLiveness(int ttl, long timestamp, int localDeletionTime)
-    {
-        // When both values are set, valueIfSet returns whichever one the comparator ranks higher
-        // Natural order will return the max: naturalOrder().compare(1, 2) < 0, so 2 is returned
-        // Reverse order will return the min: reverseOrder().compare(1, 2) > 0, so 1 is returned
-        this.viewClusteringTtl = valueIfSet(viewClusteringTtl, ttl, NO_TTL, 
reverseOrder());
-        this.viewClusteringTimestamp = valueIfSet(viewClusteringTimestamp, 
timestamp, NO_TIMESTAMP, naturalOrder());
-        this.viewClusteringLocalDeletionTime = 
valueIfSet(viewClusteringLocalDeletionTime, localDeletionTime, 
NO_DELETION_TIME, naturalOrder());
-    }
-
-    @Override
-    public String toString()
-    {
-        return MoreObjects.toStringHelper(this)
-                .add("table", baseCfs.keyspace.getName() + "." + 
baseCfs.getTableName())
-                .add("basePartitionKey", 
ByteBufferUtil.bytesToHex(basePartitionKey))
-                .add("startRow", startRow.toString(baseCfs.metadata))
-                .add("startIsNew", startIsNew)
-                .add("nowInSec", nowInSec)
-                .add("viewClusteringTtl", viewClusteringTtl)
-                .add("viewClusteringTimestamp", viewClusteringTimestamp)
-                .add("viewClusteringLocalDeletionTime", 
viewClusteringLocalDeletionTime)
-                .add("columnValues", columnValues)
-                .toString();
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        TemporalRow that = (TemporalRow) o;
-
-        if (!clusteringColumns.equals(that.clusteringColumns)) return false;
-        if (!basePartitionKey.equals(that.basePartitionKey)) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = basePartitionKey.hashCode();
-        result = 31 * result + clusteringColumns.hashCode();
-        return result;
-    }
-
-    public void addColumnValue(ColumnIdentifier identifier,
-                               CellPath cellPath,
-                               long timestamp,
-                               int ttl,
-                               int localDeletionTime,
-                               ByteBuffer value,  boolean isNew)
-    {
-        if (!columnValues.containsKey(identifier))
-            columnValues.put(identifier, new HashMap<>());
-
-        Map<CellPath, TemporalCell.Versions> innerMap = 
columnValues.get(identifier);
-
-        if (!innerMap.containsKey(cellPath))
-            innerMap.put(cellPath, new TemporalCell.Versions());
-
-        // If this column is part of the view's primary keys
-        if (viewPrimaryKey.contains(identifier))
-        {
-            updateLiveness(ttl, timestamp, localDeletionTime);
-        }
-
-        innerMap.get(cellPath).setVersion(new TemporalCell(value, timestamp, 
ttl, localDeletionTime, isNew));
-    }
-
-    /**
-     * @return
-     * <ul>
-     *     <li>
-     *         If both existing and update are defaultValue, return defaultValue
-     *     </li>
-     *     <li>
-     *         If only one of existing or update is defaultValue, return the one which is not
-     *     </li>
-     *     <li>
-     *         If both existing and update are not defaultValue, compare using comparator and return the higher one.
-     *     </li>
-     * </ul>
-     */
-    private static <T> T valueIfSet(T existing, T update, T defaultValue, 
Comparator<T> comparator)
-    {
-        if (existing.equals(defaultValue))
-            return update;
-        if (update.equals(defaultValue))
-            return existing;
-        return comparator.compare(existing, update) > 0 ? existing : update;
-    }
-
-    public int viewClusteringTtl()
-    {
-        return viewClusteringTtl;
-    }
-
-    public long viewClusteringTimestamp()
-    {
-        return viewClusteringTimestamp;
-    }
-
-    public int viewClusteringLocalDeletionTime()
-    {
-        return viewClusteringLocalDeletionTime;
-    }
-
-    public void addCell(Cell cell, boolean isNew)
-    {
-        addColumnValue(cell.column().name, cell.path(), cell.timestamp(), 
cell.ttl(), cell.localDeletionTime(), cell.value(), isNew);
-    }
-
-    // The Definition here is actually the *base table* definition
-    public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver 
resolver)
-    {
-        ColumnDefinition baseDefinition = 
definition.cfName.equals(baseCfs.name)
-                                          ? definition
-                                          : 
baseCfs.metadata.getColumnDefinition(definition.name);
-
-        if (baseDefinition.isPartitionKey())
-        {
-            if (baseCfs.metadata.getKeyValidator() instanceof CompositeType)
-            {
-                CompositeType keyComparator = (CompositeType) 
baseCfs.metadata.getKeyValidator();
-                ByteBuffer[] components = 
keyComparator.split(basePartitionKey);
-                return components[baseDefinition.position()];
-            }
-            else
-            {
-                return basePartitionKey;
-            }
-        }
-        else
-        {
-            ColumnIdentifier columnIdentifier = baseDefinition.name;
-
-            if (clusteringColumns.containsKey(columnIdentifier))
-                return clusteringColumns.get(columnIdentifier);
-
-            Collection<org.apache.cassandra.db.rows.Cell> val = 
values(definition, resolver);
-            if (val != null && val.size() == 1)
-            {
-                org.apache.cassandra.db.rows.Cell cell = 
Iterables.getOnlyElement(val);
-                // handle single-column deletions correctly
-                return cell.isTombstone() ? null : cell.value();
-            }
-        }
-        return null;
-    }
-
-    public DeletionTime deletionTime(AbstractBTreePartition partition)
-    {
-        DeletionInfo deletionInfo = partition.deletionInfo();
-        if (!deletionInfo.getPartitionDeletion().isLive())
-            return deletionInfo.getPartitionDeletion();
-
-        Clustering baseClustering = baseClusteringBuilder().build();
-        RangeTombstone clusterTombstone = 
deletionInfo.rangeCovering(baseClustering);
-        if (clusterTombstone != null)
-            return clusterTombstone.deletionTime();
-
-        Row row = partition.getRow(baseClustering);
-        return row == null || row.deletion().isLive() ? DeletionTime.LIVE : 
row.deletion().time();
-    }
-
-    public Collection<org.apache.cassandra.db.rows.Cell> 
values(ColumnDefinition definition, Resolver resolver)
-    {
-        Map<CellPath, TemporalCell.Versions> innerMap = 
columnValues.get(definition.name);
-        if (innerMap == null)
-        {
-            return Collections.emptyList();
-        }
-
-        Collection<org.apache.cassandra.db.rows.Cell> value = new 
ArrayList<>();
-        for (Map.Entry<CellPath, TemporalCell.Versions> pathAndCells : 
innerMap.entrySet())
-        {
-            TemporalCell cell = resolver.resolve(pathAndCells.getValue());
-
-            if (cell != null)
-                value.add(cell.cell(definition, pathAndCells.getKey()));
-        }
-        return value;
-    }
-
-    public Slice baseSlice()
-    {
-        return baseClusteringBuilder().buildSlice();
-    }
-
-    private CBuilder baseClusteringBuilder()
-    {
-        CFMetaData metadata = baseCfs.metadata;
-        CBuilder builder = CBuilder.create(metadata.comparator);
-
-        ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()];
-        for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : 
clusteringColumns.entrySet())
-            buffers[metadata.getColumnDefinition(buffer.getKey()).position()] 
= buffer.getValue();
-
-        for (ByteBuffer byteBuffer : buffers)
-            builder = builder.add(byteBuffer);
-
-        return builder;
-    }
-
-    public Clustering baseClustering()
-    {
-        return startRow.clustering();
-    }
-
-    static class Set implements Iterable<TemporalRow>
-    {
-        private final ColumnFamilyStore baseCfs;
-        private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
-        private final ByteBuffer key;
-        public final DecoratedKey dk;
-        private final Map<Clustering, TemporalRow> clusteringToRow;
-        final int nowInSec = FBUtilities.nowInSeconds();
-        private boolean hasTombstonedExisting = false;
-
-        Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> 
viewPrimaryKey, ByteBuffer key)
-        {
-            this.baseCfs = baseCfs;
-            this.viewPrimaryKey = viewPrimaryKey;
-            this.key = key;
-            this.dk = baseCfs.decorateKey(key);
-            this.clusteringToRow = new HashMap<>();
-        }
-
-        public Iterator<TemporalRow> iterator()
-        {
-            return clusteringToRow.values().iterator();
-        }
-
-        public TemporalRow getClustering(Clustering clustering)
-        {
-            return clusteringToRow.get(clustering);
-        }
-
-        public void addRow(Row row, boolean isNew)
-        {
-            TemporalRow temporalRow = clusteringToRow.get(row.clustering());
-            if (temporalRow == null)
-            {
-                temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, 
row, nowInSec, isNew);
-                clusteringToRow.put(row.clustering(), temporalRow);
-            }
-
-            for (Cell cell : row.cells())
-            {
-                temporalRow.addCell(cell, isNew);
-            }
-        }
-
-        private void addRow(TemporalRow row)
-        {
-            TemporalRow newRow = new TemporalRow(baseCfs, viewPrimaryKey, key, 
row.startRow, nowInSec, row.startIsNew);
-
-            TemporalRow existing = 
clusteringToRow.put(row.startRow.clustering(), newRow);
-            assert existing == null;
-
-            for (Map.Entry<ColumnIdentifier, Map<CellPath, 
TemporalCell.Versions>> entry : row.columnValues.entrySet())
-            {
-                for (Map.Entry<CellPath, TemporalCell.Versions> cellPathEntry 
: entry.getValue().entrySet())
-                {
-                    TemporalCell.Versions cellVersions = 
cellPathEntry.getValue();
-
-                    cellVersions.addToRow(newRow, entry.getKey(), 
cellPathEntry.getKey());
-                }
-            }
-        }
-
-        public TemporalRow.Set 
withNewViewPrimaryKey(java.util.Set<ColumnIdentifier> viewPrimaryKey)
-        {
-            TemporalRow.Set newSet = new Set(baseCfs, viewPrimaryKey, key);
-
-            for (TemporalRow row : this)
-                newSet.addRow(row);
-
-            return newSet;
-        }
-
-        public boolean hasTombstonedExisting()
-        {
-            return hasTombstonedExisting;
-        }
-
-        public void setTombstonedExisting()
-        {
-            hasTombstonedExisting = true;
-        }
-
-        public int size()
-        {
-            return clusteringToRow.size();
-        }
-    }
-}

Reply via email to