Refactor MV code patch by slebresne; reviewed by carlyeks for CASSANDRA-11475
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/86ba2274 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/86ba2274 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/86ba2274 Branch: refs/heads/cassandra-3.7 Commit: 86ba227477b9f8595eb610ecaf950cfbc29dd36b Parents: c19066e Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Mar 11 14:19:38 2016 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri May 6 13:41:41 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../org/apache/cassandra/config/CFMetaData.java | 6 + .../apache/cassandra/config/ViewDefinition.java | 1 - .../cql3/statements/CreateViewStatement.java | 4 +- .../cql3/statements/SelectStatement.java | 41 +- .../apache/cassandra/db/ColumnFamilyStore.java | 6 +- src/java/org/apache/cassandra/db/Keyspace.java | 2 +- .../db/SinglePartitionReadCommand.java | 33 + src/java/org/apache/cassandra/db/Slices.java | 7 + .../apache/cassandra/db/filter/RowFilter.java | 24 + .../SingletonUnfilteredPartitionIterator.java | 3 +- .../apache/cassandra/db/rows/AbstractCell.java | 5 + .../org/apache/cassandra/db/rows/BTreeRow.java | 34 +- .../apache/cassandra/db/rows/BufferCell.java | 5 + src/java/org/apache/cassandra/db/rows/Cell.java | 2 + .../apache/cassandra/db/rows/ColumnData.java | 2 + .../cassandra/db/rows/ComplexColumnData.java | 8 + src/java/org/apache/cassandra/db/rows/Row.java | 35 +- .../cassandra/db/rows/RowDiffListener.java | 2 +- .../db/rows/UnfilteredRowIterators.java | 2 +- .../apache/cassandra/db/view/TableViews.java | 481 ++++++++++++++ .../apache/cassandra/db/view/TemporalRow.java | 610 ------------------ src/java/org/apache/cassandra/db/view/View.java | 629 ++----------------- .../apache/cassandra/db/view/ViewBuilder.java | 38 +- .../apache/cassandra/db/view/ViewManager.java | 146 +---- .../cassandra/db/view/ViewUpdateGenerator.java 
| 549 ++++++++++++++++ .../org/apache/cassandra/cql3/ViewTest.java | 52 +- .../org/apache/cassandra/db/rows/RowsTest.java | 6 +- 28 files changed, 1399 insertions(+), 1337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0679e11..3a49f6a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +3.0.7 + * Refactor Materialized View code (CASSANDRA-11475) + 3.0.6 * Disallow creating view with a static column (CASSANDRA-11602) * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593) http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 79cd779..e263697 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -612,6 +613,11 @@ public final class CFMetaData }; } + public Iterable<ColumnDefinition> primaryKeyColumns() + { + return Iterables.concat(partitionKeyColumns, clusteringColumns); + } + public List<ColumnDefinition> partitionKeyColumns() { return partitionKeyColumns; http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/config/ViewDefinition.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/ViewDefinition.java b/src/java/org/apache/cassandra/config/ViewDefinition.java index b29a8f9..5300f56 100644 --- a/src/java/org/apache/cassandra/config/ViewDefinition.java +++ b/src/java/org/apache/cassandra/config/ViewDefinition.java @@ -37,7 +37,6 @@ public class ViewDefinition public final UUID baseTableId; public final String baseTableName; public final boolean includeAllColumns; - // The order of partititon columns and clustering columns is important, so we cannot switch these two to sets public final CFMetaData metadata; public SelectStatement.RawStatement select; http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java index 45231b7..6446602 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java @@ -227,10 +227,10 @@ public class CreateViewStatement extends SchemaAlteringStatement // This is only used as an intermediate state; this is to catch whether multiple non-PK columns are used boolean hasNonPKColumn = false; for (ColumnIdentifier.Raw raw : partitionKeys) - hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions); + hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions); for (ColumnIdentifier.Raw raw : clusteringKeys) - hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, restrictions); + hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, 
hasNonPKColumn, raw, targetClusteringColumns, restrictions); // We need to include all of the primary key columns from the base table in order to make sure that we do not // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index c3e13f4..0e33475 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -151,6 +151,15 @@ public class SelectStatement implements CQLStatement return builder.build(); } + /** + * The columns to fetch internally for this SELECT statement (which can be more than the ones selected by the + * user as it also includes any restricted column in particular). + */ + public ColumnFilter queriedColumns() + { + return queriedColumns; + } + // Creates a simple select based on the given selection. // Note that the results select statement should not be used for actual queries, but only for processing already // queried data through processColumnFamily. @@ -473,7 +482,29 @@ public class SelectStatement implements CQLStatement } /** - * Returns a read command that can be used internally to filter individual rows for materialized views. + * Returns the slices fetched by this SELECT, assuming an internal call (no bound values in particular). + * <p> + * Note that if the SELECT intrinsically selects rows by names, we convert them into equivalent slices for + * the purpose of this method.
This is used for MVs to restrict what needs to be read when we want to read + * everything that could be affected by a given view (and so, if the view SELECT statement has restrictions + * on the clustering columns, we can restrict what we read). + */ + public Slices clusteringIndexFilterAsSlices() + { + QueryOptions options = QueryOptions.forInternalCalls(Collections.emptyList()); + ClusteringIndexFilter filter = makeClusteringIndexFilter(options); + if (filter instanceof ClusteringIndexSliceFilter) + return ((ClusteringIndexSliceFilter)filter).requestedSlices(); + + Slices.Builder builder = new Slices.Builder(cfm.comparator); + for (Clustering clustering: ((ClusteringIndexNamesFilter)filter).requestedRows()) + builder.add(Slice.make(clustering)); + return builder.build(); + } + + /** + * Returns a read command that can be used internally to query all the rows queried by this SELECT for a + * given key (used for materialized views). */ public SinglePartitionReadCommand internalReadForView(DecoratedKey key, int nowInSec) { @@ -483,6 +514,14 @@ public class SelectStatement implements CQLStatement return SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, filter); } + /** + * The {@code RowFilter} for this SELECT, assuming an internal call (no bound values in particular).
+ */ + public RowFilter rowFilterForInternalCalls() + { + return getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList())); + } + private ReadQuery getRangeCommand(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException { ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 919fed6..5b8de8f 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -48,7 +48,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; -import org.apache.cassandra.db.view.ViewManager; +import org.apache.cassandra.db.view.TableViews; import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -198,7 +198,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private final AtomicInteger fileIndexGenerator = new AtomicInteger(0); public final SecondaryIndexManager indexManager; - public final ViewManager.ForStore viewManager; + public final TableViews viewManager; /* These are locally held copies to be changed from the config during runtime */ private volatile DefaultValue<Integer> minCompactionThreshold; @@ -373,7 +373,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean maxCompactionThreshold = new DefaultValue<>(metadata.params.compaction.maxCompactionThreshold()); crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance); 
indexManager = new SecondaryIndexManager(this); - viewManager = keyspace.viewManager.forTable(metadata.cfId); + viewManager = keyspace.viewManager.forTable(metadata); metric = new TableMetrics(this); fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 5783b41..273946e 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -484,7 +484,7 @@ public class Keyspace try { Tracing.trace("Creating materialized view mutations from base table replica"); - viewManager.pushViewReplicaUpdates(upd, !isClReplay, baseComplete); + viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, !isClReplay, baseComplete); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 14923b9..d5f2dc4 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -229,6 +229,39 @@ public class SinglePartitionReadCommand extends ReadCommand return create(metadata, nowInSec, metadata.decorateKey(key), slices); } + /** + * Creates a new single partition name command for the provided rows. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use as "now" for this query. + * @param key the partition key for the partition to query.
+ * @param names the clustering for the rows to query. + * + * @return a newly created read command that queries the {@code names} in {@code key}. The returned query will + * query all columns (without limit or row filtering) and be in forward order. + */ + public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names) + { + ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false); + return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); + } + + /** + * Creates a new single partition name command for the provided row. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use as "now" for this query. + * @param key the partition key for the partition to query. + * @param name the clustering for the row to query. + * + * @return a newly created read command that queries {@code name} in {@code key}. The returned query will + * query all columns (without limit or row filtering).
+ */ + public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Clustering name) + { + return create(metadata, nowInSec, key, FBUtilities.singleton(name, metadata.comparator)); + } + public SinglePartitionReadCommand copy() { return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/Slices.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java index 8fa9337..bb354a1 100644 --- a/src/java/org/apache/cassandra/db/Slices.java +++ b/src/java/org/apache/cassandra/db/Slices.java @@ -210,6 +210,13 @@ public abstract class Slices implements Iterable<Slice> return this; } + public Builder addAll(Slices slices) + { + for (Slice slice : slices) + add(slice); + return this; + } + public int size() { return slices.size(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 8060f23..11cfb87 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -122,6 +122,30 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec); /** + * Whether the provided row in the provided partition satisfies this filter. + * + * @param metadata the table metadata. + * @param partitionKey the partition key for partition to test. 
+ * @param row the row to test. + * @param nowInSec the current time in seconds (to know what is live and what isn't). + * @return {@code true} if {@code row} in partition {@code partitionKey} satisfies this row filter. + */ + public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row, int nowInSec) + { + // We purge all tombstones as the expressions isSatisfiedBy methods expect it + Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + if (purged == null) + return expressions.isEmpty(); + + for (Expression e : expressions) + { + if (!e.isSatisfiedBy(metadata, partitionKey, purged)) + return false; + } + return true; + } + + /** + * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by * the given key, false otherwise. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java index bfa6690..1f966db 100644 --- a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java +++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java @@ -65,6 +65,7 @@ public class SingletonUnfilteredPartitionIterator implements UnfilteredPartition public void close() { - iter.close(); + if (!returned) + iter.close(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index 00fc286..7e93c2e 100644 ---
a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -74,6 +74,11 @@ public abstract class AbstractCell extends Cell column().validateCellPath(path()); } + public long maxTimestamp() + { + return timestamp(); + } + @Override public boolean equals(Object other) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index e8667e0..ea1d9e0 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -322,6 +322,18 @@ public class BTreeRow extends AbstractRow return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp)); } + public Row withRowDeletion(DeletionTime newDeletion) + { + // Note that: + // - it is a contract with the caller that the new deletion shouldn't shadow anything in + // the row, and so in particular it can't shadow the row deletion. So if there is + // already a row deletion we have nothing to do. + // - we set the minLocalDeletionTime to MIN_VALUE because we know the deletion is live + return newDeletion.isLive() || !deletion.isLive() + ?
this + : new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.regular(newDeletion), btree, Integer.MIN_VALUE); + } + public Row purge(DeletionPurger purger, int nowInSec) { if (!hasDeletion(nowInSec)) @@ -566,6 +578,17 @@ public class BTreeRow extends AbstractRow } List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub); + if (deletion != DeletionTime.LIVE) + { + // Make sure we don't include any shadowed cells + List<Object> filtered = new ArrayList<>(buildFrom.size()); + for (Object c : buildFrom) + { + if (((Cell)c).timestamp() >= deletion.markedForDeleteAt()) + filtered.add(c); + } + buildFrom = filtered; + } Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp()); return new ComplexColumnData(column, btree, deletion); } @@ -621,17 +644,26 @@ public class BTreeRow extends AbstractRow public void addPrimaryKeyLivenessInfo(LivenessInfo info) { - this.primaryKeyLivenessInfo = info; + // The check is only required for unsorted builders, but it's worth the extra safety to have it unconditional + if (!deletion.deletes(info)) + this.primaryKeyLivenessInfo = info; } public void addRowDeletion(Deletion deletion) { this.deletion = deletion; + // The check is only required for unsorted builders, but it's worth the extra safety to have it unconditional + if (deletion.deletes(primaryKeyLivenessInfo)) + this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; } public void addCell(Cell cell) { assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering; + // In practice, only unsorted builder have to deal with shadowed cells, but it doesn't cost us much to deal with it unconditionally in this case + if (deletion.deletes(cell)) + return; + cells.add(cell); hasComplex |= cell.column.isComplex(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java index 8912f59..0a2c528 100644 --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -133,6 +133,11 @@ public class BufferCell extends AbstractCell return path; } + public Cell withUpdatedColumn(ColumnDefinition newColumn) + { + return new BufferCell(newColumn, timestamp, ttl, localDeletionTime, value, path); + } + public Cell withUpdatedValue(ByteBuffer newValue) { return new BufferCell(column, timestamp, ttl, localDeletionTime, newValue, path); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index 73d9e44..b10ce06 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -129,6 +129,8 @@ public abstract class Cell extends ColumnData */ public abstract CellPath path(); + public abstract Cell withUpdatedColumn(ColumnDefinition newColumn); + public abstract Cell withUpdatedValue(ByteBuffer newValue); public abstract Cell copy(AbstractAllocator allocator); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/ColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java index 84763e5..933da6a 100644 --- a/src/java/org/apache/cassandra/db/rows/ColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -82,4 +82,6 @@ public abstract class ColumnData public abstract ColumnData markCounterLocalToBeCleared(); public abstract ColumnData purge(DeletionPurger purger, int 
nowInSec); + + public abstract long maxTimestamp(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index fab529b..d67d079 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -184,6 +184,14 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> return transformAndFilter(newDeletion, (cell) -> (Cell) cell.updateAllTimestamp(newTimestamp)); } + public long maxTimestamp() + { + long timestamp = complexDeletion.markedForDeleteAt(); + for (Cell cell : this) + timestamp = Math.max(timestamp, cell.timestamp()); + return timestamp; + } + // This is the partner in crime of ArrayBackedRow.setValue. The exact warning apply. The short // version is: "don't use that method". void setValue(CellPath path, ByteBuffer value) http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 5f79a66..82c07a7 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -215,6 +215,18 @@ public interface Row extends Unfiltered, Collection<ColumnData> */ public Row updateAllTimestamp(long newTimestamp); + /** + * Returns a copy of this row with the new deletion as row deletion if it is more recent + * than the current row deletion. + * <p> + * WARNING: this method <b>does not</b> check that nothing in the row is shadowed by the provided + * deletion and if that is the case, the created row will be <b>invalid</b>. 
It is thus up to the + * caller to verify that this is not the case and the only reasonable use case of this is probably + * when the row and the deletion come from the same {@code UnfilteredRowIterator} since that gives + * us this guarantee. + */ + public Row withRowDeletion(DeletionTime deletion); + public int dataSize(); public long unsharedHeapSizeExcludingData(); @@ -227,12 +239,15 @@ public interface Row extends Unfiltered, Collection<ColumnData> * A row deletion mostly consists of the time of said deletion, but there is 2 variants: shadowable * and regular row deletion. * <p> - * A shadowable row deletion only exists if the row timestamp ({@code primaryKeyLivenessInfo().timestamp()}) - * is lower than the deletion timestamp. That is, if a row has a shadowable deletion with timestamp A and an update is made - * to that row with a timestamp B such that B > A, then the shadowable deletion is 'shadowed' by that update. A concrete - * consequence is that if said update has cells with timestamp lower than A, then those cells are preserved - * (since the deletion is removed), and this contrarily to a normal (regular) deletion where the deletion is preserved - * and such cells are removed. + * A shadowable row deletion only exists if the row has no timestamp. In other words, the deletion is only + * valid as long as no newer insert is done (thus setting a row timestamp; note that if the row timestamp set + * is lower than the deletion, it is shadowed (and thus ignored) as usual). + * <p> + * That is, if a row has a shadowable deletion with timestamp A and an update is made to that row with a + * timestamp B such that B > A (and that update sets the row timestamp), then the shadowable deletion is 'shadowed' + * by that update.
A concrete consequence is that if said update has cells with timestamp lower than A, then those + * cells are preserved (since the deletion is removed), unlike with a normal (regular) deletion where the + * deletion is preserved and such cells are removed. * <p> * Currently, the only use of shadowable row deletions is Materialized Views, see CASSANDRA-10261. */ @@ -312,6 +327,11 @@ public interface Row extends Unfiltered, Collection<ColumnData> return time.deletes(info); } + public boolean deletes(Cell cell) + { + return time.deletes(cell); + } + public void digest(MessageDigest digest) { time.digest(digest); @@ -361,6 +381,9 @@ public interface Row extends Unfiltered, Collection<ColumnData> * any column before {@code c} and before any call for any column after {@code c}. * 5) Calls to {@link #addCell} are further done in strictly increasing cell order (the one defined by * {@link Cell#comparator}. That is, for a give column, cells are passed in {@code CellPath} order. + * 6) No shadowed data should be added. Concretely, this means that if a row deletion is added, it doesn't + * delete the row timestamp or any cell added later, and similarly no cell added is deleted by the complex + * deletion of the column this is a cell of. * * An unsorted builder will not expect those last rules however: {@link #addCell} and {@link #addComplexDeletion} * can be done in any order. And in particular unsorted builder allows multiple calls for the same column/cell.
In http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/RowDiffListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java index f209bfc..ec848a0 100644 --- a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java +++ b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java @@ -71,7 +71,7 @@ public interface RowDiffListener * @param i the input row from which {@code original} is from. * @param clustering the clustering for the row that is merged. * @param merged the cell of the merged row. Will be {@code null} if input {@code i} had a cell but that cell is no present - * in the mergd result (it has been deleted/shadowed). + * in the merged result (it has been deleted/shadowed). * @param original the cell of input {@code i}. May be {@code null} if input {@code i} had cell corresponding to {@code merged}. */ public void onCell(int i, Clustering clustering, Cell merged, Cell original); http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 9416896..ce5fffe 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -92,7 +92,7 @@ public abstract class UnfilteredRowIterators } /** - * Returns an empty atom iterator for a given partition. + * Returns an empty unfiltered iterator for a given partition. 
*/ public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java new file mode 100644 index 0000000..893bdd5 --- /dev/null +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.view; + +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.btree.BTreeSet; + + +/** + * Groups all the views for a given table. + */ +public class TableViews extends AbstractCollection<View> +{ + private final CFMetaData baseTableMetadata; + + // We need this to be thread-safe, but the number of times this is read (for every mutation on the keyspace) + // massively exceeds the number of times it's changed (when a view is created in the keyspace), so a copy-on-write list is the best option.
+ private final List<View> views = new CopyOnWriteArrayList(); + + public TableViews(CFMetaData baseTableMetadata) + { + this.baseTableMetadata = baseTableMetadata; + } + + public int size() + { + return views.size(); + } + + public Iterator<View> iterator() + { + return views.iterator(); + } + + public boolean contains(String viewName) + { + return Iterables.any(views, view -> view.name.equals(viewName)); + } + + public boolean add(View view) + { + // We should have validated that there is no existing view with this name at this point + assert !contains(view.name); + return views.add(view); + } + + public Iterable<ColumnFamilyStore> allViewsCfs() + { + Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName); + return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName)); + } + + public void forceBlockingFlush() + { + for (ColumnFamilyStore viewCfs : allViewsCfs()) + viewCfs.forceBlockingFlush(); + } + + public void dumpMemtables() + { + for (ColumnFamilyStore viewCfs : allViewsCfs()) + viewCfs.dumpMemtable(); + } + + public void truncateBlocking(long truncatedAt) + { + for (ColumnFamilyStore viewCfs : allViewsCfs()) + { + ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt); + SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter); + } + } + + public void removeByName(String viewName) + { + views.removeIf(v -> v.name.equals(viewName)); + } + + /** + * Calculates and pushes updates to the views replicas. The replicas are determined by + * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}. + * + * @param update an update on the base table represented by this object. + * @param writeCommitLog whether we should write the commit log for the view updates. 
+ * @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed + */ + public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete) + { + assert update.metadata().cfId.equals(baseTableMetadata.cfId); + + Collection<View> views = updatedViews(update); + if (views.isEmpty()) + return; + + // Read modified rows + int nowInSec = FBUtilities.nowInSeconds(); + SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec); + if (command == null) + return; + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata()); + long start = System.nanoTime(); + Collection<Mutation> mutations; + try (ReadOrderGroup orderGroup = command.startOrderGroup(); + UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command); + UnfilteredRowIterator updates = update.unfilteredIterator()) + { + mutations = generateViewUpdates(views, updates, existings, nowInSec); + } + Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS); + + if (!mutations.isEmpty()) + StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete); + } + + /** + * Given some updates on the base table of this object and the existing values for the rows affected by that update, generates the + * mutation to be applied to the provided views. + * + * @param views the views potentially affected by {@code updates}. + * @param updates the base table updates being applied. + * @param existings the existing values for the rows affected by {@code updates}. 
This is used to decide if a view is + * obsoleted by the update and should be removed, gather the values for columns that may not be part of the update if + * a new view entry needs to be created, and compute the minimal updates to be applied if the view entry isn't changed + * but has simply some updated values. This will be empty for view building as we want to assume anything we'll pass + * to {@code updates} is new. + * @param nowInSec the current time in seconds. + * @return the mutations to apply to the {@code views}. This can be empty. + */ + public Collection<Mutation> generateViewUpdates(Collection<View> views, UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec) + { + assert updates.metadata().cfId.equals(baseTableMetadata.cfId); + + List<ViewUpdateGenerator> generators = new ArrayList<>(views.size()); + for (View view : views) + generators.add(new ViewUpdateGenerator(view, updates.partitionKey(), nowInSec)); + + DeletionTracker existingsDeletion = new DeletionTracker(existings.partitionLevelDeletion()); + DeletionTracker updatesDeletion = new DeletionTracker(updates.partitionLevelDeletion()); + + /* + * We iterate through the updates and the existing rows in parallel. This allows us to know the consequence + * on the view of each update. 
+ */ + PeekingIterator<Unfiltered> existingsIter = Iterators.peekingIterator(existings); + PeekingIterator<Unfiltered> updatesIter = Iterators.peekingIterator(updates); + + while (existingsIter.hasNext() && updatesIter.hasNext()) + { + Unfiltered existing = existingsIter.peek(); + Unfiltered update = updatesIter.peek(); + + Row existingRow; + Row updateRow; + int cmp = baseTableMetadata.comparator.compare(update, existing); + if (cmp < 0) + { + // We have an update where there was nothing before + if (update.isRangeTombstoneMarker()) + { + updatesDeletion.update(updatesIter.next()); + continue; + } + + updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion()); + existingRow = emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion()); + } + else if (cmp > 0) + { + // We have something existing but no update (which will happen either because it's a range tombstone marker in + // existing, or because we've fetched the existing row due to some partition/range deletion in the updates) + if (existing.isRangeTombstoneMarker()) + { + existingsDeletion.update(existingsIter.next()); + continue; + } + + existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion()); + updateRow = emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()); + + // The way we build the read command used for existing rows, we should always have updatesDeletion.currentDeletion() + // that is not live, since we wouldn't have read the existing row otherwise. And we could assert that, but if we ever + // change the read method so that it can slightly over-read in some cases, that would be an easily avoidable bug lurking, + // so we just handle the case.
+ if (updateRow == null) + continue; + } + else + { + // We're updating a row that had pre-existing data + if (update.isRangeTombstoneMarker()) + { + assert existing.isRangeTombstoneMarker(); + updatesDeletion.update(updatesIter.next()); + existingsDeletion.update(existingsIter.next()); + continue; + } + + assert !existing.isRangeTombstoneMarker(); + existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion()); + updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion()); + } + + addToViewUpdateGenerators(existingRow, updateRow, generators, nowInSec); + } + + // We only care about more existing rows if the update deletion isn't live, i.e. if we had a partition deletion + if (!updatesDeletion.currentDeletion().isLive()) + { + while (existingsIter.hasNext()) + { + Unfiltered existing = existingsIter.next(); + // If it's a range tombstone, we don't care, we're only looking for existing entries that get deleted by + // the new partition deletion + if (existing.isRangeTombstoneMarker()) + continue; + + Row existingRow = (Row)existing; + addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec); + } + } + while (updatesIter.hasNext()) + { + Unfiltered update = updatesIter.next(); + // If it's a range tombstone, it removes nothing pre-existing, so we can ignore it for view updates + if (update.isRangeTombstoneMarker()) + continue; + + Row updateRow = (Row)update; + addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec); + } + + return buildMutations(baseTableMetadata, generators); + } + + /** + * Returns the views that are potentially updated by the provided updates. + * + * @param updates the updates applied to the base table. + * @return the views affected by {@code updates}.
+ */ + public Collection<View> updatedViews(PartitionUpdate updates) + { + List<View> matchingViews = new ArrayList<>(views.size()); + + for (View view : views) + { + ReadQuery selectQuery = view.getReadQuery(); + if (!selectQuery.selectsKey(updates.partitionKey())) + continue; + + matchingViews.add(view); + } + return matchingViews; + } + + /** + * Returns the command to use to read the existing rows required to generate view updates for the provided + * base updates. + * + * @param updates the base table updates being applied. + * @param views the views potentially affected by {@code updates}. + * @param nowInSec the current time in seconds. + * @return the command to use to read the base table rows required to generate view updates for {@code updates}. + */ + private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate updates, Collection<View> views, int nowInSec) + { + Slices.Builder sliceBuilder = null; + DeletionInfo deletionInfo = updates.deletionInfo(); + CFMetaData metadata = updates.metadata(); + DecoratedKey key = updates.partitionKey(); + // TODO: This is subtle: we need to gather all the slices that we have to fetch between partition del, range tombstones and rows. + if (!deletionInfo.isLive()) + { + sliceBuilder = new Slices.Builder(metadata.comparator); + // Everything covered by a deletion might invalidate an existing view entry, which means we must read it to know. In practice + // though, the views involved might filter some base table clustering columns, in which case we can restrict what we read + // using those restrictions. + // If there is a partition deletion, then we can simply take the slices from each view select filter. They may overlap but + // the Slices.Builder handles that for us. Note that in many cases this will just involve reading everything (as soon as any + // view involved has no clustering restrictions for instance).
+ // For range tombstones, we should theoretically take the difference between the tombstoned range and the slices selected + // by every view, but as we don't have an easy way to compute that right now, we keep it simple and just use the tombstoned + // range. + // TODO: we should improve that latter part. + if (!deletionInfo.getPartitionDeletion().isLive()) + { + for (View view : views) + sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices()); + } + else + { + assert deletionInfo.hasRanges(); + Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false); + while (iter.hasNext()) + sliceBuilder.add(iter.next().deletedSlice()); + } + } + + // We need to read every row that is updated, unless we can prove that it has no impact on any view entries. + + // If we had some slices from the deletions above, we'll continue using that. Otherwise, it's more efficient to build + // a names query. + BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? BTreeSet.builder(metadata.comparator) : null; + for (Row row : updates) + { + // Don't read the existing state if we can prove the update won't affect any views + if (!affectsAnyViews(key, row, views)) + continue; + + if (namesBuilder == null) + sliceBuilder.add(Slice.make(row.clustering())); + else + namesBuilder.add(row.clustering()); + } + + NavigableSet<Clustering> names = namesBuilder == null ? null : namesBuilder.build(); + // If we have a slice builder, it means we had some deletions and we have to read. But if we had + // only row updates, it's possible none of them affected the views, in which case we have nothing + // to do. + if (names != null && names.isEmpty()) + return null; + + ClusteringIndexFilter clusteringFilter = names == null + ?
new ClusteringIndexSliceFilter(sliceBuilder.build(), false) + : new ClusteringIndexNamesFilter(names, false); + // If we have more than one view, we should merge the columns queried by each view but to keep it simple we just + // include everything. We could change that in the future. + ColumnFilter queriedColumns = views.size() == 1 + ? Iterables.getOnlyElement(views).getSelectStatement().queriedColumns() + : ColumnFilter.all(metadata); + // Note that the views could have restrictions on regular columns, but even if that's the case we shouldn't apply those + // when we read, because even if an existing row doesn't match the view filter, the update can change that in which + // case we'll need to know the existing content. There is also no easy way to merge those RowFilters when we have multiple views. + // TODO: it could still make sense to special case for when there is a single view and a small number of updates (and + // no deletions). Indeed, in that case we could check whether any of the updates modify any of the restricted regular + // columns, and if that's not the case we could use the view filter. We keep it simple for now though. + RowFilter rowFilter = RowFilter.NONE; + return SinglePartitionReadCommand.create(metadata, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter); + } + + private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, Collection<View> views) + { + for (View view : views) + { + if (view.mayBeAffectedBy(partitionKey, update)) + return true; + } + return false; + } + + /** + * Given an existing base row and the update that we're going to apply to this row, generates the modifications + * to apply to MVs using the provided {@code ViewUpdateGenerator}s. + * + * @param existingBaseRow the base table row as it is before an update. + * @param updateBaseRow the new updates made to {@code existingBaseRow}. + * @param generators the view update generators to add the new changes to.
+ * @param nowInSec the current time in seconds. Used to decide if data is live or not. + */ + private static void addToViewUpdateGenerators(Row existingBaseRow, Row updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec) + { + // Having existing empty is useful, it just means we'll insert a brand new entry for updateBaseRow, + // but if we have no update at all, we shouldn't get there. + assert !updateBaseRow.isEmpty(); + + // We allow existingBaseRow to be null, which we treat the same as being empty as a small optimization + // to avoid allocating empty row objects when we know there was nothing existing. + Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : Rows.merge(existingBaseRow, updateBaseRow, nowInSec); + for (ViewUpdateGenerator generator : generators) + generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow); + } + + private static Row emptyRow(Clustering clustering, DeletionTime deletion) + { + // Returning null for an empty row is slightly ugly, but the case where there is no pre-existing row is fairly common + // (especially when building the view), so we want to avoid a dummy allocation of an empty row every time. + // And the callers (see addToViewUpdateGenerators) know how to deal with that. + return deletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(deletion)); + } + + /** + * Extracts (and potentially groups) the mutations generated by the provided view update generators. + * Returns the mutations that need to be done to the views given the base table updates + * passed to {@link ViewUpdateGenerator#addBaseTableUpdate}. + * + * @param baseTableMetadata the metadata for the base table being updated. + * @param generators the generators from which to extract the view mutations. + * @return the mutations created by all the generators in {@code generators}.
+ */ + private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, List<ViewUpdateGenerator> generators) + { + // One view is probably common enough and we can optimize a bit easily + if (generators.size() == 1) + { + Collection<PartitionUpdate> updates = generators.get(0).generateViewUpdates(); + List<Mutation> mutations = new ArrayList<>(updates.size()); + for (PartitionUpdate update : updates) + mutations.add(new Mutation(update)); + return mutations; + } + + Map<DecoratedKey, Mutation> mutations = new HashMap<>(); + for (ViewUpdateGenerator generator : generators) + { + for (PartitionUpdate update : generator.generateViewUpdates()) + { + DecoratedKey key = update.partitionKey(); + Mutation mutation = mutations.get(key); + if (mutation == null) + { + mutation = new Mutation(baseTableMetadata.ksName, key); + mutations.put(key, mutation); + } + mutation.add(update); + } + } + return mutations.values(); + } + + /** + * A simple helper that tracks for a given {@code UnfilteredRowIterator} what is the current deletion at any time of the + * iteration. It will be the currently open range tombstone deletion if there is one and the partition deletion otherwise. + */ + private static class DeletionTracker + { + private final DeletionTime partitionDeletion; + private DeletionTime deletion; + + public DeletionTracker(DeletionTime partitionDeletion) + { + this.partitionDeletion = partitionDeletion; + } + + public void update(Unfiltered marker) + { + assert marker instanceof RangeTombstoneMarker; + RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker; + this.deletion = rtm.isOpen(false) + ? rtm.openDeletionTime(false) + : null; + } + + public DeletionTime currentDeletion() + { + return deletion == null ? 
partitionDeletion : deletion; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java deleted file mode 100644 index 23705b9..0000000 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ /dev/null @@ -1,610 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.db.view; - -import static java.util.Comparator.naturalOrder; -import static java.util.Comparator.reverseOrder; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.Iterables; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.CBuilder; -import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Conflicts; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletionInfo; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.LivenessInfo; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.Slice; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.partitions.AbstractBTreePartition; -import org.apache.cassandra.db.rows.BufferCell; -import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.db.rows.CellPath; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; - -/** - * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The - * values are stored in timestamp order, but also indicate whether they are from the currently persisted, allowing a - * {@link TemporalRow.Resolver} to resolve if the value is an old value that has been updated; if it sorts after the - * update's value, then it does not qualify. 
- */ -public class TemporalRow -{ - private static final int NO_TTL = LivenessInfo.NO_TTL; - private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP; - private static final int NO_DELETION_TIME = DeletionTime.LIVE.localDeletionTime(); - - public interface Resolver - { - /** - * @param cellVersions all cells for a certain TemporalRow's Cell - * @return A single TemporalCell from the iterable which satisfies the resolution criteria, or null if - * there is no cell which qualifies - */ - TemporalCell resolve(TemporalCell.Versions cellVersions); - } - - public static final Resolver oldValueIfUpdated = TemporalCell.Versions::getOldCellIfUpdated; - public static final Resolver earliest = TemporalCell.Versions::getEarliestCell; - public static final Resolver latest = TemporalCell.Versions::getLatestCell; - - private static class TemporalCell - { - public final ByteBuffer value; - public final long timestamp; - public final int ttl; - public final int localDeletionTime; - public final boolean isNew; - - private TemporalCell(ByteBuffer value, long timestamp, int ttl, int localDeletionTime, boolean isNew) - { - this.value = value; - this.timestamp = timestamp; - this.ttl = ttl; - this.localDeletionTime = localDeletionTime; - this.isNew = isNew; - } - - @Override - public String toString() - { - return MoreObjects.toStringHelper(this) - .add("value", value == null ? 
"null" : ByteBufferUtil.bytesToHex(value)) - .add("timestamp", timestamp) - .add("ttl", ttl) - .add("localDeletionTime", localDeletionTime) - .add("isNew", isNew) - .toString(); - } - - public TemporalCell reconcile(TemporalCell that) - { - int now = FBUtilities.nowInSeconds(); - Conflicts.Resolution resolution = Conflicts.resolveRegular(that.timestamp, - that.isLive(now), - that.localDeletionTime, - that.value, - this.timestamp, - this.isLive(now), - this.localDeletionTime, - this.value); - assert resolution != Conflicts.Resolution.MERGE; - if (resolution == Conflicts.Resolution.LEFT_WINS) - return that; - return this; - } - - private boolean isLive(int now) - { - return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && now < localDeletionTime); - } - - public Cell cell(ColumnDefinition definition, CellPath cellPath) - { - return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath); - } - - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TemporalCell that = (TemporalCell) o; - - if (timestamp != that.timestamp) return false; - if (ttl != that.ttl) return false; - if (localDeletionTime != that.localDeletionTime) return false; - if (isNew != that.isNew) return false; - return !(value != null ? !value.equals(that.value) : that.value != null); - } - - public int hashCode() - { - int result = value != null ? value.hashCode() : 0; - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + ttl; - result = 31 * result + localDeletionTime; - result = 31 * result + (isNew ? 1 : 0); - return result; - } - - /** - * Tracks the versions of a cell for a given TemporalRow. - * There are only two possible versions, existing and new. 
- * - */ - static class Versions - { - private TemporalCell existingCell = null; - private TemporalCell newCell = null; - private int numSet = 0; - - - /** - * @return the cell that is earliest - * Or would be overwritten in the case of a timestamp conflict - */ - public TemporalCell getEarliestCell() - { - assert numSet > 0; - - if (numSet == 1) - return existingCell == null ? newCell : existingCell; - - TemporalCell latest = existingCell.reconcile(newCell); - - return latest == newCell ? existingCell : newCell; - } - - /** - * @return the cell that is latest - * Or would be the winner in the case of a timestamp conflict - */ - public TemporalCell getLatestCell() - { - assert numSet > 0; - - if (numSet == 1) - return existingCell == null ? newCell : existingCell; - - return existingCell.reconcile(newCell); - } - - /** - * @return the new cell if it updates the existing cell - */ - public TemporalCell getOldCellIfUpdated() - { - assert numSet > 0; - - if (numSet == 1) - return null; - - TemporalCell value = existingCell.reconcile(newCell); - - return ByteBufferUtil.compareUnsigned(existingCell.value, value.value) != 0 ? existingCell : null; - } - - void setVersion(TemporalCell cell) - { - assert cell != null; - - if (cell.isNew) - { - assert newCell == null || newCell.equals(cell) : "Only one cell version can be marked New; newCell: " + newCell + ", cell: " + cell; - newCell = cell; - numSet = existingCell == null ? 1 : 2; - } - else - { - assert existingCell == null || existingCell.equals(cell) : "Only one cell version can be marked Existing; existingCell: " + existingCell + ", cell: " + cell; - existingCell = cell; - numSet = newCell == null ? 
1 : 2; - } - } - - public void addToRow(TemporalRow row, ColumnIdentifier column, CellPath path) - { - if (existingCell != null) - row.addColumnValue(column, path, existingCell.timestamp, existingCell.ttl, - existingCell.localDeletionTime, existingCell.value, existingCell.isNew); - - if (newCell != null) - row.addColumnValue(column, path, newCell.timestamp, newCell.ttl, - newCell.localDeletionTime, newCell.value, newCell.isNew); - } - - @Override - public String toString() - { - return MoreObjects.toStringHelper(this) - .add("numSet", numSet) - .add("existingCell", existingCell) - .add("newCell", newCell) - .toString(); - } - } - } - - private final ColumnFamilyStore baseCfs; - private final java.util.Set<ColumnIdentifier> viewPrimaryKey; - private final ByteBuffer basePartitionKey; - private final Map<ColumnIdentifier, ByteBuffer> clusteringColumns; - private final Row startRow; - private final boolean startIsNew; - - public final int nowInSec; - private final Map<ColumnIdentifier, Map<CellPath, TemporalCell.Versions>> columnValues = new HashMap<>(); - private int viewClusteringTtl = NO_TTL; - private long viewClusteringTimestamp = NO_TIMESTAMP; - private int viewClusteringLocalDeletionTime = NO_DELETION_TIME; - - TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew) - { - this.baseCfs = baseCfs; - this.viewPrimaryKey = viewPrimaryKey; - this.basePartitionKey = key; - this.startRow = row; - this.startIsNew = isNew; - this.nowInSec = nowInSec; - - LivenessInfo liveness = row.primaryKeyLivenessInfo(); - updateLiveness(liveness.ttl(), liveness.timestamp(), row.deletion().time().localDeletionTime()); - - List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns(); - clusteringColumns = new HashMap<>(); - - for (int i = 0; i < clusteringDefs.size(); i++) - { - ColumnDefinition cdef = clusteringDefs.get(i); - clusteringColumns.put(cdef.name, row.clustering().get(i)); 
- - addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME, row.clustering().get(i), isNew); - } - } - - /* - * PK ts:5, ttl:1, deletion: 2 - * Col ts:4, ttl:2, deletion: 3 - * - * TTL use min, since it expires at the lowest time which we are expiring. If we have the above values, we - * would want to return 1, since the base row expires in 1 second. - * - * Timestamp uses max, as this is the time that the row has been written to the view. See CASSANDRA-10910. - * - * Local Deletion Time should use max, as this deletion will cover all previous values written. - */ - private void updateLiveness(int ttl, long timestamp, int localDeletionTime) - { - // We are returning whichever is higher from valueIfSet - // Natural order will return the max: 1.compareTo(2) < 0, so 2 is returned - // Reverse order will return the min: 1.compareTo(2) > 0, so 1 is returned - this.viewClusteringTtl = valueIfSet(viewClusteringTtl, ttl, NO_TTL, reverseOrder()); - this.viewClusteringTimestamp = valueIfSet(viewClusteringTimestamp, timestamp, NO_TIMESTAMP, naturalOrder()); - this.viewClusteringLocalDeletionTime = valueIfSet(viewClusteringLocalDeletionTime, localDeletionTime, NO_DELETION_TIME, naturalOrder()); - } - - @Override - public String toString() - { - return MoreObjects.toStringHelper(this) - .add("table", baseCfs.keyspace.getName() + "." 
+ baseCfs.getTableName()) - .add("basePartitionKey", ByteBufferUtil.bytesToHex(basePartitionKey)) - .add("startRow", startRow.toString(baseCfs.metadata)) - .add("startIsNew", startIsNew) - .add("nowInSec", nowInSec) - .add("viewClusteringTtl", viewClusteringTtl) - .add("viewClusteringTimestamp", viewClusteringTimestamp) - .add("viewClusteringLocalDeletionTime", viewClusteringLocalDeletionTime) - .add("columnValues", columnValues) - .toString(); - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TemporalRow that = (TemporalRow) o; - - if (!clusteringColumns.equals(that.clusteringColumns)) return false; - if (!basePartitionKey.equals(that.basePartitionKey)) return false; - - return true; - } - - @Override - public int hashCode() - { - int result = basePartitionKey.hashCode(); - result = 31 * result + clusteringColumns.hashCode(); - return result; - } - - public void addColumnValue(ColumnIdentifier identifier, - CellPath cellPath, - long timestamp, - int ttl, - int localDeletionTime, - ByteBuffer value, boolean isNew) - { - if (!columnValues.containsKey(identifier)) - columnValues.put(identifier, new HashMap<>()); - - Map<CellPath, TemporalCell.Versions> innerMap = columnValues.get(identifier); - - if (!innerMap.containsKey(cellPath)) - innerMap.put(cellPath, new TemporalCell.Versions()); - - // If this column is part of the view's primary keys - if (viewPrimaryKey.contains(identifier)) - { - updateLiveness(ttl, timestamp, localDeletionTime); - } - - innerMap.get(cellPath).setVersion(new TemporalCell(value, timestamp, ttl, localDeletionTime, isNew)); - } - - /** - * @return - * <ul> - * <li> - * If both existing and update are defaultValue, return defaultValue - * </li> - * <li> - * If only one of existing or existing are defaultValue, return the one which is not - * </li> - * <li> - * If both existing and update are not defaultValue, compare using comparator and return 
the higher one. - * </li> - * </ul> - */ - private static <T> T valueIfSet(T existing, T update, T defaultValue, Comparator<T> comparator) - { - if (existing.equals(defaultValue)) - return update; - if (update.equals(defaultValue)) - return existing; - return comparator.compare(existing, update) > 0 ? existing : update; - } - - public int viewClusteringTtl() - { - return viewClusteringTtl; - } - - public long viewClusteringTimestamp() - { - return viewClusteringTimestamp; - } - - public int viewClusteringLocalDeletionTime() - { - return viewClusteringLocalDeletionTime; - } - - public void addCell(Cell cell, boolean isNew) - { - addColumnValue(cell.column().name, cell.path(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), isNew); - } - - // The Definition here is actually the *base table* definition - public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver resolver) - { - ColumnDefinition baseDefinition = definition.cfName.equals(baseCfs.name) - ? definition - : baseCfs.metadata.getColumnDefinition(definition.name); - - if (baseDefinition.isPartitionKey()) - { - if (baseCfs.metadata.getKeyValidator() instanceof CompositeType) - { - CompositeType keyComparator = (CompositeType) baseCfs.metadata.getKeyValidator(); - ByteBuffer[] components = keyComparator.split(basePartitionKey); - return components[baseDefinition.position()]; - } - else - { - return basePartitionKey; - } - } - else - { - ColumnIdentifier columnIdentifier = baseDefinition.name; - - if (clusteringColumns.containsKey(columnIdentifier)) - return clusteringColumns.get(columnIdentifier); - - Collection<org.apache.cassandra.db.rows.Cell> val = values(definition, resolver); - if (val != null && val.size() == 1) - { - org.apache.cassandra.db.rows.Cell cell = Iterables.getOnlyElement(val); - // handle single-column deletions correctly - return cell.isTombstone() ? 
null : cell.value(); - } - } - return null; - } - - public DeletionTime deletionTime(AbstractBTreePartition partition) - { - DeletionInfo deletionInfo = partition.deletionInfo(); - if (!deletionInfo.getPartitionDeletion().isLive()) - return deletionInfo.getPartitionDeletion(); - - Clustering baseClustering = baseClusteringBuilder().build(); - RangeTombstone clusterTombstone = deletionInfo.rangeCovering(baseClustering); - if (clusterTombstone != null) - return clusterTombstone.deletionTime(); - - Row row = partition.getRow(baseClustering); - return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion().time(); - } - - public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver) - { - Map<CellPath, TemporalCell.Versions> innerMap = columnValues.get(definition.name); - if (innerMap == null) - { - return Collections.emptyList(); - } - - Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>(); - for (Map.Entry<CellPath, TemporalCell.Versions> pathAndCells : innerMap.entrySet()) - { - TemporalCell cell = resolver.resolve(pathAndCells.getValue()); - - if (cell != null) - value.add(cell.cell(definition, pathAndCells.getKey())); - } - return value; - } - - public Slice baseSlice() - { - return baseClusteringBuilder().buildSlice(); - } - - private CBuilder baseClusteringBuilder() - { - CFMetaData metadata = baseCfs.metadata; - CBuilder builder = CBuilder.create(metadata.comparator); - - ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()]; - for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : clusteringColumns.entrySet()) - buffers[metadata.getColumnDefinition(buffer.getKey()).position()] = buffer.getValue(); - - for (ByteBuffer byteBuffer : buffers) - builder = builder.add(byteBuffer); - - return builder; - } - - public Clustering baseClustering() - { - return startRow.clustering(); - } - - static class Set implements Iterable<TemporalRow> - { - private final ColumnFamilyStore 
baseCfs; - private final java.util.Set<ColumnIdentifier> viewPrimaryKey; - private final ByteBuffer key; - public final DecoratedKey dk; - private final Map<Clustering, TemporalRow> clusteringToRow; - final int nowInSec = FBUtilities.nowInSeconds(); - private boolean hasTombstonedExisting = false; - - Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key) - { - this.baseCfs = baseCfs; - this.viewPrimaryKey = viewPrimaryKey; - this.key = key; - this.dk = baseCfs.decorateKey(key); - this.clusteringToRow = new HashMap<>(); - } - - public Iterator<TemporalRow> iterator() - { - return clusteringToRow.values().iterator(); - } - - public TemporalRow getClustering(Clustering clustering) - { - return clusteringToRow.get(clustering); - } - - public void addRow(Row row, boolean isNew) - { - TemporalRow temporalRow = clusteringToRow.get(row.clustering()); - if (temporalRow == null) - { - temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row, nowInSec, isNew); - clusteringToRow.put(row.clustering(), temporalRow); - } - - for (Cell cell : row.cells()) - { - temporalRow.addCell(cell, isNew); - } - } - - private void addRow(TemporalRow row) - { - TemporalRow newRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row.startRow, nowInSec, row.startIsNew); - - TemporalRow existing = clusteringToRow.put(row.startRow.clustering(), newRow); - assert existing == null; - - for (Map.Entry<ColumnIdentifier, Map<CellPath, TemporalCell.Versions>> entry : row.columnValues.entrySet()) - { - for (Map.Entry<CellPath, TemporalCell.Versions> cellPathEntry : entry.getValue().entrySet()) - { - TemporalCell.Versions cellVersions = cellPathEntry.getValue(); - - cellVersions.addToRow(newRow, entry.getKey(), cellPathEntry.getKey()); - } - } - } - - public TemporalRow.Set withNewViewPrimaryKey(java.util.Set<ColumnIdentifier> viewPrimaryKey) - { - TemporalRow.Set newSet = new Set(baseCfs, viewPrimaryKey, key); - - for (TemporalRow row : this) - 
newSet.addRow(row); - - return newSet; - } - - public boolean hasTombstonedExisting() - { - return hasTombstonedExisting; - } - - public void setTombstonedExisting() - { - hasTombstonedExisting = true; - } - - public int size() - { - return clusteringToRow.size(); - } - } -}