Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e624c663 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e624c663 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e624c663 Branch: refs/heads/cassandra-3.11 Commit: e624c6638254ea410691f085a10d08d412eb5ac1 Parents: 14d67d8 1b36740 Author: Paulo Motta <pa...@apache.org> Authored: Tue Sep 5 01:04:34 2017 -0500 Committer: Paulo Motta <pa...@apache.org> Committed: Tue Sep 5 01:05:06 2017 -0500 ---------------------------------------------------------------------- NEWS.txt | 30 +- doc/cql3/CQL.textile | 6 + .../org/apache/cassandra/config/CFMetaData.java | 13 + .../apache/cassandra/cql3/UpdateParameters.java | 2 +- .../cql3/statements/AlterTableStatement.java | 18 +- .../org/apache/cassandra/db/LivenessInfo.java | 17 +- .../org/apache/cassandra/db/ReadCommand.java | 7 +- .../db/compaction/CompactionIterator.java | 7 +- .../apache/cassandra/db/filter/RowFilter.java | 4 +- .../cassandra/db/partitions/PurgeFunction.java | 14 +- .../org/apache/cassandra/db/rows/BTreeRow.java | 6 +- src/java/org/apache/cassandra/db/rows/Row.java | 13 +- .../cassandra/db/rows/UnfilteredSerializer.java | 5 + .../apache/cassandra/db/transform/Filter.java | 8 +- .../db/transform/FilteredPartitions.java | 4 +- .../cassandra/db/transform/FilteredRows.java | 2 +- .../apache/cassandra/db/view/TableViews.java | 18 +- src/java/org/apache/cassandra/db/view/View.java | 41 +- .../apache/cassandra/db/view/ViewManager.java | 5 + .../cassandra/db/view/ViewUpdateGenerator.java | 163 ++- .../apache/cassandra/service/DataResolver.java | 4 +- .../org/apache/cassandra/cql3/CQLTester.java | 2 +- .../apache/cassandra/cql3/ViewComplexTest.java | 1344 ++++++++++++++++++ .../cassandra/cql3/ViewFilteringTest.java | 1030 +++++++++----- .../org/apache/cassandra/cql3/ViewTest.java | 25 +- 25 files changed, 2293 insertions(+), 495 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/NEWS.txt ---------------------------------------------------------------------- diff --cc NEWS.txt index 8e39667,7064c5d..0682ae9 --- a/NEWS.txt +++ b/NEWS.txt @@@ -18,10 -18,28 +18,34 @@@ using the provided 'sstableupgrade' too Upgrading --------- - - Nothing specific to this release, but please see previous upgrading sections, - especially if you are upgrading from 2.2. + - Nothing specific to this version but please see previous upgrading sections, + especially if you are upgrading from 2.2. + Materialized Views + ------------------- + - Cassandra will no longer allow dropping columns on tables with Materialized Views. + - A change was made in the way the Materialized View timestamp is computed, which + may cause an old deletion to a base column which is view primary key (PK) column + to not be reflected in the view when repairing the base table post-upgrade. This + condition is only possible when a column deletion to an MV primary key (PK) column + not present in the base table PK (via UPDATE base SET view_pk_col = null or DELETE + view_pk_col FROM base) is missed before the upgrade and received by repair after the upgrade. + If such column deletions are done on a view PK column which is not a base PK, it's advisable + to run repair on the base table of all nodes prior to the upgrade. Alternatively it's possible + to fix potential inconsistencies by running repair on the views after upgrade or drop and + re-create the views. See CASSANDRA-11500 for more details. + - Removal of columns not selected in the Materialized View (via UPDATE base SET unselected_column + = null or DELETE unselected_column FROM base) may not be properly reflected in the view in some + situations so we advise against doing deletions on base columns not selected in views + until this is fixed on CASSANDRA-13826. 
- -3.0.14 ++ - Creating Materialized View with filtering on non-primary-key base column ++ (added in CASSANDRA-10368) is disabled, because the liveness of view row ++ is depending on multiple filtered base non-key columns and base non-key ++ column used in view primary-key. This semantic cannot be supported without ++ storage format change, see CASSANDRA-13826. For append-only use case, you ++ may still use this feature with a startup flag: "-Dcassandra.mv.allow_filtering_nonkey_columns_unsafe=true" ++ +3.11.0 ====== Upgrading http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/doc/cql3/CQL.textile ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java index 3ac790b,befdd25..fdbcf7a --- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java @@@ -187,72 -189,54 +187,60 @@@ public class AlterTableStatement extend break; case DROP: - assert columnName != null; if (!cfm.isCQLTable()) throw new InvalidRequestException("Cannot drop columns from a non-CQL3 table"); - if (def == null) - throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily())); - switch (def.kind) + for (AlterTableStatementColumn colData : colNameList) { - case PARTITION_KEY: - 
case CLUSTERING: - throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName)); - case REGULAR: - case STATIC: - ColumnDefinition toDelete = null; - for (ColumnDefinition columnDef : cfm.partitionColumns()) - { - if (columnDef.name.equals(columnName)) - { - toDelete = columnDef; - break; - } - } - assert toDelete != null; - cfm.removeColumnDefinition(toDelete); - cfm.recordColumnDrop(toDelete, deleteTimestamp == null ? queryState.getTimestamp() : deleteTimestamp); - break; - } + columnName = colData.getColumnName().getIdentifier(cfm); + def = cfm.getColumnDefinition(columnName); - // If the dropped column is required by any secondary indexes - // we reject the operation, as the indexes must be dropped first - Indexes allIndexes = cfm.getIndexes(); - if (!allIndexes.isEmpty()) - { - ColumnFamilyStore store = Keyspace.openAndGetStore(cfm); - Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def); - if (!dependentIndexes.isEmpty()) - throw new InvalidRequestException(String.format("Cannot drop column %s because it has " + - "dependent secondary indexes (%s)", - def, - dependentIndexes.stream() - .map(i -> i.name) - .collect(Collectors.joining(",")))); - } + if (def == null) + throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily())); + + switch (def.kind) + { + case PARTITION_KEY: + case CLUSTERING: + throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName)); + case REGULAR: + case STATIC: + ColumnDefinition toDelete = null; + for (ColumnDefinition columnDef : cfm.partitionColumns()) + { + if (columnDef.name.equals(columnName)) + { + toDelete = columnDef; + break; + } + } + assert toDelete != null; + cfm.removeColumnDefinition(toDelete); + cfm.recordColumnDrop(toDelete, deleteTimestamp == null ? 
queryState.getTimestamp() : deleteTimestamp); + break; + } - if (!Iterables.isEmpty(views)) - throw new InvalidRequestException(String.format("Cannot drop column %s on base table with materialized views.", - columnName.toString(), - keyspace())); + // If the dropped column is required by any secondary indexes + // we reject the operation, as the indexes must be dropped first + Indexes allIndexes = cfm.getIndexes(); + if (!allIndexes.isEmpty()) + { + ColumnFamilyStore store = Keyspace.openAndGetStore(cfm); + Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def); + if (!dependentIndexes.isEmpty()) + throw new InvalidRequestException(String.format("Cannot drop column %s because it has " + + "dependent secondary indexes (%s)", + def, + dependentIndexes.stream() + .map(i -> i.name) + .collect(Collectors.joining(",")))); + } + - // If a column is dropped which is included in a view, we don't allow the drop to take place. - boolean rejectAlter = false; - StringBuilder builder = new StringBuilder(); - for (ViewDefinition view : views) - { - if (!view.includes(columnName)) continue; - if (rejectAlter) - builder.append(','); - rejectAlter = true; - builder.append(view.viewName); - } - if (rejectAlter) - throw new InvalidRequestException(String.format("Cannot drop column %s, depended on by materialized views (%s.{%s})", ++ if (!Iterables.isEmpty(views)) ++ throw new InvalidRequestException(String.format("Cannot drop column %s on base table with materialized views.", + columnName.toString(), - keyspace(), - builder.toString())); ++ keyspace())); + } break; case OPTS: if (attrs == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/LivenessInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/Row.java index 60ee4d4,3bcc220..9a8508b --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@@ -206,19 -213,9 +216,19 @@@ public interface Row extends Unfiltered * @return this row but without any deletion info purged by {@code purger}. If the purged row is empty, returns * {@code null}. */ - public Row purge(DeletionPurger purger, int nowInSec); + public Row purge(DeletionPurger purger, int nowInSec, boolean enforceStrictLiveness); /** + * Returns a copy of this row which only include the data queried by {@code filter}, excluding anything _fetched_ for + * internal reasons but not queried by the user (see {@link ColumnFilter} for details). + * + * @param filter the {@code ColumnFilter} to use when deciding what is user queried. This should be the filter + * that was used when querying the row on which this method is called. + * @return the row but with all data that wasn't queried by the user skipped. + */ + public Row withOnlyQueriedData(ColumnFilter filter); + + /** * Returns a copy of this row where all counter cells have they "local" shard marked for clearing. 
*/ public Row markCounterLocalToBeCleared(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/view/ViewManager.java index 5c72bb2,d1cfd9e..cb1e02b --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@@ -172,9 -159,13 +172,14 @@@ public class ViewManage forTable(view.getDefinition().baseTableMetadata()).removeByName(name); SystemKeyspace.setViewRemoved(keyspace.getName(), view.name); + SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name); } + public View getByName(String name) + { + return viewsByName.get(name); + } + public void buildAllViews() { for (View view : allViews()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index edbda1c,0c8e078..74758e7 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@@ -376,14 -384,34 +383,34 @@@ public class ViewUpdateGenerato if (!matchesViewFilter(existingBaseRow)) return; - 
deleteOldEntryInternal(existingBaseRow); + deleteOldEntryInternal(existingBaseRow, mergedBaseRow); } - private void deleteOldEntryInternal(Row existingBaseRow) + private void deleteOldEntryInternal(Row existingBaseRow, Row mergedBaseRow) { startNewUpdate(existingBaseRow); - DeletionTime dt = new DeletionTime(computeTimestampForEntryDeletion(existingBaseRow), nowInSec); - currentViewEntryBuilder.addRowDeletion(Row.Deletion.shadowable(dt)); + long timestamp = computeTimestampForEntryDeletion(existingBaseRow, mergedBaseRow); + long rowDeletion = mergedBaseRow.deletion().time().markedForDeleteAt(); + assert timestamp >= rowDeletion; + + // If computed deletion timestamp greater than row deletion, it must be coming from + // 1. non-pk base column used in view pk, or + // 2. unselected base column + // any case, we need to use it as expired livenessInfo + // If computed deletion timestamp is from row deletion, we only need row deletion itself + if (timestamp > rowDeletion) + { + /** + * TODO: This is a hack and overload of LivenessInfo and we should probably modify + * the storage engine to properly support this, but on the meantime this + * should be fine because it only happens in some specific scenarios explained above. 
+ */ - LivenessInfo info = LivenessInfo.create(timestamp, Integer.MAX_VALUE, nowInSec); ++ LivenessInfo info = LivenessInfo.withExpirationTime(timestamp, Integer.MAX_VALUE, nowInSec); + currentViewEntryBuilder.addPrimaryKeyLivenessInfo(info); + } + currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion()); + + addDifferentCells(existingBaseRow, mergedBaseRow); submitUpdate(); } @@@ -440,50 -458,74 +457,74 @@@ LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo(); - if (view.baseNonPKColumnsInViewPK.isEmpty()) + if (view.hasSamePrimaryKeyColumnsAsBaseTable()) { - int ttl = baseLiveness.ttl(); - int expirationTime = baseLiveness.localExpirationTime(); + if (view.getDefinition().includeAllColumns) + return baseLiveness; + + long timestamp = baseLiveness.timestamp(); + boolean hasNonExpiringLiveCell = false; + Cell biggestExpirationCell = null; for (Cell cell : baseRow.cells()) { - if (cell.ttl() > ttl) + if (view.getViewColumn(cell.column()) != null) + continue; + if (!isLive(cell)) + continue; + timestamp = Math.max(timestamp, cell.maxTimestamp()); + if (!cell.isExpiring()) + hasNonExpiringLiveCell = true; + else { - ttl = cell.ttl(); - expirationTime = cell.localDeletionTime(); + if (biggestExpirationCell == null) + biggestExpirationCell = cell; + else if (cell.localDeletionTime() > biggestExpirationCell.localDeletionTime()) + biggestExpirationCell = cell; } } - return ttl == baseLiveness.ttl() - ? 
baseLiveness - : LivenessInfo.withExpirationTime(baseLiveness.timestamp(), ttl, expirationTime); + if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring()) - return LivenessInfo.create(viewMetadata, timestamp, nowInSec); ++ return LivenessInfo.create(timestamp, nowInSec); + if (hasNonExpiringLiveCell) - return LivenessInfo.create(viewMetadata, timestamp, nowInSec); ++ return LivenessInfo.create(timestamp, nowInSec); + if (biggestExpirationCell == null) + return baseLiveness; + if (biggestExpirationCell.localDeletionTime() > baseLiveness.localExpirationTime() + || !baseLiveness.isLive(nowInSec)) - return LivenessInfo.create(timestamp, ++ return LivenessInfo.withExpirationTime(timestamp, + biggestExpirationCell.ttl(), + biggestExpirationCell.localDeletionTime()); + return baseLiveness; } - ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); - Cell cell = baseRow.getCell(baseColumn); + Cell cell = baseRow.getCell(view.baseNonPKColumnsInViewPK.get(0)); assert isLive(cell) : "We shouldn't have got there if the base row had no associated entry"; - long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp()); - return LivenessInfo.withExpirationTime(timestamp, cell.ttl(), cell.localDeletionTime()); - return LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()); ++ return LivenessInfo.withExpirationTime(cell.timestamp(), cell.ttl(), cell.localDeletionTime()); } - private long computeTimestampForEntryDeletion(Row baseRow) + private long computeTimestampForEntryDeletion(Row existingBaseRow, Row mergedBaseRow) { - // We delete the old row with it's row entry timestamp using a shadowable deletion. - // We must make sure that the deletion deletes everything in the entry (or the entry will - // still show up), so we must use the bigger timestamp found in the existing row (for any - // column included in the view at least). 
- // TODO: We have a problem though: if the entry is "resurected" by a later update, we would - // need to ensure that the timestamp for then entry then is bigger than the tombstone - // we're just inserting, which is not currently guaranteed. - // This is a bug for a separate ticket though. - long timestamp = baseRow.primaryKeyLivenessInfo().timestamp(); - for (ColumnData data : baseRow) + DeletionTime deletion = mergedBaseRow.deletion().time(); + if (view.hasSamePrimaryKeyColumnsAsBaseTable()) { - if (!view.getDefinition().includes(data.column().name)) - continue; + long timestamp = Math.max(deletion.markedForDeleteAt(), existingBaseRow.primaryKeyLivenessInfo().timestamp()); + if (view.getDefinition().includeAllColumns) + return timestamp; - timestamp = Math.max(timestamp, data.maxTimestamp()); + for (Cell cell : existingBaseRow.cells()) + { + // selected column should not contribute to view deletion, itself is already included in view row + if (view.getViewColumn(cell.column()) != null) + continue; + // unselected column is used regardless live or dead, because we don't know if it was used for liveness. + timestamp = Math.max(timestamp, cell.maxTimestamp()); + } + return timestamp; } - return timestamp; + // has base non-pk column in view pk + Cell before = existingBaseRow.getCell(view.baseNonPKColumnsInViewPK.get(0)); + assert isLive(before) : "We shouldn't have got there if the base row had no associated entry"; + return deletion.deletes(before) ? 
deletion.markedForDeleteAt() : before.timestamp(); } private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e624c663/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/ViewComplexTest.java index 0000000,9e32620..ece3e6d mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java @@@ -1,0 -1,1343 +1,1344 @@@ + package org.apache.cassandra.cql3; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + import static org.junit.Assert.fail; + + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Comparator; + import java.util.HashMap; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.concurrent.TimeUnit; + import java.util.stream.Collectors; + + import org.apache.cassandra.concurrent.SEPExecutor; + import org.apache.cassandra.concurrent.Stage; + import org.apache.cassandra.concurrent.StageManager; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.compaction.CompactionManager; ++import org.apache.cassandra.transport.ProtocolVersion; + import org.apache.cassandra.utils.FBUtilities; + import org.junit.After; + import org.junit.Before; + import org.junit.BeforeClass; + import org.junit.Ignore; + import 
org.junit.Test; + + import com.google.common.base.Objects; + + public class ViewComplexTest extends CQLTester + { - int protocolVersion = 4; ++ ProtocolVersion protocolVersion = ProtocolVersion.V4; + private final List<String> views = new ArrayList<>(); + + @BeforeClass + public static void startup() + { + requireNetwork(); + } + @Before + public void begin() + { + views.clear(); + } + + @After + public void end() throws Throwable + { + for (String viewName : views) + executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + viewName); + } + + private void createView(String name, String query) throws Throwable + { + executeNet(protocolVersion, String.format(query, name)); + // If exception is thrown, the view will not be added to the list; since it shouldn't have been created, this is + // the desired behavior + views.add(name); + } + + private void updateView(String query, Object... params) throws Throwable + { + updateViewWithFlush(query, false, params); + } + + private void updateViewWithFlush(String query, boolean flush, Object... 
params) throws Throwable + { + executeNet(protocolVersion, query, params); + while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0 + && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0)) + { + Thread.sleep(1); + } + if (flush) + Keyspace.open(keyspace()).flush(); + } + + // for now, unselected column cannot be fully supported, SEE CASSANDRA-13826 + @Ignore + @Test + public void testPartialDeleteUnselectedColumn() throws Throwable + { + boolean flush = true; + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY (k, c))"); + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT k,c FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)"); + Keyspace ks = Keyspace.open(keyspace()); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + updateView("UPDATE %s USING TIMESTAMP 10 SET b=1 WHERE k=1 AND c=1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRows(execute("SELECT * from %s"), row(1, 1, null, 1)); + assertRows(execute("SELECT * from mv"), row(1, 1)); + updateView("DELETE b FROM %s USING TIMESTAMP 11 WHERE k=1 AND c=1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertEmpty(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + updateView("UPDATE %s USING TIMESTAMP 1 SET a=1 WHERE k=1 AND c=1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRows(execute("SELECT * from %s"), row(1, 1, 1, null)); + assertRows(execute("SELECT * from mv"), row(1, 1)); + + execute("truncate %s;"); + + // removal generated by unselected column should not shadow PK update with smaller timestamp + updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET a=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, 1, null)); + assertRows(execute("SELECT * from mv"), row(1, 1)); + + updateViewWithFlush("UPDATE %s 
USING TIMESTAMP 20 SET a=null WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s")); + assertRows(execute("SELECT * from mv")); + + updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 15", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1)); + } + + @Test + public void testPartialDeleteSelectedColumnWithFlush() throws Throwable + { + testPartialDeleteSelectedColumn(true); + } + + @Test + public void testPartialDeleteSelectedColumnWithoutFlush() throws Throwable + { + testPartialDeleteSelectedColumn(false); + } + + private void testPartialDeleteSelectedColumn(boolean flush) throws Throwable + { + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + createTable("CREATE TABLE %s (k int, c int, a int, b int, e int, f int, PRIMARY KEY (k, c))"); + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (k,c)"); + Keyspace ks = Keyspace.open(keyspace()); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + updateViewWithFlush("UPDATE %s USING TIMESTAMP 10 SET b=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, 1)); + + updateViewWithFlush("DELETE b FROM %s USING TIMESTAMP 11 WHERE k=1 AND c=1", flush); + assertEmpty(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + + updateViewWithFlush("UPDATE %s USING TIMESTAMP 1 SET a=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, 1, null, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, 1, null)); + + updateViewWithFlush("DELETE a FROM %s USING TIMESTAMP 1 WHERE k=1 AND c=1", flush); + assertEmpty(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + + // view livenessInfo should not be affected by selected column 
ts or tb + updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 0", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, null)); + + updateViewWithFlush("UPDATE %s USING TIMESTAMP 12 SET b=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, 1)); + + updateViewWithFlush("DELETE b FROM %s USING TIMESTAMP 13 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, null)); + + updateViewWithFlush("DELETE FROM %s USING TIMESTAMP 14 WHERE k=1 AND c=1", flush); + assertEmpty(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + + updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TIMESTAMP 15", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, null)); + + updateViewWithFlush("UPDATE %s USING TTL 3 SET b=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, 1, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, 1)); + + TimeUnit.SECONDS.sleep(4); + + assertRows(execute("SELECT * from %s"), row(1, 1, null, null, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null, null)); + + updateViewWithFlush("DELETE FROM %s USING TIMESTAMP 15 WHERE k=1 AND c=1", flush); + assertEmpty(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + + execute("truncate %s;"); + + // removal generated by unselected column should not shadow selected column with smaller timestamp + updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET e=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, null, null, 1, null)); + assertRows(execute("SELECT 
* from mv"), row(1, 1, null, null)); + + updateViewWithFlush("UPDATE %s USING TIMESTAMP 18 SET e=null WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s")); + assertRows(execute("SELECT * from mv")); + + updateViewWithFlush("UPDATE %s USING TIMESTAMP 16 SET a=1 WHERE k=1 AND c=1", flush); + assertRows(execute("SELECT * from %s"), row(1, 1, 1, null, null, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, 1, null)); + } + + @Test + public void testUpdateColumnInViewPKWithTTLWithFlush() throws Throwable + { + // CASSANDRA-13657 + testUpdateColumnInViewPKWithTTL(true); + } + + @Test + public void testUpdateColumnInViewPKWithTTLWithoutFlush() throws Throwable + { + // CASSANDRA-13657 + testUpdateColumnInViewPKWithTTL(false); + } + + private void testUpdateColumnInViewPKWithTTL(boolean flush) throws Throwable + { + // CASSANDRA-13657 if base column used in view pk is ttled, then view row is considered dead + createTable("create table %s (k int primary key, a int, b int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + updateView("UPDATE %s SET a = 1 WHERE k = 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT * from %s"), row(1, 1, null)); + assertRows(execute("SELECT * from mv"), row(1, 1, null)); + + updateView("DELETE a FROM %s WHERE k = 1"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + + updateView("INSERT INTO %s (k) VALUES (1);"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT * from %s"), row(1, null, null)); + assertEmpty(execute("SELECT * from mv")); + + updateView("UPDATE %s USING 
TTL 5 SET a = 10 WHERE k = 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT * from %s"), row(1, 10, null)); + assertRows(execute("SELECT * from mv"), row(10, 1, null)); + + updateView("UPDATE %s SET b = 100 WHERE k = 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT * from %s"), row(1, 10, 100)); + assertRows(execute("SELECT * from mv"), row(10, 1, 100)); + + Thread.sleep(5000); + + // 'a' is TTL of 5 and removed. + assertRows(execute("SELECT * from %s"), row(1, null, 100)); + assertEmpty(execute("SELECT * from mv")); + assertEmpty(execute("SELECT * from mv WHERE k = ? AND a = ?", 1, 10)); + + updateView("DELETE b FROM %s WHERE k=1"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT * from %s"), row(1, null, null)); + assertEmpty(execute("SELECT * from mv")); + + updateView("DELETE FROM %s WHERE k=1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertEmpty(execute("SELECT * from %s")); + assertEmpty(execute("SELECT * from mv")); + } + + @Test + public void testUpdateColumnNotInViewWithFlush() throws Throwable + { + testUpdateColumnNotInView(true); + } + + @Test + public void testUpdateColumnNotInViewWithoutFlush() throws Throwable + { + // CASSANDRA-13127 + testUpdateColumnNotInView(false); + } + + private void testUpdateColumnNotInView(boolean flush) throws Throwable + { + // CASSANDRA-13127: if base column not selected in view are alive, then pk of view row should be alive + createTable("create table %s (p int, c int, v1 int, v2 int, primary key(p, c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT p, c FROM %%s WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + updateView("UPDATE %s USING TIMESTAMP 0 SET v1 
= 1 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, 1, null)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + updateView("DELETE v1 FROM %s USING TIMESTAMP 1 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0)); + + // shadowed by tombstone + updateView("UPDATE %s USING TIMESTAMP 1 SET v1 = 1 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + assertEmpty(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0)); + + updateView("UPDATE %s USING TIMESTAMP 2 SET v2 = 1 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + updateView("DELETE v1 FROM %s USING TIMESTAMP 3 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + updateView("DELETE v2 FROM %s USING TIMESTAMP 4 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + assertEmpty(execute("SELECT * from mv WHERE c = ? 
AND p = ?", 0, 0)); + + updateView("UPDATE %s USING TTL 3 SET v2 = 1 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + Thread.sleep(TimeUnit.SECONDS.toMillis(3)); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0)); + + updateView("UPDATE %s SET v2 = 1 WHERE p = 0 AND c = 0"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0), row(0, 0, null, 1)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + assertInvalidMessage("Cannot drop column v2 on base table with materialized views", "ALTER TABLE %s DROP v2"); + // // drop unselected base column, unselected metadata should be removed, thus view row is dead + // updateView("ALTER TABLE %s DROP v2"); + // assertRowsIgnoringOrder(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + // assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? 
AND p = ?", 0, 0)); + // assertRowsIgnoringOrder(execute("SELECT * from %s")); + // assertRowsIgnoringOrder(execute("SELECT * from mv")); + } + + @Test + public void testPartialUpdateWithUnselectedCollectionsWithFlush() throws Throwable + { + testPartialUpdateWithUnselectedCollections(true); + } + + @Test + public void testPartialUpdateWithUnselectedCollectionsWithoutFlush() throws Throwable + { + testPartialUpdateWithUnselectedCollections(false); + } + + public void testPartialUpdateWithUnselectedCollections(boolean flush) throws Throwable + { + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + createTable("CREATE TABLE %s (k int, c int, a int, b int, l list<int>, s set<int>, m map<int,int>, PRIMARY KEY (k, c))"); + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT a, b FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)"); + Keyspace ks = Keyspace.open(keyspace()); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + updateView("UPDATE %s SET l=l+[1,2,3] WHERE k = 1 AND c = 1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRows(execute("SELECT * from mv"), row(1, 1, null, null)); + + updateView("UPDATE %s SET l=l-[1,2] WHERE k = 1 AND c = 1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRows(execute("SELECT * from mv"), row(1, 1, null, null)); + + updateView("UPDATE %s SET b=3 WHERE k=1 AND c=1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRows(execute("SELECT * from mv"), row(1, 1, null, 3)); + + updateView("UPDATE %s SET b=null, l=l-[3], s=s-{3} WHERE k = 1 AND c = 1"); + if (flush) + { + FBUtilities.waitOnFutures(ks.flush()); + ks.getColumnFamilyStore("mv").forceMajorCompaction(); + } + assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s")); + assertRowsIgnoringOrder(execute("SELECT * from mv")); + + updateView("UPDATE %s SET m=m+{3:3}, l=l-[1], s=s-{2} WHERE k = 1 AND c = 1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + 
assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s"), row(1, 1, null, null)); + assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 1, null, null)); + + assertInvalidMessage("Cannot drop column m on base table with materialized views", "ALTER TABLE %s DROP m"); + // executeNet(protocolVersion, "ALTER TABLE %s DROP m"); + // ks.getColumnFamilyStore("mv").forceMajorCompaction(); + // assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s WHERE k = 1 AND c = 1")); + // assertRowsIgnoringOrder(execute("SELECT * from mv WHERE k = 1 AND c = 1")); + // assertRowsIgnoringOrder(execute("SELECT k,c,a,b from %s")); + // assertRowsIgnoringOrder(execute("SELECT * from mv")); + } + + @Test + public void testUnselectedColumnsTTLWithFlush() throws Throwable + { + // CASSANDRA-13127 + testUnselectedColumnsTTL(true); + } + + @Test + public void testUnselectedColumnsTTLWithoutFlush() throws Throwable + { + // CASSANDRA-13127 + testUnselectedColumnsTTL(false); + } + + private void testUnselectedColumnsTTL(boolean flush) throws Throwable + { + // CASSANDRA-13127 not ttled unselected column in base should keep view row alive + createTable("create table %s (p int, c int, v int, primary key(p, c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT p, c FROM %%s WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + updateViewWithFlush("INSERT INTO %s (p, c) VALUES (0, 0) USING TTL 3;", flush); + + updateViewWithFlush("UPDATE %s USING TTL 1000 SET v = 0 WHERE p = 0 and c = 0;", flush); + + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + Thread.sleep(3000); + + UntypedResultSet.Row row = execute("SELECT v, ttl(v) from %s WHERE c = ? 
AND p = ?", 0, 0).one(); + assertTrue("row should have value of 0", row.getInt("v") == 0); + assertTrue("row should have ttl less than 1000", row.getInt("ttl(v)") < 1000); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + updateViewWithFlush("DELETE FROM %s WHERE p = 0 and c = 0;", flush); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0)); + + updateViewWithFlush("INSERT INTO %s (p, c) VALUES (0, 0) ", flush); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + // already have a live row, no need to apply the unselected cell ttl + updateViewWithFlush("UPDATE %s USING TTL 3 SET v = 0 WHERE p = 0 and c = 0;", flush); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + + updateViewWithFlush("INSERT INTO %s (p, c) VALUES (1, 1) USING TTL 3", flush); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 1, 1), row(1, 1)); + + Thread.sleep(4000); + + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 0, 0), row(0, 0)); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 1, 1)); + + // unselected should keep view row alive + updateViewWithFlush("UPDATE %s SET v = 0 WHERE p = 1 and c = 1;", flush); + assertRowsIgnoringOrder(execute("SELECT * from mv WHERE c = ? AND p = ?", 1, 1), row(1, 1)); + + } + + @Test + public void testRangeDeletionWithFlush() throws Throwable + { + testRangeDeletion(true); + } + + @Test + public void testRangeDeletionWithoutFlush() throws Throwable + { + testRangeDeletion(false); + } + + public void testRangeDeletion(boolean flush) throws Throwable + { + // for partition range deletion, need to know that existing row is shadowed instead of not existed. 
+ createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv_test1", + "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a, b)"); + + Keyspace ks = Keyspace.open(keyspace()); + ks.getColumnFamilyStore("mv_test1").disableAutoCompaction(); + + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) using timestamp 0", 1, 1, 1, 1); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"), row(1, 1, 1, 1)); + + // remove view row + updateView("UPDATE %s using timestamp 1 set b = null WHERE a=1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1")); + // remove base row; the view stays empty (presumably no view update is generated) + updateView("DELETE FROM %s using timestamp 2 where a=1"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1")); + + // restore view row with columns b and c; d is still a tombstone + updateView("UPDATE %s using timestamp 3 set b = 1,c = 1 where a=1"); // upsert + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT * FROM mv_test1"), row(1, 1, 1, null)); + } + + @Test + public void testBaseTTLWithSameTimestampTest() throws Throwable + { + // CASSANDRA-13127 when liveness timestamp tie, greater localDeletionTime should win if both are expiring. 
+ createTable("create table %s (p int, c int, v int, primary key(p, c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) using timestamp 1;"); + + FBUtilities.waitOnFutures(ks.flush()); + + updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING TTL 3 and timestamp 1;"); + + FBUtilities.waitOnFutures(ks.flush()); + + Thread.sleep(4000); + + assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + + // reversed order + execute("truncate %s;"); + + updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING TTL 3 and timestamp 1;"); + + FBUtilities.waitOnFutures(ks.flush()); + + updateView("INSERT INTO %s (p, c, v) VALUES (0, 0, 0) USING timestamp 1;"); + + FBUtilities.waitOnFutures(ks.flush()); + + Thread.sleep(4000); + + assertEmpty(execute("SELECT * from %s WHERE c = ? AND p = ?", 0, 0)); + + } + + @Test + public void testCommutativeRowDeletionFlush() throws Throwable + { + // CASSANDRA-13409 + testCommutativeRowDeletion(true); + } + + @Test + public void testCommutativeRowDeletionWithoutFlush() throws Throwable + { + // CASSANDRA-13409 + testCommutativeRowDeletion(false); + } + + private void testCommutativeRowDeletion(boolean flush) throws Throwable + { + // CASSANDRA-13409 new update should not resurrect previous deleted data in view + createTable("create table %s (p int primary key, v1 int, v2 int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + // sstable-1, Set initial values TS=1 + updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using timestamp 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + 
assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3), row(3, 1L)); + // sstable-2 + updateView("Delete from %s using timestamp 2 where p = 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv")); + // sstable-3 + updateView("Insert into %s (p, v1) values (3, 1) using timestamp 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null)); + // sstable-4 + updateView("UPdate %s using timestamp 4 set v1 = 2 where p = 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, null, null)); + // sstable-5 + updateView("UPdate %s using timestamp 5 set v1 = 1 where p = 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null)); + + if (flush) + { + // compact sstable 2 and 4, 5; + ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv"); + // order by generation so that indices 1, 3 and 4 below select sstable 2, 4 and 5; + // comparingInt avoids the overflow-prone int subtraction comparator + List<String> sstables = cfs.getLiveSSTables() + .stream() + .sorted(Comparator.comparingInt(s -> s.descriptor.generation)) + .map(s -> s.getFilename()) + .collect(Collectors.toList()); + String dataFiles = String.join(",", Arrays.asList(sstables.get(1), sstables.get(3), sstables.get(4))); + CompactionManager.instance.forceUserDefinedCompaction(dataFiles); + assertEquals(3, cfs.getLiveSSTables().size()); + } + // regular tombstone should be retained after compaction + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null)); + } + + @Test + public void testUnselectedColumnWithExpiredLivenessInfo() throws Throwable + { + boolean flush = true; + createTable("create table %s (k int, c int, a int, b int, PRIMARY KEY(k, c))"); + + execute("USE " + keyspace()); + 
executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select k,c,b from %%s where c is not null and k is not null primary key (c, k);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + // sstable-1, Set initial values TS=1 + updateViewWithFlush("UPDATE %s SET a = 1 WHERE k = 1 AND c = 1;", flush); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"), + row(1, 1, 1, null)); + assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"), + row(1, 1, null)); + + // sstable-2 + updateViewWithFlush("INSERT INTO %s(k,c) VALUES(1,1) USING TTL 5", flush); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"), + row(1, 1, 1, null)); + assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"), + row(1, 1, null)); + + Thread.sleep(5001); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"), + row(1, 1, 1, null)); + assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"), + row(1, 1, null)); + + // sstable-3 + updateViewWithFlush("Update %s set a = null where k = 1 AND c = 1;", flush); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;")); + assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;")); + + // sstable-4 + updateViewWithFlush("Update %s USING TIMESTAMP 1 set b = 1 where k = 1 AND c = 1;", flush); + + assertRowsIgnoringOrder(execute("SELECT * from %s WHERE k = 1 AND c = 1;"), + row(1, 1, null, 1)); + assertRowsIgnoringOrder(execute("SELECT k,c,b from mv WHERE k = 1 AND c = 1;"), + row(1, 1, 1)); + } + + @Test + public void testUpdateWithColumnTimestampSmallerThanPkWithFlush() throws Throwable + { + testUpdateWithColumnTimestampSmallerThanPk(true); + } + + @Test + public void testUpdateWithColumnTimestampSmallerThanPkWithoutFlush() throws Throwable + { + 
testUpdateWithColumnTimestampSmallerThanPk(false); + } + + public void testUpdateWithColumnTimestampSmallerThanPk(boolean flush) throws Throwable + { + createTable("create table %s (p int primary key, v1 int, v2 int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + // reset value + updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using timestamp 6;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 3, 6L)); + // increase pk's timestamp to 20 + updateView("Insert into %s (p) values (3) using timestamp 20;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 3, 6L)); + // change v1's to 2 and remove existing view row with ts7 + updateView("UPdate %s using timestamp 7 set v1 = 2 where p = 3;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, 3, 6L)); + // change v1's to 1 and remove existing view row with ts8 + updateView("UPdate %s using timestamp 8 set v1 = 1 where p = 3;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 3, 6L)); + } + + @Test + public void testUpdateWithColumnTimestampBiggerThanPkWithFlush() throws Throwable + { + // CASSANDRA-11500 + testUpdateWithColumnTimestampBiggerThanPk(true); + } + + @Test + public void testUpdateWithColumnTimestampBiggerThanPkWithoutFlush() throws Throwable + { + // CASSANDRA-11500 + testUpdateWithColumnTimestampBiggerThanPk(false); + } + + public void 
testUpdateWithColumnTimestampBiggerThanPk(boolean flush) throws Throwable + { + // CASSANDRA-11500: must be able to shadow the old view row when a column ts is greater than the pk's ts, and then re-insert the view row + createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int);"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + updateView("DELETE FROM %s USING TIMESTAMP 0 WHERE k = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + // sstable-1, Set initial values TS=1 + updateView("INSERT INTO %s(k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 1)); + updateView("UPDATE %s USING TIMESTAMP 10 SET b = 2 WHERE k = 1;"); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2)); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2)); + updateView("UPDATE %s USING TIMESTAMP 2 SET a = 2 WHERE k = 1;"); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 2)); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + ks.getColumnFamilyStore("mv").forceMajorCompaction(); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 2)); + updateView("UPDATE %s USING TIMESTAMP 11 SET a = 1 WHERE k = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2)); + assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, 1, 2)); + + // set non-key base column as tombstone, view row is removed with shadowable + updateView("UPDATE %s USING TIMESTAMP 12 SET a = null WHERE k = 1;"); + if (flush) + 
FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv")); + assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, null, 2)); + + // column b should be alive + updateView("UPDATE %s USING TIMESTAMP 13 SET a = 1 WHERE k = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 1, 2)); + assertRowsIgnoringOrder(execute("SELECT k,a,b from %s"), row(1, 1, 2)); + + assertInvalidMessage("Cannot drop column a on base table with materialized views", "ALTER TABLE %s DROP a"); + } + + @Test + public void testNonBaseColumnInViewPkWithFlush() throws Throwable + { + testNonBaseColumnInViewPk(true); + } + + @Test + public void testNonBaseColumnInViewPkWithoutFlush() throws Throwable + { + // flush=false: run the scenario without the intermediate ks.flush() calls (was mistakenly passing true) + testNonBaseColumnInViewPk(false); + } + + public void testNonBaseColumnInViewPk(boolean flush) throws Throwable + { + createTable("create table %s (p1 int, p2 int, v1 int, v2 int, primary key (p1,p2))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select * from %%s where p1 is not null and p2 is not null primary key (p2, p1)" + + " with gc_grace_seconds=5;"); + ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv"); + cfs.disableAutoCompaction(); + + updateView("UPDATE %s USING TIMESTAMP 1 set v1 =1 where p1 = 1 AND p2 = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, 1, null)); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, 1, null)); + + updateView("UPDATE %s USING TIMESTAMP 2 set v1 = null, v2 = 1 where p1 = 1 AND p2 = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, null, 1)); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, 
v2 from mv"), row(1, 1, null, 1)); + + updateView("UPDATE %s USING TIMESTAMP 2 set v2 = null where p1 = 1 AND p2 = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s")); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv")); + + updateView("INSERT INTO %s (p1,p2) VALUES(1,1) USING TIMESTAMP 3;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, null, null)); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, null, null)); + + updateView("DELETE FROM %s USING TIMESTAMP 4 WHERE p1 =1 AND p2 = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s")); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv")); + + updateView("UPDATE %s USING TIMESTAMP 5 set v2 = 1 where p1 = 1 AND p2 = 1;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from %s"), row(1, 1, null, 1)); + assertRowsIgnoringOrder(execute("SELECT p1, p2, v1, v2 from mv"), row(1, 1, null, 1)); + } + + @Test + public void testStrictLivenessTombstone() throws Throwable + { + createTable("create table %s (p int primary key, v1 int, v2 int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p)" + + " with gc_grace_seconds=5;"); + ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv"); + cfs.disableAutoCompaction(); + + updateView("Insert into %s (p, v1, v2) values (1, 1, 1) ;"); + assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 1, 1)); + + updateView("Update %s set v1 = null WHERE p = 1"); + FBUtilities.waitOnFutures(ks.flush()); + 
assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv")); + + cfs.forceMajorCompaction(); // before gc grace second, strict-liveness tombstoned dead row remains + assertEquals(1, cfs.getLiveSSTables().size()); + + Thread.sleep(6000); + assertEquals(1, cfs.getLiveSSTables().size()); // no auto compaction. + + cfs.forceMajorCompaction(); // after gc grace second, no data left + assertEquals(0, cfs.getLiveSSTables().size()); + + updateView("Update %s using ttl 5 set v1 = 1 WHERE p = 1"); + FBUtilities.waitOnFutures(ks.flush()); + assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 1, 1)); + + cfs.forceMajorCompaction(); // before ttl+gc_grace_second, strict-liveness ttled dead row remains + assertEquals(1, cfs.getLiveSSTables().size()); + assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv"), row(1, 1, 1)); + + Thread.sleep(5500); // after expired, before gc_grace_second + cfs.forceMajorCompaction();// before ttl+gc_grace_second, strict-liveness ttled dead row remains + assertEquals(1, cfs.getLiveSSTables().size()); + assertRowsIgnoringOrder(execute("SELECT p, v1, v2 from mv")); + + Thread.sleep(5500); // after expired + gc_grace_second + assertEquals(1, cfs.getLiveSSTables().size()); // no auto compaction. 
+ + cfs.forceMajorCompaction(); // after gc grace second, no data left + assertEquals(0, cfs.getLiveSSTables().size()); + } + + @Test + public void testCellTombstoneAndShadowableTombstonesWithFlush() throws Throwable + { + testCellTombstoneAndShadowableTombstones(true); + } + + @Test + public void testCellTombstoneAndShadowableTombstonesWithoutFlush() throws Throwable + { + testCellTombstoneAndShadowableTombstones(false); + } + + private void testCellTombstoneAndShadowableTombstones(boolean flush) throws Throwable + { + createTable("create table %s (p int primary key, v1 int, v2 int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + // sstable 1, Set initial values TS=1 + updateView("Insert into %s (p, v1, v2) values (3, 1, 3) using timestamp 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3), row(3, 1L)); + // sstable 2 + updateView("UPdate %s using timestamp 2 set v2 = null where p = 3"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? 
AND p = ?", 1, 3), + row(null, null)); + // sstable 3 + updateView("UPdate %s using timestamp 3 set v1 = 2 where p = 3"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, null, null)); + // sstable 4 + updateView("UPdate %s using timestamp 4 set v1 = 1 where p = 3"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null)); + + if (flush) + { + // compact sstable 2 and 3; + ColumnFamilyStore cfs = ks.getColumnFamilyStore("mv"); + List<String> sstables = cfs.getLiveSSTables() + .stream() + .sorted(Comparator.comparingInt(s -> s.descriptor.generation)) + .map(s -> s.getFilename()) + .collect(Collectors.toList()); + System.out.println("SSTables " + sstables); + String dataFiles = String.join(",", Arrays.asList(sstables.get(1), sstables.get(2))); + CompactionManager.instance.forceUserDefinedCompaction(dataFiles); + } + // cell-tombstone in sstable 4 is not compacted away, because the shadowable tombstone is shadowed by new row. 
+ assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null)); + } + + @Test + public void complexTimestampDeletionTestWithFlush() throws Throwable + { + complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(true); + complexTimestampWithbasePKColumnsInViewPKDeletionTest(true); + } + + @Test + public void complexTimestampDeletionTestWithoutFlush() throws Throwable + { + complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(false); + complexTimestampWithbasePKColumnsInViewPKDeletionTest(false); + } + + private void complexTimestampWithbasePKColumnsInViewPKDeletionTest(boolean flush) throws Throwable + { + createTable("create table %s (p1 int, p2 int, v1 int, v2 int, primary key(p1, p2))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv2", + "create materialized view %s as select * from %%s where p1 is not null and p2 is not null primary key (p2, p1);"); + ks.getColumnFamilyStore("mv2").disableAutoCompaction(); + + // Set initial values TS=1 + updateView("Insert into %s (p1, p2, v1, v2) values (1, 2, 3, 4) using timestamp 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? 
AND p2 = ?", 1, 2), + row(3, 4, 1L)); + // remove row/mv TS=2 + updateView("Delete from %s using timestamp 2 where p1 = 1 and p2 = 2;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + // view are empty + assertRowsIgnoringOrder(execute("SELECT * from mv2")); + // insert PK with TS=3 + updateView("Insert into %s (p1, p2) values (1, 2) using timestamp 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + // deleted column in MV remained dead + assertRowsIgnoringOrder(execute("SELECT * from mv2"), row(2, 1, null, null)); + + ks.getColumnFamilyStore("mv2").forceMajorCompaction(); + assertRowsIgnoringOrder(execute("SELECT * from mv2"), row(2, 1, null, null)); + + // reset values + updateView("Insert into %s (p1, p2, v1, v2) values (1, 2, 3, 4) using timestamp 10;"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? AND p2 = ?", 1, 2), + row(3, 4, 10L)); + + updateView("UPDATE %s using timestamp 20 SET v2 = 5 WHERE p1 = 1 and p2 = 2"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? AND p2 = ?", 1, 2), + row(3, 5, 20L)); + + updateView("DELETE FROM %s using timestamp 10 WHERE p1 = 1 and p2 = 2"); + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v1, v2, WRITETIME(v2) from mv2 WHERE p1 = ? 
AND p2 = ?", 1, 2), + row(null, 5, 20L)); + } + + public void complexTimestampWithbaseNonPKColumnsInViewPKDeletionTest(boolean flush) throws Throwable + { + createTable("create table %s (p int primary key, v1 int, v2 int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", + "create materialized view %s as select * from %%s where p is not null and v1 is not null primary key (v1, p);"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + // Set initial values TS=1 + updateView("Insert into %s (p, v1, v2) values (3, 1, 5) using timestamp 1;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRowsIgnoringOrder(execute("SELECT v2, WRITETIME(v2) from mv WHERE v1 = ? AND p = ?", 1, 3), row(5, 1L)); + // remove row/mv TS=2 + updateView("Delete from %s using timestamp 2 where p = 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + // view is empty + assertRowsIgnoringOrder(execute("SELECT * from mv")); + // insert PK with TS=3 + updateView("Insert into %s (p, v1) values (3, 1) using timestamp 3;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + // deleted column in MV remained dead + assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null)); + + // insert values TS=2, it should be considered dead due to previous tombstone + updateView("Insert into %s (p, v1, v2) values (3, 1, 5) using timestamp 2;"); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + // deleted column in MV remained dead + assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null)); + + // update v2 with TS=3, newer than the TS=2 row deletion, so the new value should be visible in the view + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET v2 = ? 
WHERE p = ?", 4, 3); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 4, 3L)); + + ks.getColumnFamilyStore("mv").forceMajorCompaction(); + assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 4, 3L)); + } + + @Test + public void testMVWithDifferentColumnsWithFlush() throws Throwable + { + testMVWithDifferentColumns(true); + } + + @Test + public void testMVWithDifferentColumnsWithoutFlush() throws Throwable + { + testMVWithDifferentColumns(false); + } + + private void testMVWithDifferentColumns(boolean flush) throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, f int, PRIMARY KEY(a, b))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + List<String> viewNames = new ArrayList<>(); + List<String> mvStatements = Arrays.asList( + // all selected + "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a,b)", + // unselected e,f + "CREATE MATERIALIZED VIEW %s AS SELECT c,d FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a,b)", + // no selected + "CREATE MATERIALIZED VIEW %s AS SELECT a,b FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (a,b)", + // all selected, re-order keys + "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b,a)", + // unselected e,f, re-order keys + "CREATE MATERIALIZED VIEW %s AS SELECT a,b,c,d FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b,a)", + // no selected, re-order keys + "CREATE MATERIALIZED VIEW %s AS SELECT a,b FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b,a)"); + + Keyspace ks = Keyspace.open(keyspace()); + + for (int i = 0; i < mvStatements.size(); i++) + { + String name = "mv" + i; + viewNames.add(name); + createView(name, mvStatements.get(i)); + ks.getColumnFamilyStore(name).disableAutoCompaction(); + } + + // 
insert + updateViewWithFlush("INSERT INTO %s (a,b,c,d,e,f) VALUES(1,1,1,1,1,1) using timestamp 1", flush); + assertBaseViews(row(1, 1, 1, 1, 1, 1), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 2 SET c=0, d=0 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, 0, 0, 1, 1), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 2 SET e=0, f=0 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, 0, 0, 0, 0), viewNames); + + updateViewWithFlush("DELETE FROM %s using timestamp 2 WHERE a=1 AND b=1", flush); + assertBaseViews(null, viewNames); + + // partial update unselected, selected + updateViewWithFlush("UPDATE %s using timestamp 3 SET f=1 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, null, null, null, 1), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 4 SET e = 1, f=null WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, null, null, 1, null), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 4 SET e = null WHERE a=1 AND b=1", flush); + assertBaseViews(null, viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 5 SET c = 1 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, 1, null, null, null), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 5 SET c = null WHERE a=1 AND b=1", flush); + assertBaseViews(null, viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 6 SET d = 1 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, null, 1, null, null), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 7 SET d = null WHERE a=1 AND b=1", flush); + assertBaseViews(null, viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 8 SET f = 1 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, null, null, null, 1), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 6 SET c = 1 WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, 1, null, null, 1), viewNames); + + // view row still alive due to c=1@6 + updateViewWithFlush("UPDATE 
%s using timestamp 8 SET f = null WHERE a=1 AND b=1", flush); + assertBaseViews(row(1, 1, 1, null, null, null), viewNames); + + updateViewWithFlush("UPDATE %s using timestamp 6 SET c = null WHERE a=1 AND b=1", flush); + assertBaseViews(null, viewNames); + } + + private void assertBaseViews(Object[] row, List<String> viewNames) throws Throwable + { + UntypedResultSet result = execute("SELECT * FROM %s"); + if (row == null) + assertRowsIgnoringOrder(result); + else + assertRowsIgnoringOrder(result, row); + for (int i = 0; i < viewNames.size(); i++) + assertBaseView(result, execute(String.format("SELECT * FROM %s", viewNames.get(i))), viewNames.get(i)); + } + + private void assertBaseView(UntypedResultSet base, UntypedResultSet view, String mv) + { + List<ColumnSpecification> baseMeta = base.metadata(); + List<ColumnSpecification> viewMeta = view.metadata(); + + Iterator<UntypedResultSet.Row> iter = base.iterator(); + Iterator<UntypedResultSet.Row> viewIter = view.iterator(); + + List<UntypedResultSet.Row> baseData = com.google.common.collect.Lists.newArrayList(iter); + List<UntypedResultSet.Row> viewData = com.google.common.collect.Lists.newArrayList(viewIter); + + if (baseData.size() != viewData.size()) + fail(String.format("Mismatch number of rows in view %s: <%s>, in base <%s>", + mv, + makeRowStrings(view), + makeRowStrings(base))); + if (baseData.size() == 0) + return; + if (viewData.size() != 1) + fail(String.format("Expect only one row in view %s, but got <%s>", + mv, + makeRowStrings(view))); + + UntypedResultSet.Row row = baseData.get(0); + UntypedResultSet.Row viewRow = viewData.get(0); + + Map<String, ByteBuffer> baseValues = new HashMap<>(); + for (int j = 0; j < baseMeta.size(); j++) + { + ColumnSpecification column = baseMeta.get(j); + ByteBuffer actualValue = row.getBytes(column.name.toString()); + baseValues.put(column.name.toString(), actualValue); + } + for (int j = 0; j < viewMeta.size(); j++) + { + ColumnSpecification column = viewMeta.get(j); + 
String name = column.name.toString(); + ByteBuffer viewValue = viewRow.getBytes(name); + if (!baseValues.containsKey(name)) + { + fail(String.format("Extra column: %s with value %s in view", name, column.type.compose(viewValue))); + } + else if (!Objects.equal(baseValues.get(name), viewValue)) + { + fail(String.format("Non equal column: %s, expected <%s> but got <%s>", + name, + column.type.compose(baseValues.get(name)), + column.type.compose(viewValue))); + } + } + } + + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org