Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e624c663
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e624c663
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e624c663

Branch: refs/heads/cassandra-3.11
Commit: e624c6638254ea410691f085a10d08d412eb5ac1
Parents: 14d67d8 1b36740
Author: Paulo Motta <pa...@apache.org>
Authored: Tue Sep 5 01:04:34 2017 -0500
Committer: Paulo Motta <pa...@apache.org>
Committed: Tue Sep 5 01:05:06 2017 -0500

----------------------------------------------------------------------
 NEWS.txt                                        |   30 +-
 doc/cql3/CQL.textile                            |    6 +
 .../org/apache/cassandra/config/CFMetaData.java |   13 +
 .../apache/cassandra/cql3/UpdateParameters.java |    2 +-
 .../cql3/statements/AlterTableStatement.java    |   18 +-
 .../org/apache/cassandra/db/LivenessInfo.java   |   17 +-
 .../org/apache/cassandra/db/ReadCommand.java    |    7 +-
 .../db/compaction/CompactionIterator.java       |    7 +-
 .../apache/cassandra/db/filter/RowFilter.java   |    4 +-
 .../cassandra/db/partitions/PurgeFunction.java  |   14 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |    6 +-
 src/java/org/apache/cassandra/db/rows/Row.java  |   13 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |    5 +
 .../apache/cassandra/db/transform/Filter.java   |    8 +-
 .../db/transform/FilteredPartitions.java        |    4 +-
 .../cassandra/db/transform/FilteredRows.java    |    2 +-
 .../apache/cassandra/db/view/TableViews.java    |   18 +-
 src/java/org/apache/cassandra/db/view/View.java |   41 +-
 .../apache/cassandra/db/view/ViewManager.java   |    5 +
 .../cassandra/db/view/ViewUpdateGenerator.java  |  163 ++-
 .../apache/cassandra/service/DataResolver.java  |    4 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |    2 +-
 .../apache/cassandra/cql3/ViewComplexTest.java  | 1344 ++++++++++++++++++
 .../cassandra/cql3/ViewFilteringTest.java       | 1030 +++++++++-----
 .../org/apache/cassandra/cql3/ViewTest.java     |   25 +-
 25 files changed, 2293 insertions(+), 495 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 8e39667,7064c5d..0682ae9
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,10 -18,28 +18,34 @@@ using the provided 'sstableupgrade' too
  
  Upgrading
  ---------
 -   - Nothing specific to this release, but please see previous upgrading 
sections,
 -     especially if you are upgrading from 2.2.
 +    - Nothing specific to this version but please see previous upgrading 
sections,
 +      especially if you are upgrading from 2.2.
  
+ Materialized Views
+ -------------------
+     - Cassandra will no longer allow dropping columns on tables with 
Materialized Views.
+     - A change was made in the way the Materialized View timestamp is 
computed, which
+       may cause an old deletion to a base column which is view primary key 
(PK) column
+       to not be reflected in the view when repairing the base table 
post-upgrade. This
+       condition is only possible when a column deletion to an MV primary key 
(PK) column
+       not present in the base table PK (via UPDATE base SET view_pk_col = 
null or DELETE
+       view_pk_col FROM base) is missed before the upgrade and received by 
repair after the upgrade.
+       If such column deletions are done on a view PK column which is not a 
base PK, it's advisable
+       to run repair on the base table of all nodes prior to the upgrade. 
Alternatively it's possible
+       to fix potential inconsistencies by running repair on the views after 
upgrade or drop and
+       re-create the views. See CASSANDRA-11500 for more details.
+     - Removal of columns not selected in the Materialized View (via UPDATE 
base SET unselected_column
+       = null or DELETE unselected_column FROM base) may not be properly 
reflected in the view in some
+       situations so we advise against doing deletions on base columns not 
selected in views
+       until this is fixed on CASSANDRA-13826.
 -
 -3.0.14
++    - Creating Materialized View with filtering on non-primary-key base column
++      (added in CASSANDRA-10368) is disabled, because the liveness of view row
++      is depending on multiple filtered base non-key columns and base non-key
++      column used in view primary-key. This semantic cannot be supported 
without
++      storage format change, see CASSANDRA-13826. For append-only use case, 
you
++      may still use this feature with a startup flag: 
"-Dcassandra.mv.allow_filtering_nonkey_columns_unsafe=true"
++
 +3.11.0
  ======
  
  Upgrading

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/doc/cql3/CQL.textile
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 3ac790b,befdd25..fdbcf7a
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@@ -187,72 -189,54 +187,60 @@@ public class AlterTableStatement extend
                  break;
  
              case DROP:
 -                assert columnName != null;
                  if (!cfm.isCQLTable())
                      throw new InvalidRequestException("Cannot drop columns 
from a non-CQL3 table");
 -                if (def == null)
 -                    throw new InvalidRequestException(String.format("Column 
%s was not found in table %s", columnName, columnFamily()));
  
 -                switch (def.kind)
 +                for (AlterTableStatementColumn colData : colNameList)
                  {
 -                    case PARTITION_KEY:
 -                    case CLUSTERING:
 -                        throw new 
InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", 
columnName));
 -                    case REGULAR:
 -                    case STATIC:
 -                        ColumnDefinition toDelete = null;
 -                        for (ColumnDefinition columnDef : 
cfm.partitionColumns())
 -                        {
 -                            if (columnDef.name.equals(columnName))
 -                            {
 -                                toDelete = columnDef;
 -                                break;
 -                            }
 -                        }
 -                        assert toDelete != null;
 -                        cfm.removeColumnDefinition(toDelete);
 -                        cfm.recordColumnDrop(toDelete, deleteTimestamp == 
null ? queryState.getTimestamp() : deleteTimestamp);
 -                        break;
 -                }
 +                    columnName = colData.getColumnName().getIdentifier(cfm);
 +                    def = cfm.getColumnDefinition(columnName);
  
 -                // If the dropped column is required by any secondary indexes
 -                // we reject the operation, as the indexes must be dropped 
first
 -                Indexes allIndexes = cfm.getIndexes();
 -                if (!allIndexes.isEmpty())
 -                {
 -                    ColumnFamilyStore store = Keyspace.openAndGetStore(cfm);
 -                    Set<IndexMetadata> dependentIndexes = 
store.indexManager.getDependentIndexes(def);
 -                    if (!dependentIndexes.isEmpty())
 -                        throw new 
InvalidRequestException(String.format("Cannot drop column %s because it has " +
 -                                                                        
"dependent secondary indexes (%s)",
 -                                                                        def,
 -                                                                        
dependentIndexes.stream()
 -                                                                              
          .map(i -> i.name)
 -                                                                              
          .collect(Collectors.joining(","))));
 -                }
 +                    if (def == null)
 +                        throw new 
InvalidRequestException(String.format("Column %s was not found in table %s", 
columnName, columnFamily()));
 +
 +                    switch (def.kind)
 +                    {
 +                         case PARTITION_KEY:
 +                         case CLUSTERING:
 +                              throw new 
InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", 
columnName));
 +                         case REGULAR:
 +                         case STATIC:
 +                              ColumnDefinition toDelete = null;
 +                              for (ColumnDefinition columnDef : 
cfm.partitionColumns())
 +                              {
 +                                   if (columnDef.name.equals(columnName))
 +                                   {
 +                                       toDelete = columnDef;
 +                                       break;
 +                                   }
 +                               }
 +                             assert toDelete != null;
 +                             cfm.removeColumnDefinition(toDelete);
 +                             cfm.recordColumnDrop(toDelete, deleteTimestamp  
== null ? queryState.getTimestamp() : deleteTimestamp);
 +                             break;
 +                    }
  
 -                if (!Iterables.isEmpty(views))
 -                    throw new InvalidRequestException(String.format("Cannot 
drop column %s on base table with materialized views.",
 -                                                                    
columnName.toString(),
 -                                                                    
keyspace()));
 +                    // If the dropped column is required by any secondary 
indexes
 +                    // we reject the operation, as the indexes must be 
dropped first
 +                    Indexes allIndexes = cfm.getIndexes();
 +                    if (!allIndexes.isEmpty())
 +                    {
 +                        ColumnFamilyStore store = 
Keyspace.openAndGetStore(cfm);
 +                        Set<IndexMetadata> dependentIndexes = 
store.indexManager.getDependentIndexes(def);
 +                        if (!dependentIndexes.isEmpty())
 +                            throw new 
InvalidRequestException(String.format("Cannot drop column %s because it has " +
 +                                                                            
"dependent secondary indexes (%s)",
 +                                                                            
def,
 +                                                                            
dependentIndexes.stream()
 +                                                                              
              .map(i -> i.name)
 +                                                                              
              .collect(Collectors.joining(","))));
 +                    }
 +
-                     // If a column is dropped which is included in a view, we 
don't allow the drop to take place.
-                     boolean rejectAlter = false;
-                     StringBuilder builder = new StringBuilder();
-                     for (ViewDefinition view : views)
-                     {
-                         if (!view.includes(columnName)) continue;
-                         if (rejectAlter)
-                             builder.append(',');
-                         rejectAlter = true;
-                         builder.append(view.viewName);
-                     }
-                     if (rejectAlter)
-                         throw new 
InvalidRequestException(String.format("Cannot drop column %s, depended on by 
materialized views (%s.{%s})",
++                    if (!Iterables.isEmpty(views))
++                        throw new 
InvalidRequestException(String.format("Cannot drop column %s on base table with 
materialized views.",
 +                                                                        
columnName.toString(),
-                                                                         
keyspace(),
-                                                                         
builder.toString()));
++                                                                        
keyspace()));
 +                }
                  break;
              case OPTS:
                  if (attrs == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/Row.java
index 60ee4d4,3bcc220..9a8508b
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@@ -206,19 -213,9 +216,19 @@@ public interface Row extends Unfiltered
       * @return this row but without any deletion info purged by {@code 
purger}. If the purged row is empty, returns
       * {@code null}.
       */
-     public Row purge(DeletionPurger purger, int nowInSec);
+     public Row purge(DeletionPurger purger, int nowInSec, boolean 
enforceStrictLiveness);
  
      /**
 +     * Returns a copy of this row which only include the data queried by 
{@code filter}, excluding anything _fetched_ for
 +     * internal reasons but not queried by the user (see {@link ColumnFilter} 
for details).
 +     *
 +     * @param filter the {@code ColumnFilter} to use when deciding what is 
user queried. This should be the filter
 +     * that was used when querying the row on which this method is called.
 +     * @return the row but with all data that wasn't queried by the user 
skipped.
 +     */
 +    public Row withOnlyQueriedData(ColumnFilter filter);
 +
 +    /**
       * Returns a copy of this row where all counter cells have they "local" 
shard marked for clearing.
       */
      public Row markCounterLocalToBeCleared();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/ViewManager.java
index 5c72bb2,d1cfd9e..cb1e02b
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@@ -172,9 -159,13 +172,14 @@@ public class ViewManage
  
          forTable(view.getDefinition().baseTableMetadata()).removeByName(name);
          SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
 +        SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), 
view.name);
      }
  
+     public View getByName(String name)
+     {
+         return viewsByName.get(name);
+     }
+ 
      public void buildAllViews()
      {
          for (View view : allViews())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index edbda1c,0c8e078..74758e7
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@@ -376,14 -384,34 +383,34 @@@ public class ViewUpdateGenerato
          if (!matchesViewFilter(existingBaseRow))
              return;
  
-         deleteOldEntryInternal(existingBaseRow);
+         deleteOldEntryInternal(existingBaseRow, mergedBaseRow);
      }
  
-     private void deleteOldEntryInternal(Row existingBaseRow)
+     private void deleteOldEntryInternal(Row existingBaseRow, Row 
mergedBaseRow)
      {
          startNewUpdate(existingBaseRow);
-         DeletionTime dt = new 
DeletionTime(computeTimestampForEntryDeletion(existingBaseRow), nowInSec);
-         currentViewEntryBuilder.addRowDeletion(Row.Deletion.shadowable(dt));
+         long timestamp = computeTimestampForEntryDeletion(existingBaseRow, 
mergedBaseRow);
+         long rowDeletion = 
mergedBaseRow.deletion().time().markedForDeleteAt();
+         assert timestamp >= rowDeletion;
+ 
+         // If computed deletion timestamp greater than row deletion, it must 
be coming from
+         //  1. non-pk base column used in view pk, or
+         //  2. unselected base column
+         //  any case, we need to use it as expired livenessInfo
+         // If computed deletion timestamp is from row deletion, we only need 
row deletion itself
+         if (timestamp > rowDeletion)
+         {
+             /**
+               * TODO: This is a hack and overload of LivenessInfo and we 
should probably modify
+               * the storage engine to properly support this, but on the 
meantime this
+               * should be fine because it only happens in some specific 
scenarios explained above.
+               */
 -            LivenessInfo info = LivenessInfo.create(timestamp, 
Integer.MAX_VALUE, nowInSec);
++            LivenessInfo info = LivenessInfo.withExpirationTime(timestamp, 
Integer.MAX_VALUE, nowInSec);
+             currentViewEntryBuilder.addPrimaryKeyLivenessInfo(info);
+         }
+         currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());
+ 
+         addDifferentCells(existingBaseRow, mergedBaseRow);
          submitUpdate();
      }
  
@@@ -440,50 -458,74 +457,74 @@@
  
          LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo();
  
-         if (view.baseNonPKColumnsInViewPK.isEmpty())
+         if (view.hasSamePrimaryKeyColumnsAsBaseTable())
          {
-             int ttl = baseLiveness.ttl();
-             int expirationTime = baseLiveness.localExpirationTime();
+             if (view.getDefinition().includeAllColumns)
+                 return baseLiveness;
+ 
+             long timestamp = baseLiveness.timestamp();
+             boolean hasNonExpiringLiveCell = false;
+             Cell biggestExpirationCell = null;
              for (Cell cell : baseRow.cells())
              {
-                 if (cell.ttl() > ttl)
+                 if (view.getViewColumn(cell.column()) != null)
+                     continue;
+                 if (!isLive(cell))
+                     continue;
+                 timestamp = Math.max(timestamp, cell.maxTimestamp());
+                 if (!cell.isExpiring())
+                     hasNonExpiringLiveCell = true;
+                 else
                  {
-                     ttl = cell.ttl();
-                     expirationTime = cell.localDeletionTime();
+                     if (biggestExpirationCell == null)
+                         biggestExpirationCell = cell;
+                     else if (cell.localDeletionTime() > 
biggestExpirationCell.localDeletionTime())
+                         biggestExpirationCell = cell;
                  }
              }
-             return ttl == baseLiveness.ttl()
-                  ? baseLiveness
-                  : LivenessInfo.withExpirationTime(baseLiveness.timestamp(), 
ttl, expirationTime);
+             if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring())
 -                return LivenessInfo.create(viewMetadata, timestamp, nowInSec);
++                return LivenessInfo.create(timestamp, nowInSec);
+             if (hasNonExpiringLiveCell)
 -                return LivenessInfo.create(viewMetadata, timestamp, nowInSec);
++                return LivenessInfo.create(timestamp, nowInSec);
+             if (biggestExpirationCell == null)
+                 return baseLiveness;
+             if (biggestExpirationCell.localDeletionTime() > 
baseLiveness.localExpirationTime()
+                     || !baseLiveness.isLive(nowInSec))
 -                return LivenessInfo.create(timestamp,
++                return LivenessInfo.withExpirationTime(timestamp,
+                                                        
biggestExpirationCell.ttl(),
+                                                        
biggestExpirationCell.localDeletionTime());
+             return baseLiveness;
          }
  
-         ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
-         Cell cell = baseRow.getCell(baseColumn);
+         Cell cell = baseRow.getCell(view.baseNonPKColumnsInViewPK.get(0));
          assert isLive(cell) : "We shouldn't have got there if the base row 
had no associated entry";
  
-         long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp());
-         return LivenessInfo.withExpirationTime(timestamp, cell.ttl(), 
cell.localDeletionTime());
 -        return LivenessInfo.create(cell.timestamp(), cell.ttl(), 
cell.localDeletionTime());
++        return LivenessInfo.withExpirationTime(cell.timestamp(), cell.ttl(), 
cell.localDeletionTime());
      }
  
-     private long computeTimestampForEntryDeletion(Row baseRow)
+     private long computeTimestampForEntryDeletion(Row existingBaseRow, Row 
mergedBaseRow)
      {
-         // We delete the old row with it's row entry timestamp using a 
shadowable deletion.
-         // We must make sure that the deletion deletes everything in the 
entry (or the entry will
-         // still show up), so we must use the bigger timestamp found in the 
existing row (for any
-         // column included in the view at least).
-         // TODO: We have a problem though: if the entry is "resurected" by a 
later update, we would
-         // need to ensure that the timestamp for then entry then is bigger 
than the tombstone
-         // we're just inserting, which is not currently guaranteed.
-         // This is a bug for a separate ticket though.
-         long timestamp = baseRow.primaryKeyLivenessInfo().timestamp();
-         for (ColumnData data : baseRow)
+         DeletionTime deletion = mergedBaseRow.deletion().time();
+         if (view.hasSamePrimaryKeyColumnsAsBaseTable())
          {
-             if (!view.getDefinition().includes(data.column().name))
-                 continue;
+             long timestamp = Math.max(deletion.markedForDeleteAt(), 
existingBaseRow.primaryKeyLivenessInfo().timestamp());
+             if (view.getDefinition().includeAllColumns)
+                 return timestamp;
  
-             timestamp = Math.max(timestamp, data.maxTimestamp());
+             for (Cell cell : existingBaseRow.cells())
+             {
+                 // selected column should not contribute to view deletion, 
itself is already included in view row
+                 if (view.getViewColumn(cell.column()) != null)
+                     continue;
+                 // unselected column is used regardless live or dead, because 
we don't know if it was used for liveness.
+                 timestamp = Math.max(timestamp, cell.maxTimestamp());
+             }
+             return timestamp;
          }
-         return timestamp;
+         // has base non-pk column in view pk
+         Cell before = 
existingBaseRow.getCell(view.baseNonPKColumnsInViewPK.get(0));
+         assert isLive(before) : "We shouldn't have got there if the base row 
had no associated entry";
+         return deletion.deletes(before) ? deletion.markedForDeleteAt() : 
before.timestamp();
      }
  
      private void addColumnData(ColumnDefinition viewColumn, ColumnData 
baseTableData)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
index 0000000,9e32620..ece3e6d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
@@@ -1,0 -1,1343 +1,1344 @@@
+ package org.apache.cassandra.cql3;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.TimeUnit;
+ import java.util.stream.Collectors;
+ 
+ import org.apache.cassandra.concurrent.SEPExecutor;
+ import org.apache.cassandra.concurrent.Stage;
+ import org.apache.cassandra.concurrent.StageManager;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
+ import com.google.common.base.Objects;
+ 
+ public class ViewComplexTest extends CQLTester
+ {
 -    int protocolVersion = 4;
++    ProtocolVersion protocolVersion = ProtocolVersion.V4;
+     private final List<String> views = new ArrayList<>();
+ 
+     @BeforeClass
+     public static void startup()
+     {
+         requireNetwork();
+     }
+     @Before
+     public void begin()
+     {
+         views.clear();
+     }
+ 
+     @After
+     public void end() throws Throwable
+     {
+         for (String viewName : views)
+             executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + viewName);
+     }
+ 
+     private void createView(String name, String query) throws Throwable
+     {
+         executeNet(protocolVersion, String.format(query, name));
+         // If exception is thrown, the view will not be added to the list; 
since it shouldn't have been created, this is
+         // the desired behavior
+         views.add(name);
+     }
+ 
+     private void updateView(String query, Object... params) throws Throwable
+     {
+         updateViewWithFlush(query, false, params);
+     }
+ 
+     private void updateViewWithFlush(String query, boolean flush, Object... 
params) throws Throwable
+     {
+         executeNet(protocolVersion, query, params);
+         while (!(((SEPExecutor) 
StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
+                 && ((SEPExecutor) 
StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
+         {
+             Thread.sleep(1);
+         }
+         if (flush)
+             Keyspace.open(keyspace()).flush();
+     }
+ 
+     // for now, unselected column cannot be fully supported, SEE 
CASSANDRA-13826
+     @Ignore
+     @Test
+     public void testPartialDeleteUnselectedColumn() throws Throwable
+     {
+         boolean flush = true;
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY 
(k, c))");
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT k,c FROM %%s WHERE 
k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)");
+         Keyspace ks = Keyspace.open(keyspace());
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         updateView("UPDATE %s USING TIMESTAMP 10 SET b=1 WHERE k=1 AND c=1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, 1));
+         assertRows(execute("SELECT * from mv"), row(1, 1));
+         updateView("DELETE b FROM %s USING TIMESTAMP 11 WHERE k=1 AND c=1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertEmpty(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+         updateView("UPDATE %s USING TIMESTAMP 1 SET a=1 WHERE k=1 AND c=1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRows(execute("SELECT * from %s"), row(1, 1, 1, null));
+         assertRows(execute("SELECT * from mv"), row(1, 1));
+ 
+         execute("truncate %s;");
+ 
+         // removal generated by unselected column should not shadow PK update 
with smaller timestamp
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET a=1 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, 1, null));
+         assertRows(execute("SELECT * from mv"), row(1, 1));
+ 
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 20 SET a=null WHERE 
k=1 AND c=1", flush);
+         assertRows(execute("SELECT * from %s"));
+         assertRows(execute("SELECT * from mv"));
+ 
+         updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 
15", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, null));
+         assertRows(execute("SELECT * from mv"), row(1, 1));
+     }
+ 
+     @Test
+     public void testPartialDeleteSelectedColumnWithFlush() throws Throwable
+     {
+         testPartialDeleteSelectedColumn(true);
+     }
+ 
+     @Test
+     public void testPartialDeleteSelectedColumnWithoutFlush() throws Throwable
+     {
+         testPartialDeleteSelectedColumn(false);
+     }
+ 
+     private void testPartialDeleteSelectedColumn(boolean flush) throws 
Throwable
+     {
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         createTable("CREATE TABLE %s (k int, c int, a int, b int, e int, f 
int, PRIMARY KEY (k, c))");
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM %%s WHERE 
k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)");
+         Keyspace ks = Keyspace.open(keyspace());
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 10 SET b=1 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, 1));
+ 
+         updateViewWithFlush("DELETE b FROM %s USING TIMESTAMP 11 WHERE k=1 
AND c=1", flush);
+         assertEmpty(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 1 SET a=1 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, 1, null, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, 1, null));
+ 
+         updateViewWithFlush("DELETE a FROM %s USING TIMESTAMP 1 WHERE k=1 AND 
c=1", flush);
+         assertEmpty(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         // view livenessInfo should not be affected by selected column ts or 
tb
+         updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 
0", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 12 SET b=1 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, 1));
+ 
+         updateViewWithFlush("DELETE b FROM %s USING TIMESTAMP 13 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateViewWithFlush("DELETE FROM %s USING TIMESTAMP 14 WHERE k=1 AND 
c=1", flush);
+         assertEmpty(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 
15", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateViewWithFlush("UPDATE %s USING TTL 3 SET b=1 WHERE k=1 AND 
c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, 1));
+ 
+         TimeUnit.SECONDS.sleep(4);
+ 
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateViewWithFlush("DELETE FROM %s USING TIMESTAMP 15 WHERE k=1 AND 
c=1", flush);
+         assertEmpty(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         execute("truncate %s;");
+ 
+         // removal generated by unselected column should not shadow selected 
column with smaller timestamp
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET e=1 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, null, null, 1, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET e=null WHERE 
k=1 AND c=1", flush);
+         assertRows(execute("SELECT * from %s"));
+         assertRows(execute("SELECT * from mv"));
+ 
+         updateViewWithFlush("UPDATE %s USING TIMESTAMP 16 SET a=1 WHERE k=1 
AND c=1", flush);
+         assertRows(execute("SELECT * from %s"), row(1, 1, 1, null, null, 
null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, 1, null));
+     }
+ 
+     @Test
+     public void testUpdateColumnInViewPKWithTTLWithFlush() throws Throwable
+     {
+         // CASSANDRA-13657
+         testUpdateColumnInViewPKWithTTL(true);
+     }
+ 
+     @Test
+     public void testUpdateColumnInViewPKWithTTLWithoutFlush() throws Throwable
+     {
+         // CASSANDRA-13657
+         testUpdateColumnInViewPKWithTTL(false);
+     }
+ 
+     private void testUpdateColumnInViewPKWithTTL(boolean flush) throws 
Throwable
+     {
+         // CASSANDRA-13657 if base column used in view pk is ttled, then view 
row is considered dead
+         createTable("create table %s (k int primary key, a int, b int)");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k 
IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         updateView("UPDATE %s SET a = 1 WHERE k = 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT * from %s"), row(1, 1, null));
+         assertRows(execute("SELECT * from mv"), row(1, 1, null));
+ 
+         updateView("DELETE a FROM %s WHERE k = 1");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         updateView("INSERT INTO %s (k) VALUES (1);");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT * from %s"), row(1, null, null));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         updateView("UPDATE %s USING TTL 5 SET a = 10 WHERE k = 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT * from %s"), row(1, 10, null));
+         assertRows(execute("SELECT * from mv"), row(10, 1, null));
+ 
+         updateView("UPDATE %s SET b = 100 WHERE k = 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT * from %s"), row(1, 10, 100));
+         assertRows(execute("SELECT * from mv"), row(10, 1, 100));
+ 
+         Thread.sleep(5000);
+ 
+         // 'a' is TTL of 5 and removed.
+         assertRows(execute("SELECT * from %s"), row(1, null, 100));
+         assertEmpty(execute("SELECT * from mv"));
+         assertEmpty(execute("SELECT * from mv WHERE k = ? AND a = ?", 1, 10));
+ 
+         updateView("DELETE b FROM %s WHERE k=1");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT * from %s"), row(1, null, null));
+         assertEmpty(execute("SELECT * from mv"));
+ 
+         updateView("DELETE FROM %s WHERE k=1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertEmpty(execute("SELECT * from %s"));
+         assertEmpty(execute("SELECT * from mv"));
+     }
+ 
+     @Test
+     public void testUpdateColumnNotInViewWithFlush() throws Throwable
+     {
+         testUpdateColumnNotInView(true);
+     }
+ 
+     @Test
+     public void testUpdateColumnNotInViewWithoutFlush() throws Throwable
+     {
+         // CASSANDRA-13127
+         testUpdateColumnNotInView(false);
+     }
+ 
+     private void testUpdateColumnNotInView(boolean flush) throws Throwable
+     {
+         // CASSANDRA-13127: if base column not selected in view are alive, 
then pk of view row should be alive
+         createTable("create table %s (p int, c int, v1 int, v2 int, primary 
key(p, c))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT p, c FROM %%s WHERE 
p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         updateView("UPDATE %s USING TIMESTAMP 0 SET v1 = 1 WHERE p = 0 AND c 
= 0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = 
?", 0, 0), row(0, 0, 1, null));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         updateView("DELETE v1 FROM %s USING TIMESTAMP 1 WHERE p = 0 AND c = 
0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
+         assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
+ 
+         // shadowed by tombstone
+         updateView("UPDATE %s USING TIMESTAMP 1 SET v1 = 1 WHERE p = 0 AND c 
= 0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
+         assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
+ 
+         updateView("UPDATE %s USING TIMESTAMP 2 SET v2 = 1 WHERE p = 0 AND c 
= 0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = 
?", 0, 0), row(0, 0, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         updateView("DELETE v1 FROM %s USING TIMESTAMP 3 WHERE p = 0 AND c = 
0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = 
?", 0, 0), row(0, 0, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         updateView("DELETE v2 FROM %s USING TIMESTAMP 4 WHERE p = 0 AND c = 
0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
+         assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0));
+ 
+         updateView("UPDATE %s USING TTL 3 SET v2 = 1 WHERE p = 0 AND c = 0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = 
?", 0, 0), row(0, 0, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = 
?", 0, 0));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0));
+ 
+         updateView("UPDATE %s SET v2 = 1 WHERE p = 0 AND c = 0");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = 
?", 0, 0), row(0, 0, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         assertInvalidMessage("Cannot drop column v2 on base table with 
materialized views", "ALTER TABLE %s DROP v2");
+         // // drop unselected base column, unselected metadata should be 
removed, thus view row is dead
+         // updateView("ALTER TABLE %s DROP v2");
+         // assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND 
p = ?", 0, 0));
+         // assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND 
p = ?", 0, 0));
+         // assertRowsIgnoringOrder(execute("SELECT * from %s"));
+         // assertRowsIgnoringOrder(execute("SELECT * from mv"));
+     }
+ 
+     @Test
+     public void testPartialUpdateWithUnselectedCollectionsWithFlush() throws 
Throwable
+     {
+         testPartialUpdateWithUnselectedCollections(true);
+     }
+ 
+     @Test
+     public void testPartialUpdateWithUnselectedCollectionsWithoutFlush() 
throws Throwable
+     {
+         testPartialUpdateWithUnselectedCollections(false);
+     }
+ 
+     public void testPartialUpdateWithUnselectedCollections(boolean flush) 
throws Throwable
+     {
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         createTable("CREATE TABLE %s (k int, c int, a int, b int, l 
list<int>, s set<int>, m map<int,int>, PRIMARY KEY (k, c))");
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM %%s WHERE 
k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)");
+         Keyspace ks = Keyspace.open(keyspace());
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         updateView("UPDATE %s SET l=l+[1,2,3] WHERE k = 1 AND c = 1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateView("UPDATE %s SET l=l-[1,2] WHERE k = 1 AND c = 1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, null));
+ 
+         updateView("UPDATE %s SET b=3 WHERE k=1 AND c=1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRows(execute("SELECT * from mv"), row(1, 1, null, 3));
+ 
+         updateView("UPDATE %s SET b=null, l=l-[3], s=s-{3} WHERE k = 1 AND c 
= 1");
+         if (flush)
+         {
+             FBUtilities.waitOnFutures(ks.flush());
+             ks.getColumnFamilyStore("mv").forceMajorCompaction();
+         }
+         assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"));
+         assertRowsIgnoringOrder(execute("SELECT * from mv"));
+ 
+         updateView("UPDATE %s SET m=m+{3:3}, l=l-[1], s=s-{2} WHERE k = 1 AND 
c = 1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"), row(1, 1, 
null, null));
+         assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 1, null, 
null));
+ 
+         assertInvalidMessage("Cannot drop column m on base table with 
materialized views", "ALTER TABLE %s DROP m");
+         // executeNet(protocolVersion, "ALTER TABLE %s DROP m");
+         // ks.getColumnFamilyStore("mv").forceMajorCompaction();
+         // assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s WHERE k = 
1 AND c = 1"));
+         // assertRowsIgnoringOrder(execute("SELECT * from mv WHERE k = 1 AND 
c = 1"));
+         // assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"));
+         // assertRowsIgnoringOrder(execute("SELECT * from mv"));
+     }
+ 
+     @Test
+     public void testUnselectedColumnsTTLWithFlush() throws Throwable
+     {
+         // CASSANDRA-13127
+         testUnselectedColumnsTTL(true);
+     }
+ 
+     @Test
+     public void testUnselectedColumnsTTLWithoutFlush() throws Throwable
+     {
+         // CASSANDRA-13127
+         testUnselectedColumnsTTL(false);
+     }
+ 
+     private void testUnselectedColumnsTTL(boolean flush) throws Throwable
+     {
+         // CASSANDRA-13127 not ttled unselected column in base should keep 
view row alive
+         createTable("create table %s (p int, c int, v int, primary key(p, 
c))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT p, c FROM %%s WHERE 
p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         updateViewWithFlush("INSERT INTO %s (p, c) VALUES (0, 0) USING TTL 
3;", flush);
+ 
+         updateViewWithFlush("UPDATE %s USING TTL 1000 SET v = 0 WHERE p = 0 
and c = 0;", flush);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         Thread.sleep(3000);
+ 
+         UntypedResultSet.Row row = execute("SELECT v, ttl(v) from %s WHERE c 
= ? AND p = ?", 0, 0).one();
+         assertTrue("row should have value of 0", row.getInt("v") == 0);
+         assertTrue("row should have ttl less than 1000", row.getInt("ttl(v)") 
< 1000);
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         updateViewWithFlush("DELETE FROM %s WHERE p = 0 and c = 0;", flush);
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0));
+ 
+         updateViewWithFlush("INSERT INTO %s (p, c) VALUES (0, 0) ", flush);
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         // already have a live row, no need to apply the unselected cell ttl
+         updateViewWithFlush("UPDATE %s USING TTL 3 SET v = 0 WHERE p = 0 and 
c = 0;", flush);
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+ 
+         updateViewWithFlush("INSERT INTO %s (p, c) VALUES (1, 1) USING TTL 
3", flush);
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 1, 1), row(1, 1));
+ 
+         Thread.sleep(4000);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 0, 0), row(0, 0));
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 1, 1));
+ 
+         // unselected should keep view row alive
+         updateViewWithFlush("UPDATE %s SET v = 0 WHERE p = 1 and c = 1;", 
flush);
+         assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = 
?", 1, 1), row(1, 1));
+ 
+     }
+ 
+     @Test
+     public void testRangeDeletionWithFlush() throws Throwable
+     {
+         testRangeDeletion(true);
+     }
+ 
+     @Test
+     public void testRangeDeletionWithoutFlush() throws Throwable
+     {
+         testRangeDeletion(false);
+     }
+ 
+     public void testRangeDeletion(boolean flush) throws Throwable
+     {
+         // for partition range deletion, need to know that existing row is 
shadowed instead of not existed.
+         createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY 
(a))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+ 
+         createView("mv_test1",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a 
IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a, b)");
+ 
+         Keyspace ks = Keyspace.open(keyspace());
+         ks.getColumnFamilyStore("mv_test1").disableAutoCompaction();
+ 
+         execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) using 
timestamp 0", 1, 1, 1, 1);
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"), row(1, 1, 
1, 1));
+ 
+         // remove view row
+         updateView("UPDATE %s using timestamp 1 set b = null WHERE a=1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"));
+         // remove base row, no view updated generated.
+         updateView("DELETE FROM %s using timestamp 2 where a=1");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"));
+ 
+         // restor view row with b,c column. d is still tombstone
+         updateView("UPDATE %s using timestamp 3 set b = 1,c = 1 where a=1"); 
// upsert
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"), row(1, 1, 
1, null));
+     }
+ 
+     @Test
+     public void testBaseTTLWithSameTimestampTest() throws Throwable
+     {
+         // CASSANDRA-13127 when liveness timestamp tie, greater 
localDeletionTime should win if both are expiring.
+         createTable("create table %s (p int, c int, v int, primary key(p, 
c))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) using timestamp 
1;");
+ 
+         FBUtilities.waitOnFutures(ks.flush());
+ 
+         updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING TTL 3 and 
timestamp 1;");
+ 
+         FBUtilities.waitOnFutures(ks.flush());
+ 
+         Thread.sleep(4000);
+ 
+         assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
+ 
+         // reversed order
+         execute("truncate %s;");
+ 
+         updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING TTL 3 and 
timestamp 1;");
+ 
+         FBUtilities.waitOnFutures(ks.flush());
+ 
+         updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING timestamp 
1;");
+ 
+         FBUtilities.waitOnFutures(ks.flush());
+ 
+         Thread.sleep(4000);
+ 
+         assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0));
+ 
+     }
+ 
+     @Test
+     public void testCommutativeRowDeletionFlush() throws Throwable
+     {
+         // CASSANDRA-13409
+         testCommutativeRowDeletion(true);
+     }
+ 
+     @Test
+     public void testCommutativeRowDeletionWithoutFlush() throws Throwable
+     {
+         // CASSANDRA-13409
+         testCommutativeRowDeletion(false);
+     }
+ 
+     private void testCommutativeRowDeletion(boolean flush) throws Throwable
+     {
+         // CASSANDRA-13409 new update should not resurrect previous deleted 
data in view
+         createTable("create table %s (p int primary key, v1 int, v2 int)");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select * from %%s where p 
is not null and v1 is not null primary key (v1, p);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         // sstable-1, Set initial values TS=1
+         updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using 
timestamp 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv 
WHERE v1 = ? AND p = ?", 1, 3), row(3, 1L));
+         // sstable-2
+         updateView("Delete from %s using timestamp 2 where p = 3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"));
+         // sstable-3
+         updateView("Insert into %s (p, v1) values (3, 1) using timestamp 3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, null, null));
+         // sstable-4
+         updateView("UPdate %s using timestamp 4 set v1 = 2 where p = 3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(2, 3, null, null));
+         // sstable-5
+         updateView("UPdate %s using timestamp 5 set v1 = 1 where p = 3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, null, null));
+ 
+         if (flush)
+         {
+             // compact sstable 2 and 4, 5;
+             ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
+             List<String> sstables = cfs.getLiveSSTables()
+                                        .stream()
+                                        .sorted((s1, s2) -> 
s1.descriptor.generation - s2.descriptor.generation)
+                                        .map(s -> s.getFilename())
+                                        .collect(Collectors.toList());
+             String dataFiles = String.join(",", 
Arrays.asList(sstables.get(1), sstables.get(3), sstables.get(4)));
+             CompactionManager.instance.forceUserDefinedCompaction(dataFiles);
+             assertEquals(3, cfs.getLiveSSTables().size());
+         }
+         // regular tombstone should be retained after compaction
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, null, null));
+     }
+ 
+     @Test
+     public void testUnselectedColumnWithExpiredLivenessInfo() throws Throwable
+     {
+         boolean flush = true;
+         createTable("create table %s (k int, c int, a int, b int, PRIMARY 
KEY(k, c))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select k,c,b from %%s 
where c is not null and k is not null primary key (c, k);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         // sstable-1, Set initial values TS=1
+         updateViewWithFlush("UPDATE %s SET a = 1 WHERE k = 1 AND c = 1;", 
flush);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 
1;"),
+                                 row(1, 1, 1, null));
+         assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND 
c = 1;"),
+                                 row(1, 1, null));
+ 
+         // sstable-2
+         updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TTL 5", 
flush);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 
1;"),
+                                 row(1, 1, 1, null));
+         assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND 
c = 1;"),
+                                 row(1, 1, null));
+ 
+         Thread.sleep(5001);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 
1;"),
+                                 row(1, 1, 1, null));
+         assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND 
c = 1;"),
+                                 row(1, 1, null));
+ 
+         // sstable-3
+         updateViewWithFlush("Update %s set a = null where k = 1 AND c = 1;", 
flush);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 
1;"));
+         assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND 
c = 1;"));
+ 
+         // sstable-4
+         updateViewWithFlush("Update %s USING TIMESTAMP 1 set b = 1 where k = 
1 AND c = 1;", flush);
+ 
+         assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 
1;"),
+                                 row(1, 1, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND 
c = 1;"),
+                                 row(1, 1, 1));
+     }
+ 
+     @Test
+     public void testUpdateWithColumnTimestampSmallerThanPkWithFlush() throws 
Throwable
+     {
+         testUpdateWithColumnTimestampSmallerThanPk(true);
+     }
+ 
+     @Test
+     public void testUpdateWithColumnTimestampSmallerThanPkWithoutFlush() 
throws Throwable
+     {
+         testUpdateWithColumnTimestampSmallerThanPk(false);
+     }
+ 
+     public void testUpdateWithColumnTimestampSmallerThanPk(boolean flush) 
throws Throwable
+     {
+         createTable("create table %s (p int primary key, v1 int, v2 int)");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select * from %%s where p 
is not null and v1 is not null primary key (v1, p);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         // reset value
+         updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using 
timestamp 6;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, 3, 6L));
+         // increase pk's timestamp to 20
+         updateView("Insert into %s (p) values (3) using timestamp 20;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, 3, 6L));
+         // change v1's to 2 and remove existing view row with ts7
+         updateView("UPdate %s using timestamp 7 set v1 = 2 where p = 3;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(2, 3, 3, 6L));
+         // change v1's to 1 and remove existing view row with ts8
+         updateView("UPdate %s using timestamp 8 set v1 = 1 where p = 3;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, 3, 6L));
+     }
+ 
+     @Test
+     public void testUpdateWithColumnTimestampBiggerThanPkWithFlush() throws 
Throwable
+     {
+         // CASSANDRA-11500
+         testUpdateWithColumnTimestampBiggerThanPk(true);
+     }
+ 
+     @Test
+     public void testUpdateWithColumnTimestampBiggerThanPkWithoutFlush() 
throws Throwable
+     {
+         // CASSANDRA-11500
+         testUpdateWithColumnTimestampBiggerThanPk(false);
+     }
+ 
+     public void testUpdateWithColumnTimestampBiggerThanPk(boolean flush) 
throws Throwable
+     {
+         // CASSANDRA-11500 able to shadow old view row with column ts greater 
tahn pk's ts and re-insert the view row
+         createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int);");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k 
IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+         updateView("DELETE FROM %s USING TIMESTAMP 0 WHERE k = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         // sstable-1, Set initial values TS=1
+         updateView("INSERT INTO %s(k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 
1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 
1));
+         updateView("UPDATE %s USING TIMESTAMP 10 SET b = 2 WHERE k = 1;");
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 
2));
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 
2));
+         updateView("UPDATE %s USING TIMESTAMP 2 SET a = 2 WHERE k = 1;");
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 
2));
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         ks.getColumnFamilyStore("mv").forceMajorCompaction();
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 
2));
+         updateView("UPDATE %s USING TIMESTAMP 11 SET a = 1 WHERE k = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 
2));
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, 1, 
2));
+ 
+         // set non-key base column as tombstone, view row is removed with 
shadowable
+         updateView("UPDATE %s USING TIMESTAMP 12 SET a = null WHERE k = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"));
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, null, 
2));
+ 
+         // column b should be alive
+         updateView("UPDATE %s USING TIMESTAMP 13 SET a = 1 WHERE k = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 
2));
+         assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, 1, 
2));
+ 
+         assertInvalidMessage("Cannot drop column a on base table with 
materialized views", "ALTER TABLE %s DROP a");
+     }
+ 
+     @Test
+     public void testNonBaseColumnInViewPkWithFlush() throws Throwable
+     {
+         testNonBaseColumnInViewPk(true);
+     }
+ 
+     @Test
+     public void testNonBaseColumnInViewPkWithoutFlush() throws Throwable
+     {
+         testNonBaseColumnInViewPk(true);
+     }
+ 
+     public void testNonBaseColumnInViewPk(boolean flush) throws Throwable
+     {
+         createTable("create table %s (p1 int, p2 int, v1 int, v2 int, primary 
key (p1,p2))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select * from %%s where p1 
is not null and p2 is not null primary key (p2, p1)"
+                            + " with gc_grace_seconds=5;");
+         ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
+         cfs.disableAutoCompaction();
+ 
+         updateView("UPDATE %s USING TIMESTAMP 1 set v1 =1 where p1 = 1 AND p2 
= 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), 
row(1, 1, 1, null));
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), 
row(1, 1, 1, null));
+ 
+         updateView("UPDATE %s USING TIMESTAMP 2 set v1 = null, v2 = 1 where 
p1 = 1 AND p2 = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), 
row(1, 1, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), 
row(1, 1, null, 1));
+ 
+         updateView("UPDATE %s USING TIMESTAMP 2 set v2 = null where p1 = 1 
AND p2 = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"));
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"));
+ 
+         updateView("INSERT INTO %s (p1,p2) VALUES(1,1) USING TIMESTAMP 3;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), 
row(1, 1, null, null));
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), 
row(1, 1, null, null));
+ 
+         updateView("DELETE FROM %s USING TIMESTAMP 4 WHERE p1 =1 AND p2 = 
1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"));
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"));
+ 
+         updateView("UPDATE %s USING TIMESTAMP 5 set v2 = 1 where p1 = 1 AND 
p2 = 1;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), 
row(1, 1, null, 1));
+         assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), 
row(1, 1, null, 1));
+     }
+ 
+     @Test
+     public void testStrictLivenessTombstone() throws Throwable
+     {
+         createTable("create table %s (p int primary key, v1 int, v2 int)");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select * from %%s where p 
is not null and v1 is not null primary key (v1, p)"
+                            + " with gc_grace_seconds=5;");
+         ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
+         cfs.disableAutoCompaction();
+ 
+         updateView("Insert into %s (p, v1, v2) values (1, 1, 1) ;");
+         assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 
1, 1));
+ 
+         updateView("Update %s set v1 = null WHERE p = 1");
+         FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"));
+ 
+         cfs.forceMajorCompaction(); // before gc grace second, 
strict-liveness tombstoned dead row remains
+         assertEquals(1, cfs.getLiveSSTables().size());
+ 
+         Thread.sleep(6000);
+         assertEquals(1, cfs.getLiveSSTables().size()); // no auto compaction.
+ 
+         cfs.forceMajorCompaction(); // after gc grace second, no data left
+         assertEquals(0, cfs.getLiveSSTables().size());
+ 
+         updateView("Update %s using ttl 5 set v1 = 1 WHERE p = 1");
+         FBUtilities.waitOnFutures(ks.flush());
+         assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 
1, 1));
+ 
+         cfs.forceMajorCompaction(); // before ttl+gc_grace_second, 
strict-liveness ttled dead row remains
+         assertEquals(1, cfs.getLiveSSTables().size());
+         assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 
1, 1));
+ 
+         Thread.sleep(5500); // after expired, before gc_grace_second
+         cfs.forceMajorCompaction();// before ttl+gc_grace_second, 
strict-liveness ttled dead row remains
+         assertEquals(1, cfs.getLiveSSTables().size());
+         assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"));
+ 
+         Thread.sleep(5500); // after expired + gc_grace_second
+         assertEquals(1, cfs.getLiveSSTables().size()); // no auto compaction.
+ 
+         cfs.forceMajorCompaction(); // after gc grace second, no data left
+         assertEquals(0, cfs.getLiveSSTables().size());
+     }
+ 
+     @Test
+     public void testCellTombstoneAndShadowableTombstonesWithFlush() throws 
Throwable
+     {
+         testCellTombstoneAndShadowableTombstones(true);
+     }
+ 
+     @Test
+     public void testCellTombstoneAndShadowableTombstonesWithoutFlush() throws 
Throwable
+     {
+         testCellTombstoneAndShadowableTombstones(false);
+     }
+ 
+     private void testCellTombstoneAndShadowableTombstones(boolean flush) 
throws Throwable
+     {
+         createTable("create table %s (p int primary key, v1 int, v2 int)");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select * from %%s where p 
is not null and v1 is not null primary key (v1, p);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         // sstable 1, Set initial values TS=1
+         updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using 
timestamp 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv 
WHERE v1 = ? AND p = ?", 1, 3), row(3, 1L));
+         // sstable 2
+         updateView("UPdate %s using timestamp 2 set v2 = null where p = 3");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv 
WHERE v1 = ? AND p = ?", 1, 3),
+                                 row(null, null));
+         // sstable 3
+         updateView("UPdate %s using timestamp 3 set v1 = 2 where p = 3");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(2, 3, null, null));
+         // sstable 4
+         updateView("UPdate %s using timestamp 4 set v1 = 1 where p = 3");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, null, null));
+ 
+         if (flush)
+         {
+             // compact sstable 2 and 3;
+             ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv");
+             List<String> sstables = cfs.getLiveSSTables()
+                                        .stream()
+                                        .sorted(Comparator.comparingInt(s -> 
s.descriptor.generation))
+                                        .map(s -> s.getFilename())
+                                        .collect(Collectors.toList());
+             System.out.println("SSTables " + sstables);
+             String dataFiles = String.join(",", 
Arrays.asList(sstables.get(1), sstables.get(2)));
+             CompactionManager.instance.forceUserDefinedCompaction(dataFiles);
+         }
+         // cell-tombstone in sstable 4 is not compacted away, because the 
shadowable tombstone is shadowed by new row.
+         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, null, null));
+     }
+ 
+     @Test
+     public void complexTimestampDeletionTestWithFlush() throws Throwable
+     {
+         complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(true);
+         complexTimestampWithbasePKColumnsInViewPKDeletionTest(true);
+     }
+ 
+     @Test
+     public void complexTimestampDeletionTestWithoutFlush() throws Throwable
+     {
+         complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(false);
+         complexTimestampWithbasePKColumnsInViewPKDeletionTest(false);
+     }
+ 
+     private void 
complexTimestampWithbasePKColumnsInViewPKDeletionTest(boolean flush) throws 
Throwable
+     {
+         createTable("create table %s (p1 int, p2 int, v1 int, v2 int, primary 
key(p1, p2))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv2",
+                    "create materialized view %s as select * from %%s where p1 
is not null and p2 is not null primary key (p2, p1);");
+         ks.getColumnFamilyStore("mv2").disableAutoCompaction();
+ 
+         // Set initial values TS=1
+         updateView("Insert into %s (p1, p2, v1, v2) values (1, 2, 3, 4) using 
timestamp 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from 
mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
+                                 row(3, 4, 1L));
+         // remove row/mv TS=2
+         updateView("Delete from %s using timestamp 2 where p1 = 1 and p2 = 
2;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         // view are empty
+         assertRowsIgnoringOrder(execute("SELECT * from mv2"));
+         // insert PK with TS=3
+         updateView("Insert into %s (p1, p2) values (1, 2) using timestamp 
3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         // deleted column in MV remained dead
+         assertRowsIgnoringOrder(execute("SELECT * from mv2"), row(2, 1, null, 
null));
+ 
+         ks.getColumnFamilyStore("mv2").forceMajorCompaction();
+         assertRowsIgnoringOrder(execute("SELECT * from mv2"), row(2, 1, null, 
null));
+ 
+         // reset values
+         updateView("Insert into %s (p1, p2, v1, v2) values (1, 2, 3, 4) using 
timestamp 10;");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from 
mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
+                                 row(3, 4, 10L));
+ 
+         updateView("UPDATE %s using timestamp 20 SET v2 = 5 WHERE p1 = 1 and 
p2 = 2");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from 
mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
+                                 row(3, 5, 20L));
+ 
+         updateView("DELETE FROM %s using timestamp 10 WHERE p1 = 1 and p2 = 
2");
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from 
mv2 WHERE p1 = ? AND p2 = ?", 1, 2),
+                                 row(null, 5, 20L));
+     }
+ 
+     public void 
complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(boolean flush) throws 
Throwable
+     {
+         createTable("create table %s (p int primary key, v1 int, v2 int)");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         createView("mv",
+                    "create materialized view %s as select * from %%s where p 
is not null and v1 is not null primary key (v1, p);");
+         ks.getColumnFamilyStore("mv").disableAutoCompaction();
+ 
+         // Set initial values TS=1
+         updateView("Insert into %s (p, v1, v2) values (3, 1, 5) using 
timestamp 1;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv 
WHERE v1 = ? AND p = ?", 1, 3), row(5, 1L));
+         // remove row/mv TS=2
+         updateView("Delete from %s using timestamp 2 where p = 3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         // view are empty
+         assertRowsIgnoringOrder(execute("SELECT * from mv"));
+         // insert PK with TS=3
+         updateView("Insert into %s (p, v1) values (3, 1) using timestamp 3;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         // deleted column in MV remained dead
+         assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null));
+ 
+         // insert values TS=2, it should be considered dead due to previous 
tombstone
+         updateView("Insert into %s (p, v1, v2) values (3, 1, 5) using 
timestamp 2;");
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+         // deleted column in MV remained dead
+         assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null));
+ 
+         // insert values TS=2, it should be considered dead due to previous 
tombstone
+         executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET v2 = ? 
WHERE p = ?", 4, 3);
+ 
+         if (flush)
+             FBUtilities.waitOnFutures(ks.flush());
+ 
+         assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 
3, 4, 3L));
+ 
+         ks.getColumnFamilyStore("mv").forceMajorCompaction();
+         assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 
3, 4, 3L));
+     }
+ 
+     @Test
+     public void testMVWithDifferentColumnsWithFlush() throws Throwable
+     {
+         testMVWithDifferentColumns(true);
+     }
+ 
+     @Test
+     public void testMVWithDifferentColumnsWithoutFlush() throws Throwable
+     {
+         testMVWithDifferentColumns(false);
+     }
+ 
+     private void testMVWithDifferentColumns(boolean flush) throws Throwable
+     {
+         createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, f 
int, PRIMARY KEY(a, b))");
+ 
+         execute("USE " + keyspace());
+         executeNet(protocolVersion, "USE " + keyspace());
+         List<String> viewNames = new ArrayList<>();
+         List<String> mvStatements = Arrays.asList(
+                                                   // all selected
+                                                   "CREATE MATERIALIZED VIEW 
%s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY 
(a,b)",
+                                                   // unselected e,f
+                                                   "CREATE MATERIALIZED VIEW 
%s AS SELECT c,d FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY 
(a,b)",
+                                                   // no selected
+                                                   "CREATE MATERIALIZED VIEW 
%s AS SELECT a,b FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY 
(a,b)",
+                                                   // all selected, re-order 
keys
+                                                   "CREATE MATERIALIZED VIEW 
%s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY 
(b,a)",
+                                                   // unselected e,f, re-order 
keys
+                                                   "CREATE MATERIALIZED VIEW 
%s AS SELECT a,b,c,d FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY 
(b,a)",
+                                                   // no selected, re-order 
keys
+                                                   "CREATE MATERIALIZED VIEW 
%s AS SELECT a,b FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY 
(b,a)");
+ 
+         Keyspace ks = Keyspace.open(keyspace());
+ 
+         for (int i = 0; i < mvStatements.size(); i++)
+         {
+             String name = "mv" + i;
+             viewNames.add(name);
+             createView(name, mvStatements.get(i));
+             ks.getColumnFamilyStore(name).disableAutoCompaction();
+         }
+ 
+         // insert
+         updateViewWithFlush("INSERT INTO %s (a,b,c,d,e,f) VALUES(1,1,1,1,1,1) 
using timestamp 1", flush);
+         assertBaseViews(row(1, 1, 1, 1, 1, 1), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 2 SET c=0, d=0 WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(row(1, 1, 0, 0, 1, 1), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 2 SET e=0, f=0 WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(row(1, 1, 0, 0, 0, 0), viewNames);
+ 
+         updateViewWithFlush("DELETE FROM %s using timestamp 2 WHERE a=1 AND 
b=1", flush);
+         assertBaseViews(null, viewNames);
+ 
+         // partial update unselected, selected
+         updateViewWithFlush("UPDATE %s using timestamp 3 SET f=1 WHERE a=1 
AND b=1", flush);
+         assertBaseViews(row(1, 1, null, null, null, 1), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 4 SET e = 1, f=null 
WHERE a=1 AND b=1", flush);
+         assertBaseViews(row(1, 1, null, null, 1, null), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 4 SET e = null WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(null, viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 5 SET c = 1 WHERE a=1 
AND b=1", flush);
+         assertBaseViews(row(1, 1, 1, null, null, null), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 5 SET c = null WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(null, viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 6 SET d = 1 WHERE a=1 
AND b=1", flush);
+         assertBaseViews(row(1, 1, null, 1, null, null), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 7 SET d = null WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(null, viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 8 SET f = 1 WHERE a=1 
AND b=1", flush);
+         assertBaseViews(row(1, 1, null, null, null, 1), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 6 SET c = 1 WHERE a=1 
AND b=1", flush);
+         assertBaseViews(row(1, 1, 1, null, null, 1), viewNames);
+ 
+         // view row still alive due to c=1@6
+         updateViewWithFlush("UPDATE %s using timestamp 8 SET f = null WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(row(1, 1, 1, null, null, null), viewNames);
+ 
+         updateViewWithFlush("UPDATE %s using timestamp 6 SET c = null WHERE 
a=1 AND b=1", flush);
+         assertBaseViews(null, viewNames);
+     }
+ 
+     private void assertBaseViews(Object[] row, List<String> viewNames) throws 
Throwable
+     {
+         UntypedResultSet result = execute("SELECT * FROM %s");
+         if (row == null)
+             assertRowsIgnoringOrder(result);
+         else
+             assertRowsIgnoringOrder(result, row);
+         for (int i = 0; i < viewNames.size(); i++)
+             assertBaseView(result, execute(String.format("SELECT * FROM %s", 
viewNames.get(i))), viewNames.get(i));
+     }
+ 
+     private void assertBaseView(UntypedResultSet base, UntypedResultSet view, 
String mv)
+     {
+         List<ColumnSpecification> baseMeta = base.metadata();
+         List<ColumnSpecification> viewMeta = view.metadata();
+ 
+         Iterator<UntypedResultSet.Row> iter = base.iterator();
+         Iterator<UntypedResultSet.Row> viewIter = view.iterator();
+ 
+         List<UntypedResultSet.Row> baseData = 
com.google.common.collect.Lists.newArrayList(iter);
+         List<UntypedResultSet.Row> viewData = 
com.google.common.collect.Lists.newArrayList(viewIter);
+ 
+         if (baseData.size() != viewData.size())
+             fail(String.format("Mismatch number of rows in view %s: <%s>, in 
base <%s>",
+                                mv,
+                                makeRowStrings(view),
+                                makeRowStrings(base)));
+         if (baseData.size() == 0)
+             return;
+         if (viewData.size() != 1)
+             fail(String.format("Expect only one row in view %s, but got <%s>",
+                                mv,
+                                makeRowStrings(view)));
+ 
+         UntypedResultSet.Row row = baseData.get(0);
+         UntypedResultSet.Row viewRow = viewData.get(0);
+ 
+         Map<String, ByteBuffer> baseValues = new HashMap<>();
+         for (int j = 0; j < baseMeta.size(); j++)
+         {
+             ColumnSpecification column = baseMeta.get(j);
+             ByteBuffer actualValue = row.getBytes(column.name.toString());
+             baseValues.put(column.name.toString(), actualValue);
+         }
+         for (int j = 0; j < viewMeta.size(); j++)
+         {
+             ColumnSpecification column = viewMeta.get(j);
+             String name = column.name.toString();
+             ByteBuffer viewValue = viewRow.getBytes(name);
+             if (!baseValues.containsKey(name))
+             {
+                 fail(String.format("Extra column: %s with value %s in view", 
name, column.type.compose(viewValue)));
+             }
+             else if (!Objects.equal(baseValues.get(name), viewValue))
+             {
+                 fail(String.format("Non equal column: %s, expected <%s> but 
got <%s>",
+                                    name,
+                                    column.type.compose(baseValues.get(name)),
+                                    column.type.compose(viewValue)));
+             }
+         }
+     }
+ 
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to