Merge commit '86ba227477b9f8595eb610ecaf950cfbc29dd36b' into cassandra-3.7
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a87fd715
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a87fd715
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a87fd715

Branch: refs/heads/trunk
Commit: a87fd715d6b26128603a404074ec3df42a595b2e
Parents: 4e364d7 86ba227
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri May 6 13:43:44 2016 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri May 6 13:44:12 2016 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   6 +
 .../apache/cassandra/config/ViewDefinition.java |   1 -
 .../cql3/statements/CreateViewStatement.java    |   4 +-
 .../cql3/statements/SelectStatement.java        |  41 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   6 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +-
 .../db/SinglePartitionReadCommand.java          |  33 +
 src/java/org/apache/cassandra/db/Slices.java    |   7 +
 .../apache/cassandra/db/filter/RowFilter.java   |  24 +
 .../SingletonUnfilteredPartitionIterator.java   |   3 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |   5 +
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  34 +-
 .../apache/cassandra/db/rows/BufferCell.java    |   5 +
 src/java/org/apache/cassandra/db/rows/Cell.java |   2 +
 .../apache/cassandra/db/rows/ColumnData.java    |   2 +
 .../cassandra/db/rows/ComplexColumnData.java    |   8 +
 .../apache/cassandra/db/rows/NativeCell.java    |   5 +
 src/java/org/apache/cassandra/db/rows/Row.java  |  35 +-
 .../cassandra/db/rows/RowDiffListener.java      |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   2 +-
 .../apache/cassandra/db/view/TableViews.java    | 481 ++++++++++++++
 .../apache/cassandra/db/view/TemporalRow.java   | 601 ------------------
 src/java/org/apache/cassandra/db/view/View.java | 629 ++-----------------
 .../apache/cassandra/db/view/ViewBuilder.java   |  38 +-
 .../apache/cassandra/db/view/ViewManager.java   | 146 +----
 .../cassandra/db/view/ViewUpdateGenerator.java  | 549 ++++++++++++++++
 .../org/apache/cassandra/cql3/ViewTest.java     |  52 +-
 .../org/apache/cassandra/db/rows/RowsTest.java  |   6 +-
 28 files changed, 1400 insertions(+), 1329 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/config/ViewDefinition.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/Slices.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/Cell.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/NativeCell.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/NativeCell.java index 9d816f3,0000000..5930332 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/rows/NativeCell.java +++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java @@@ -1,151 -1,0 +1,156 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.MemoryUtil; +import org.apache.cassandra.utils.memory.NativeAllocator; + +public class NativeCell extends AbstractCell +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeCell()); + + private static final long HAS_CELLPATH = 0; + private static final long TIMESTAMP = 1; + private static final long TTL = 9; + private static final long DELETION = 13; + private static final long LENGTH = 17; + private static final long VALUE = 21; + + private final long peer; + + private NativeCell() + { + super(null); + this.peer = 0; + } + + public NativeCell(NativeAllocator allocator, + OpOrder.Group writeOp, + Cell cell) + { + this(allocator, + writeOp, + cell.column(), + cell.timestamp(), + cell.ttl(), + cell.localDeletionTime(), + cell.value(), + cell.path()); + } + + public NativeCell(NativeAllocator allocator, + OpOrder.Group writeOp, + ColumnDefinition column, + long timestamp, + int ttl, + int localDeletionTime, + ByteBuffer value, + CellPath path) + { + super(column); + long size = simpleSize(value.remaining()); + + assert value.order() == ByteOrder.BIG_ENDIAN; + assert column.isComplex() == (path != null); + if (path != null) + { + assert path.size() == 1; + size += 4 + path.get(0).remaining(); + } + + if (size > Integer.MAX_VALUE) + throw new IllegalStateException(); + + // cellpath? : timestamp : ttl : localDeletionTime : length : <data> : [cell path length] : [<cell path data>] + peer = allocator.allocate((int) size, writeOp); + MemoryUtil.setByte(peer + HAS_CELLPATH, (byte)(path == null ? 
0 : 1)); + MemoryUtil.setLong(peer + TIMESTAMP, timestamp); + MemoryUtil.setInt(peer + TTL, ttl); + MemoryUtil.setInt(peer + DELETION, localDeletionTime); + MemoryUtil.setInt(peer + LENGTH, value.remaining()); + MemoryUtil.setBytes(peer + VALUE, value); + + if (path != null) + { + ByteBuffer pathbuffer = path.get(0); + assert pathbuffer.order() == ByteOrder.BIG_ENDIAN; + + long offset = peer + VALUE + value.remaining(); + MemoryUtil.setInt(offset, pathbuffer.remaining()); + MemoryUtil.setBytes(offset + 4, pathbuffer); + } + } + + private static long simpleSize(int length) + { + return VALUE + length; + } + + public long timestamp() + { + return MemoryUtil.getLong(peer + TIMESTAMP); + } + + public int ttl() + { + return MemoryUtil.getInt(peer + TTL); + } + + public int localDeletionTime() + { + return MemoryUtil.getInt(peer + DELETION); + } + + public ByteBuffer value() + { + int length = MemoryUtil.getInt(peer + LENGTH); + return MemoryUtil.getByteBuffer(peer + VALUE, length, ByteOrder.BIG_ENDIAN); + } + + public CellPath path() + { + if (MemoryUtil.getByte(peer+ HAS_CELLPATH) == 0) + return null; + + long offset = peer + VALUE + MemoryUtil.getInt(peer + LENGTH); + int size = MemoryUtil.getInt(offset); + return CellPath.create(MemoryUtil.getByteBuffer(offset + 4, size, ByteOrder.BIG_ENDIAN)); + } + + public Cell withUpdatedValue(ByteBuffer newValue) + { + throw new UnsupportedOperationException(); + } + ++ public Cell withUpdatedColumn(ColumnDefinition column) ++ { ++ return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), value(), path()); ++ } ++ + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/RowDiffListener.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/view/TableViews.java index 0000000,893bdd5..e97e01c mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/db/view/TableViews.java +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@@ -1,0 -1,481 +1,481 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.db.view; + + import java.util.*; + import java.util.concurrent.CopyOnWriteArrayList; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + + import com.google.common.collect.Iterables; + import com.google.common.collect.Iterators; + import com.google.common.collect.PeekingIterator; + + import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.db.*; + import org.apache.cassandra.db.commitlog.ReplayPosition; + import org.apache.cassandra.db.filter.*; + import org.apache.cassandra.db.rows.*; + import org.apache.cassandra.db.partitions.*; + import org.apache.cassandra.dht.Token; + import org.apache.cassandra.service.StorageProxy; + import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.btree.BTreeSet; + + + /** + * Groups all the views for a given table. + */ + public class TableViews extends AbstractCollection<View> + { + private final CFMetaData baseTableMetadata; + + // We need this to be thread-safe, but the number of times this is changed (when a view is created in the keyspace) + // massively exceeds the number of time it's read (for every mutation on the keyspace), so a copy-on-write list is the best option. + private final List<View> views = new CopyOnWriteArrayList(); + + public TableViews(CFMetaData baseTableMetadata) + { + this.baseTableMetadata = baseTableMetadata; + } + + public int size() + { + return views.size(); + } + + public Iterator<View> iterator() + { + return views.iterator(); + } + + public boolean contains(String viewName) + { + return Iterables.any(views, view -> view.name.equals(viewName)); + } + + public boolean add(View view) + { + // We should have validated that there is no existing view with this name at this point + assert !contains(view.name); + return views.add(view); + } + + public Iterable<ColumnFamilyStore> allViewsCfs() + { + Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName); + return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName)); + } + + public void forceBlockingFlush() + { + for (ColumnFamilyStore viewCfs : allViewsCfs()) + viewCfs.forceBlockingFlush(); + } + + public void dumpMemtables() + { + for (ColumnFamilyStore viewCfs : allViewsCfs()) + viewCfs.dumpMemtable(); + } + + public void truncateBlocking(long truncatedAt) + { + for (ColumnFamilyStore viewCfs : allViewsCfs()) + { + ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt); + SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter); + } + } + + public void removeByName(String viewName) + { + views.removeIf(v -> v.name.equals(viewName)); + } + + /** + * Calculates and pushes updates to the views replicas. The replicas are determined by + * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}. + * + * @param update an update on the base table represented by this object. + * @param writeCommitLog whether we should write the commit log for the view updates. 
+ * @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed + */ + public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete) + { + assert update.metadata().cfId.equals(baseTableMetadata.cfId); + + Collection<View> views = updatedViews(update); + if (views.isEmpty()) + return; + + // Read modified rows + int nowInSec = FBUtilities.nowInSeconds(); + SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec); + if (command == null) + return; + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata()); + long start = System.nanoTime(); + Collection<Mutation> mutations; - try (ReadOrderGroup orderGroup = command.startOrderGroup(); ++ try (ReadExecutionController orderGroup = command.executionController(); + UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command); + UnfilteredRowIterator updates = update.unfilteredIterator()) + { + mutations = generateViewUpdates(views, updates, existings, nowInSec); + } + Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS); + + if (!mutations.isEmpty()) + StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete); + } + + /** + * Given some updates on the base table of this object and the existing values for the rows affected by that update, generates the + * mutation to be applied to the provided views. + * + * @param views the views potentially affected by {@code updates}. + * @param updates the base table updates being applied. + * @param existings the existing values for the rows affected by {@code updates}. This is used to decide if a view is + * obsoleted by the update and should be removed, gather the values for columns that may not be part of the update if + * a new view entry needs to be created, and compute the minimal updates to be applied if the view entry isn't changed + * but has simply some updated values. This will be empty for view building as we want to assume anything we'll pass + * to {@code updates} is new. + * @param nowInSec the current time in seconds. + * @return the mutations to apply to the {@code views}. This can be empty. + */ + public Collection<Mutation> generateViewUpdates(Collection<View> views, UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec) + { + assert updates.metadata().cfId.equals(baseTableMetadata.cfId); + + List<ViewUpdateGenerator> generators = new ArrayList<>(views.size()); + for (View view : views) + generators.add(new ViewUpdateGenerator(view, updates.partitionKey(), nowInSec)); + + DeletionTracker existingsDeletion = new DeletionTracker(existings.partitionLevelDeletion()); + DeletionTracker updatesDeletion = new DeletionTracker(updates.partitionLevelDeletion()); + + /* + * We iterate through the updates and the existing rows in parallel. This allows us to know the consequence + * on the view of each update. 
+ */ + PeekingIterator<Unfiltered> existingsIter = Iterators.peekingIterator(existings); + PeekingIterator<Unfiltered> updatesIter = Iterators.peekingIterator(updates); + + while (existingsIter.hasNext() && updatesIter.hasNext()) + { + Unfiltered existing = existingsIter.peek(); + Unfiltered update = updatesIter.peek(); + + Row existingRow; + Row updateRow; + int cmp = baseTableMetadata.comparator.compare(update, existing); + if (cmp < 0) + { + // We have an update where there was nothing before + if (update.isRangeTombstoneMarker()) + { + updatesDeletion.update(updatesIter.next()); + continue; + } + + updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion()); + existingRow = emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion()); + } + else if (cmp > 0) + { + // We have something existing but no update (which will happen either because it's a range tombstone marker in + // existing, or because we've fetched the existing row due to some partition/range deletion in the updates) + if (existing.isRangeTombstoneMarker()) + { + existingsDeletion.update(existingsIter.next()); + continue; + } + + existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion()); + updateRow = emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()); + + // The way we build the read command used for existing rows, we should always have updatesDeletion.currentDeletion() + // that is not live, since we wouldn't have read the existing row otherwise. And we could assert that, but if we ever + // change the read method so that it can slightly over-read in some case, that would be an easily avoiding bug lurking, + // so we just handle the case. + if (updateRow == null) + continue; + } + else + { + // We're updating a row that had pre-existing data + if (update.isRangeTombstoneMarker()) + { + assert existing.isRangeTombstoneMarker(); + updatesDeletion.update(updatesIter.next()); + existingsDeletion.update(existingsIter.next()); + continue; + } + + assert !existing.isRangeTombstoneMarker(); + existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion()); + updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion()); + } + + addToViewUpdateGenerators(existingRow, updateRow, generators, nowInSec); + } + + // We only care about more existing rows if the update deletion isn't live, i.e. if we had a partition deletion + if (!updatesDeletion.currentDeletion().isLive()) + { + while (existingsIter.hasNext()) + { + Unfiltered existing = existingsIter.next(); + // If it's a range tombstone, we don't care, we're only looking for existing entry that gets deleted by + // the new partition deletion + if (existing.isRangeTombstoneMarker()) + continue; + + Row existingRow = (Row)existing; + addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec); + } + } + while (updatesIter.hasNext()) + { + Unfiltered update = updatesIter.next(); + // If it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it for view updates + if (update.isRangeTombstoneMarker()) + continue; + + Row updateRow = (Row)update; + addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec); + } + + return buildMutations(baseTableMetadata, generators); + } + + /** + * Return the views that are potentially updated by the provided updates. 
+ * + * @param updates the updates applied to the base table. + * @return the views affected by {@code updates}. + */ + public Collection<View> updatedViews(PartitionUpdate updates) + { + List<View> matchingViews = new ArrayList<>(views.size()); + + for (View view : views) + { + ReadQuery selectQuery = view.getReadQuery(); + if (!selectQuery.selectsKey(updates.partitionKey())) + continue; + + matchingViews.add(view); + } + return matchingViews; + } + + /** + * Returns the command to use to read the existing rows required to generate view updates for the provided base + * base updates. + * + * @param updates the base table updates being applied. + * @param views the views potentially affected by {@code updates}. + * @param nowInSec the current time in seconds. + * @return the command to use to read the base table rows required to generate view updates for {@code updates}. + */ + private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate updates, Collection<View> views, int nowInSec) + { + Slices.Builder sliceBuilder = null; + DeletionInfo deletionInfo = updates.deletionInfo(); + CFMetaData metadata = updates.metadata(); + DecoratedKey key = updates.partitionKey(); + // TODO: This is subtle: we need to gather all the slices that we have to fetch between partition del, range tombstones and rows. + if (!deletionInfo.isLive()) + { + sliceBuilder = new Slices.Builder(metadata.comparator); + // Everything covered by a deletion might invalidate an existing view entry, which means we must read it to know. In practice + // though, the views involved might filter some base table clustering columns, in which case we can restrict what we read + // using those restrictions. + // If there is a partition deletion, then we can simply take each slices from each view select filter. They may overlap but + // the Slices.Builder handles that for us. Note that in many case this will just involve reading everything (as soon as any + // view involved has no clustering restrictions for instance). + // For range tombstone, we should theoretically take the difference between the range tombstoned and the slices selected + // by every views, but as we don't an easy way to compute that right now, we keep it simple and just use the tombstoned + // range. + // TODO: we should improve that latter part. + if (!deletionInfo.getPartitionDeletion().isLive()) + { + for (View view : views) + sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices()); + } + else + { + assert deletionInfo.hasRanges(); + Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false); + while (iter.hasNext()) + sliceBuilder.add(iter.next().deletedSlice()); + } + } + + // We need to read every row that is updated, unless we can prove that it has no impact on any view entries. + + // If we had some slices from the deletions above, we'll continue using that. Otherwise, it's more efficient to build + // a names query. + BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? BTreeSet.builder(metadata.comparator) : null; + for (Row row : updates) + { + // Don't read the existing state if we can prove the update won't affect any views + if (!affectsAnyViews(key, row, views)) + continue; + + if (namesBuilder == null) + sliceBuilder.add(Slice.make(row.clustering())); + else + namesBuilder.add(row.clustering()); + } + + NavigableSet<Clustering> names = namesBuilder == null ? null : namesBuilder.build(); + // If we have a slice builder, it means we had some deletions and we have to read. 
But if we had + // only row updates, it's possible none of them affected the views, in which case we have nothing + // to do. + if (names != null && names.isEmpty()) + return null; + + ClusteringIndexFilter clusteringFilter = names == null + ? new ClusteringIndexSliceFilter(sliceBuilder.build(), false) + : new ClusteringIndexNamesFilter(names, false); + // If we have more than one view, we should merge the queried columns by each views but to keep it simple we just + // include everything. We could change that in the future. + ColumnFilter queriedColumns = views.size() == 1 + ? Iterables.getOnlyElement(views).getSelectStatement().queriedColumns() + : ColumnFilter.all(metadata); + // Note that the views could have restrictions on regular columns, but even if that's the case we shouldn't apply those + // when we read, because even if an existing row doesn't match the view filter, the update can change that in which + // case we'll need to know the existing content. There is also no easy way to merge those RowFilter when we have multiple views. + // TODO: we could still make sense to special case for when there is a single view and a small number of updates (and + // no deletions). Indeed, in that case we could check whether any of the update modify any of the restricted regular + // column, and if that's not the case we could use view filter. We keep it simple for now though. + RowFilter rowFilter = RowFilter.NONE; + return SinglePartitionReadCommand.create(metadata, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter); + } + + private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, Collection<View> views) + { + for (View view : views) + { + if (view.mayBeAffectedBy(partitionKey, update)) + return true; + } + return false; + } + + /** + * Given an existing base row and the update that we're going to apply to this row, generate the modifications + * to apply to MVs using the provided {@code ViewUpdateGenerator}s. + * + * @param existingBaseRow the base table row as it is before an update. + * @param updateBaseRow the newly updates made to {@code existingBaseRow}. + * @param generators the view update generators to add the new changes to. + * @param nowInSec the current time in seconds. Used to decide if data is live or not. + */ + private static void addToViewUpdateGenerators(Row existingBaseRow, Row updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec) + { + // Having existing empty is useful, it just means we'll insert a brand new entry for updateBaseRow, + // but if we have no update at all, we shouldn't get there. + assert !updateBaseRow.isEmpty(); + + // We allow existingBaseRow to be null, which we treat the same as being empty as an small optimization + // to avoid allocating empty row objects when we know there was nothing existing. + Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : Rows.merge(existingBaseRow, updateBaseRow, nowInSec); + for (ViewUpdateGenerator generator : generators) + generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow); + } + + private static Row emptyRow(Clustering clustering, DeletionTime deletion) + { + // Returning null for an empty row is slightly ugly, but the case where there is no pre-existing row is fairly common + // (especially when building the view), so we want to avoid a dummy allocation of an empty row every time. + // And MultiViewUpdateBuilder knows how to deal with that. + return deletion.isLive() ? 
null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(deletion)); + } + + /** + * Extracts (and potentially groups) the mutations generated by the provided view update generator. + * Returns the mutation that needs to be done to the views given the base table updates + * passed to {@link #addBaseTableUpdate}. + * + * @param baseTableMetadata the metadata for the base table being updated. + * @param generators the generators from which to extract the view mutations from. + * @return the mutations created by all the generators in {@code generators}. + */ + private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, List<ViewUpdateGenerator> generators) + { + // One view is probably common enough and we can optimize a bit easily + if (generators.size() == 1) + { + Collection<PartitionUpdate> updates = generators.get(0).generateViewUpdates(); + List<Mutation> mutations = new ArrayList<>(updates.size()); + for (PartitionUpdate update : updates) + mutations.add(new Mutation(update)); + return mutations; + } + + Map<DecoratedKey, Mutation> mutations = new HashMap<>(); + for (ViewUpdateGenerator generator : generators) + { + for (PartitionUpdate update : generator.generateViewUpdates()) + { + DecoratedKey key = update.partitionKey(); + Mutation mutation = mutations.get(key); + if (mutation == null) + { + mutation = new Mutation(baseTableMetadata.ksName, key); + mutations.put(key, mutation); + } + mutation.add(update); + } + } + return mutations.values(); + } + + /** + * A simple helper that tracks for a given {@code UnfilteredRowIterator} what is the current deletion at any time of the + * iteration. It will be the currently open range tombstone deletion if there is one and the partition deletion otherwise. + */ + private static class DeletionTracker + { + private final DeletionTime partitionDeletion; + private DeletionTime deletion; + + public DeletionTracker(DeletionTime partitionDeletion) + { + this.partitionDeletion = partitionDeletion; + } + + public void update(Unfiltered marker) + { + assert marker instanceof RangeTombstoneMarker; + RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker; + this.deletion = rtm.isOpen(false) + ? rtm.openDeletionTime(false) + : null; + } + + public DeletionTime currentDeletion() + { + return deletion == null ? partitionDeletion : deletion; + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/view/ViewBuilder.java index 8944122,b2b409b..65e26e2 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@@ -77,28 -76,22 +77,22 @@@ public class ViewBuilder extends Compac if (!selectQuery.selectsKey(key)) return; - QueryPager pager = view.getSelectStatement().internalReadForView(key, FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION); + int nowInSec = FBUtilities.nowInSeconds(); + SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); - while (!pager.isExhausted()) + // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates + // and pretend that there is nothing pre-existing. 
+ UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false); + + Collection<Mutation> mutations; - try (ReadOrderGroup orderGroup = command.startOrderGroup(); ++ try (ReadExecutionController orderGroup = command.executionController(); + UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command)) { - try (ReadExecutionController executionController = pager.executionController(); - PartitionIterator partitionIterator = pager.fetchPageInternal(128, executionController)) - { - if (!partitionIterator.hasNext()) - return; - - try (RowIterator rowIterator = partitionIterator.next()) - { - FilteredPartition partition = FilteredPartition.create(rowIterator); - TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true); - - Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true); - - if (mutations != null) - StorageProxy.mutateMV(key.getKey(), mutations, true, noBase); - } - } + mutations = baseCfs.keyspace.viewManager.forTable(baseCfs.metadata).generateViewUpdates(Collections.singleton(view), data, empty, nowInSec); } + + if (!mutations.isEmpty()) + StorageProxy.mutateMV(key.getKey(), mutations, true, noBase); } public void run() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/view/ViewManager.java index 6b8fe0d,fd04b97..bd73733 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@@ -30,24 -31,24 +31,25 @@@ import org.apache.cassandra.config.CFMe import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ViewDefinition; import org.apache.cassandra.db.*; - import org.apache.cassandra.db.commitlog.ReplayPosition; - import org.apache.cassandra.db.partitions.PartitionUpdate; - import org.apache.cassandra.dht.Token; + import org.apache.cassandra.db.rows.*; + import org.apache.cassandra.db.partitions.*; - +import org.apache.cassandra.repair.SystemDistributedKeyspace; - import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - /** * Manages {@link View}'s for a single {@link ColumnFamilyStore}. All of the views for that table are created when this * manager is initialized. * * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update - * any views {@link ViewManager#updatesAffectView(Collection, boolean)}, provide locks to prevent multiple - * updates from creating incoherent updates in the view {@link ViewManager#acquireLockFor(ByteBuffer)}, and + * any views {@link #updatesAffectView(Collection, boolean)}, provide locks to prevent multiple + * updates from creating incoherent updates in the view {@link #acquireLockFor(int)}, and * to affect change on the view. + * + * TODO: I think we can get rid of that class. For addition/removal of view by names, we could move it Keyspace. And we + * not sure it's even worth keeping viewsByName as none of the related operation are performance sensitive so we could + * find the view by iterating over the CFStore.viewManager directly. 
+ * For the lock, it could move to Keyspace too, but I don't remmenber why it has to be at the keyspace level and if it + * can be at the table level, maybe that's where it should be. */ public class ViewManager { @@@ -250,9 -148,8 +163,9 @@@ if (view == null) return; - forTable(view.getDefinition().baseTableId).removeView(name); + forTable(view.getDefinition().baseTableMetadata()).removeByName(name); SystemKeyspace.setViewRemoved(keyspace.getName(), view.name); + SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name); } public void buildAllViews() @@@ -261,22 -158,23 +174,23 @@@ view.build(); } - public ForStore forTable(UUID baseId) + public TableViews forTable(CFMetaData metadata) { - ForStore forStore = viewManagersByStore.get(baseId); - if (forStore == null) + UUID baseId = metadata.cfId; + TableViews views = viewsByBaseTable.get(baseId); + if (views == null) { - forStore = new ForStore(); - ForStore previous = viewManagersByStore.put(baseId, forStore); + views = new TableViews(metadata); + TableViews previous = viewsByBaseTable.putIfAbsent(baseId, views); if (previous != null) - forStore = previous; + views = previous; } - return forStore; + return views; } - public static Lock acquireLockFor(ByteBuffer key) + public static Lock acquireLockFor(int keyAndCfidHash) { - Lock lock = LOCKS.get(key); + Lock lock = LOCKS.get(keyAndCfidHash); if (lock.tryLock()) return lock; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 0000000,af025cb..4c6dbb7 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@@ -1,0 -1,549 +1,549 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.db.view; + + import java.nio.ByteBuffer; + import java.util.*; + + import com.google.common.collect.Iterators; + import com.google.common.collect.PeekingIterator; + + import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.config.ColumnDefinition; + import org.apache.cassandra.config.ViewDefinition; + import org.apache.cassandra.db.*; + import org.apache.cassandra.db.rows.*; + import org.apache.cassandra.db.partitions.*; + import org.apache.cassandra.db.marshal.AbstractType; + import org.apache.cassandra.db.marshal.CompositeType; + + /** + * Creates the updates to apply to a view given the existing rows in the base + * table and the updates that we're applying to them (this handles updates + * on a single partition only). 
+ * + * This class is used by passing the updates made to the base table to + * {@link #addBaseTableUpdate} and calling {@link #generateViewUpdates} once all updates have + * been handled to get the resulting view mutations. + */ + public class ViewUpdateGenerator + { + private final View view; + private final int nowInSec; + + private final CFMetaData baseMetadata; + private final DecoratedKey baseDecoratedKey; + private final ByteBuffer[] basePartitionKey; + + private final CFMetaData viewMetadata; + + private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>(); + + // Reused internally to build a new entry + private final ByteBuffer[] currentViewEntryPartitionKey; + private final Row.Builder currentViewEntryBuilder; + + /** + * The type of type update action to perform to the view for a given base table + * update. + */ + private enum UpdateAction + { + NONE, // There was no view entry and none should be added + NEW_ENTRY, // There was no entry but there is one post-update + DELETE_OLD, // There was an entry but there is nothing after update + UPDATE_EXISTING, // There was an entry and the update modifies it + SWITCH_ENTRY // There was an entry and there is still one after update, + // but they are not the same one. + }; + + /** + * Creates a new {@code ViewUpdateBuilder}. + * + * @param view the view for which this will be building updates for. + * @param basePartitionKey the partition key for the base table partition for which + * we'll handle updates for. + * @param nowInSec the current time in seconds. Used to decide if data are live or not + * and as base reference for new deletions. + */ + public ViewUpdateGenerator(View view, DecoratedKey basePartitionKey, int nowInSec) + { + this.view = view; + this.nowInSec = nowInSec; + + this.baseMetadata = view.getDefinition().baseTableMetadata(); + this.baseDecoratedKey = basePartitionKey; + this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator()); + + this.viewMetadata = view.getDefinition().metadata; + + this.currentViewEntryPartitionKey = new ByteBuffer[viewMetadata.partitionKeyColumns().size()]; + this.currentViewEntryBuilder = BTreeRow.sortedBuilder(); + } + + private static ByteBuffer[] extractKeyComponents(DecoratedKey partitionKey, AbstractType<?> type) + { + return type instanceof CompositeType + ? ((CompositeType)type).split(partitionKey.getKey()) + : new ByteBuffer[]{ partitionKey.getKey() }; + } + + /** + * Adds to this generator the updates to be made to the view given a base table row + * before and after an update. + * + * @param existingBaseRow the base table row as it is before an update. + * @param mergedBaseRow the base table row after the update is applied (note that + * this is not just the new update, but rather the resulting row). + */ + public void addBaseTableUpdate(Row existingBaseRow, Row mergedBaseRow) + { + switch (updateAction(existingBaseRow, mergedBaseRow)) + { + case NONE: + return; + case NEW_ENTRY: + createEntry(mergedBaseRow); + return; + case DELETE_OLD: + deleteOldEntry(existingBaseRow); + return; + case UPDATE_EXISTING: + updateEntry(existingBaseRow, mergedBaseRow); + return; + case SWITCH_ENTRY: + createEntry(mergedBaseRow); + deleteOldEntry(existingBaseRow); + return; + } + } + + /** + * Returns the updates that needs to be done to the view given the base table updates + * passed to {@link #generateViewMutations}. + * + * @return the updates to do to the view. 
+ */ + public Collection<PartitionUpdate> generateViewUpdates() + { + return updates.values(); + } + + /** + * Compute which type of action needs to be performed to the view for a base table row + * before and after an update. + */ + private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow) + { + // Having existing empty is useful, it just means we'll insert a brand new entry for mergedBaseRow, + // but if we have no update at all, we shouldn't get there. + assert !mergedBaseRow.isEmpty(); + + // Note that none of the base PK columns will differ since we're intrinsically dealing + // with the same base row. So we have to check 3 things: + // 1) that the clustering doesn't have a null, which can happen for compact tables. If that's the case, + // there is no corresponding entries. + // 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update. + // 3) whether mergedBaseRow actually match the view SELECT filter + + if (baseMetadata.isCompactTable()) + { + Clustering clustering = mergedBaseRow.clustering(); + for (int i = 0; i < clustering.size(); i++) + { + if (clustering.get(i) == null) + return UpdateAction.NONE; + } + } + + assert view.baseNonPKColumnsInViewPK.size() <= 1 : "We currently only support one base non-PK column in the view PK"; + if (view.baseNonPKColumnsInViewPK.isEmpty()) + { + // The view entry is necessarily the same pre and post update. + + // Note that we allow existingBaseRow to be null and treat it as empty (see MultiViewUpdateBuilder.generateViewsMutations). + boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec); + boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec); + return existingHasLiveData + ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD) + : (mergedHasLiveData ? UpdateAction.NEW_ENTRY : UpdateAction.NONE); + } + + ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); + assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK"; + Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn); + Cell after = mergedBaseRow.getCell(baseColumn); + + // If the update didn't modified this column, the cells will be the same object so it's worth checking + if (before == after) + return before == null ? UpdateAction.NONE : UpdateAction.UPDATE_EXISTING; + + if (!isLive(before)) + return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE; + if (!isLive(after)) + return UpdateAction.DELETE_OLD; + + return baseColumn.cellValueType().compare(before.value(), after.value()) == 0 + ? UpdateAction.UPDATE_EXISTING + : UpdateAction.SWITCH_ENTRY; + } + + private boolean matchesViewFilter(Row baseRow) + { + return view.matchesViewFilter(baseDecoratedKey, baseRow, nowInSec); + } + + private boolean isLive(Cell cell) + { + return cell != null && cell.isLive(nowInSec); + } + + /** + * Creates a view entry corresponding to the provided base row. + * <p> + * This method checks that the base row does match the view filter before applying it. 
+ */ + private void createEntry(Row baseRow) + { + // Before creating a new entry, make sure it matches the view filter + if (!matchesViewFilter(baseRow)) + return; + + startNewUpdate(baseRow); + currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(baseRow)); + currentViewEntryBuilder.addRowDeletion(baseRow.deletion()); + + for (ColumnData data : baseRow) + { + ColumnDefinition viewColumn = view.getViewColumn(data.column()); + // If that base table column is not denormalized in the view, we have nothing to do. + // Also, if it's part of the view PK it's already been taken into account in the clustering. + if (viewColumn == null || viewColumn.isPrimaryKeyColumn()) + continue; + + addColumnData(viewColumn, data); + } + + submitUpdate(); + } + + /** + * Creates the updates to apply to the existing view entry given the base table row before + * and after the update, assuming that the update hasn't changed which view entry the + * row corresponds to (that is, we know the columns composing the view PK haven't changed). + * <p> + * This method checks that the base row (before and after) does match the view filter before + * applying anything. + */ + private void updateEntry(Row existingBaseRow, Row mergedBaseRow) + { + // While we know existingBaseRow and mergedBaseRow correspond to the same view entry, + // they may not match the view filter. + if (!matchesViewFilter(existingBaseRow)) + { + createEntry(mergedBaseRow); + return; + } + if (!matchesViewFilter(mergedBaseRow)) + { + deleteOldEntryInternal(existingBaseRow); + return; + } + + startNewUpdate(mergedBaseRow); + + // In theory, the PK liveness and row deletion may not have been changed by the update + // and we could condition the 2 additions below. In practice though, it's as fast (if not + // faster) to compute that info than to check if it has changed, so we keep it simple. + currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(mergedBaseRow)); + currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion()); + + // We only add to the view update the cells from mergedBaseRow that differ from + // existingBaseRow. For that, and for speed, we can just use cell pointer equality: if the update + // hasn't touched a cell, we know it will be the same object in existingBaseRow and + // mergedBaseRow (note that including more cells than we strictly should isn't a problem + // for correctness, so even if the code changes and pointer equality doesn't work anymore, it'll + // only be a slight inefficiency which we can fix then). + // Note: we could alternatively use Rows.diff() for this, but because it is a bit more generic + // than what we need here, it's also a bit less efficient (it allocates more in particular), + // and this might be called a lot of times for view updates. So, given that this is not a whole + // lot of code anyway, it's probably worth doing the diff manually. + PeekingIterator<ColumnData> existingIter = Iterators.peekingIterator(existingBaseRow.iterator()); + for (ColumnData mergedData : mergedBaseRow) + { + ColumnDefinition baseColumn = mergedData.column(); + ColumnDefinition viewColumn = view.getViewColumn(baseColumn); + // If that base table column is not denormalized in the view, we have nothing to do. + // Also, if it's part of the view PK it's already been taken into account in the clustering. 
+ if (viewColumn == null || viewColumn.isPrimaryKeyColumn()) + continue; + + ColumnData existingData = null; + // Find if there is data for that column in the existing row + while (existingIter.hasNext()) + { + int cmp = baseColumn.compareTo(existingIter.peek().column()); + if (cmp < 0) + break; + + ColumnData next = existingIter.next(); + if (cmp == 0) + { + existingData = next; + break; + } + } + + if (existingData == null) + { + addColumnData(viewColumn, mergedData); + continue; + } + + if (mergedData == existingData) + continue; + + if (baseColumn.isComplex()) + { + ComplexColumnData mergedComplexData = (ComplexColumnData)mergedData; + ComplexColumnData existingComplexData = (ComplexColumnData)existingData; + if (mergedComplexData.complexDeletion().supersedes(existingComplexData.complexDeletion())) + currentViewEntryBuilder.addComplexDeletion(viewColumn, mergedComplexData.complexDeletion()); + + PeekingIterator<Cell> existingCells = Iterators.peekingIterator(existingComplexData.iterator()); + for (Cell mergedCell : mergedComplexData) + { + Cell existingCell = null; + // Find if there is corresponding cell in the existing row + while (existingCells.hasNext()) + { + int cmp = baseColumn.cellPathComparator().compare(mergedCell.path(), existingCells.peek().path()); + if (cmp > 0) + break; + + Cell next = existingCells.next(); + if (cmp == 0) + { + existingCell = next; + break; + } + } + + if (mergedCell != existingCell) + addCell(viewColumn, mergedCell); + } + } + else + { + // Note that we've already eliminated the case where merged == existing + addCell(viewColumn, (Cell)mergedData); + } + } + + submitUpdate(); + } + + /** + * Deletes the view entry corresponding to the provided base row. + * <p> + * This method checks that the base row does match the view filter before bothering. + */ + private void deleteOldEntry(Row existingBaseRow) + { + // Before deleting an old entry, make sure it was matching the view filter (otherwise there is nothing to delete) + if (!matchesViewFilter(existingBaseRow)) + return; + + deleteOldEntryInternal(existingBaseRow); + } + + private void deleteOldEntryInternal(Row existingBaseRow) + { + startNewUpdate(existingBaseRow); + DeletionTime dt = new DeletionTime(computeTimestampForEntryDeletion(existingBaseRow), nowInSec); + currentViewEntryBuilder.addRowDeletion(Row.Deletion.shadowable(dt)); + submitUpdate(); + } + + /** + * Computes the partition key and clustering for a new view entry, and setup the internal + * row builder for the new row. + * + * This assumes that there is corresponding entry, i.e. no values for the partition key and + * clustering are null (since we have eliminated that case through updateAction). + */ + private void startNewUpdate(Row baseRow) + { + ByteBuffer[] clusteringValues = new ByteBuffer[viewMetadata.clusteringColumns().size()]; + for (ColumnDefinition viewColumn : viewMetadata.primaryKeyColumns()) + { + ColumnDefinition baseColumn = view.getBaseColumn(viewColumn); + ByteBuffer value = getValueForPK(baseColumn, baseRow); + if (viewColumn.isPartitionKey()) + currentViewEntryPartitionKey[viewColumn.position()] = value; + else + clusteringValues[viewColumn.position()] = value; + } + - currentViewEntryBuilder.newRow(new Clustering(clusteringValues)); ++ currentViewEntryBuilder.newRow(Clustering.make(clusteringValues)); + } + + private LivenessInfo computeLivenessInfoForEntry(Row baseRow) + { + /* + * We need to compute both the timestamp and expiration. 
+ * + * For the timestamp, it makes sense to use the bigger timestamp for all view PK columns. + * + * This is more complex for the expiration. We want to maintain consistency between the base and the view, so the + * entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part + * of the view PK. + * Which means we really have 2 cases: + * 1) either the columns for the base and view PKs are exactly the same: in that case, the view entry should live + * as long as the base row lives. This means the view entry should only expire once *everything* in the base row + * has expired. Which means the row TTL should be the max of any other TTL. + * 2) or there is a column that is not in the base PK but is in the view PK (we can only have one so far, we'll need + * to slightly adapt if we allow more later): in that case, as long as that column lives the entry does too, but + * as soon as it expires (or is deleted for that matter) the entry also should expire. So the expiration for the + * view is the one of that column, regardless of any other expiration. + * To take an example of that case, if you have: + * CREATE TABLE t (a int, b int, c int, PRIMARY KEY (a, b)) + * CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b) + * INSERT INTO t(a, b) VALUES (0, 0) USING TTL 3; + * UPDATE t SET c = 0 WHERE a = 0 AND b = 0; + * then even after 3 seconds have elapsed, the row will still exist (it just won't have a "row marker" anymore) and so + * the MV should still have a corresponding entry. + */ + assert view.baseNonPKColumnsInViewPK.size() <= 1; // This may change, but is currently an enforced limitation + + LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo(); + + if (view.baseNonPKColumnsInViewPK.isEmpty()) + { + int ttl = baseLiveness.ttl(); + int expirationTime = baseLiveness.localExpirationTime(); + for (Cell cell : baseRow.cells()) + { + if (cell.ttl() > ttl) + { + ttl = cell.ttl(); + expirationTime = cell.localDeletionTime(); + } + } + return ttl == baseLiveness.ttl() + ? baseLiveness - : LivenessInfo.create(baseLiveness.timestamp(), ttl, expirationTime); ++ : LivenessInfo.withExpirationTime(baseLiveness.timestamp(), ttl, expirationTime); + } + + ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); + Cell cell = baseRow.getCell(baseColumn); + assert isLive(cell) : "We shouldn't have gotten here if the base row had no associated entry"; + + long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp()); - return LivenessInfo.create(timestamp, cell.ttl(), cell.localDeletionTime()); ++ return LivenessInfo.withExpirationTime(timestamp, cell.ttl(), cell.localDeletionTime()); + } + + private long computeTimestampForEntryDeletion(Row baseRow) + { + // We delete the old row with its row entry timestamp using a shadowable deletion. + // We must make sure that the deletion deletes everything in the entry (or the entry will + // still show up), so we must use the bigger timestamp found in the existing row (for any + // column included in the view at least). + // TODO: We have a problem though: if the entry is "resurrected" by a later update, we would + // need to ensure that the timestamp for the entry is then bigger than the tombstone + // we're just inserting, which is not currently guaranteed. + // This is a bug for a separate ticket though. 
+ long timestamp = baseRow.primaryKeyLivenessInfo().timestamp(); + for (ColumnData data : baseRow) + { + if (!view.getDefinition().includes(data.column().name)) + continue; + + timestamp = Math.max(timestamp, data.maxTimestamp()); + } + return timestamp; + } + + private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData) + { + assert viewColumn.isComplex() == baseTableData.column().isComplex(); + if (!viewColumn.isComplex()) + { + addCell(viewColumn, (Cell)baseTableData); + return; + } + + ComplexColumnData complexData = (ComplexColumnData)baseTableData; + currentViewEntryBuilder.addComplexDeletion(viewColumn, complexData.complexDeletion()); + for (Cell cell : complexData) + addCell(viewColumn, cell); + } + + private void addCell(ColumnDefinition viewColumn, Cell baseTableCell) + { + assert !viewColumn.isPrimaryKeyColumn(); + currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn)); + } + + /** + * Finish building the currently updated view entry and add it to the other built + * updates. + */ + private void submitUpdate() + { + Row row = currentViewEntryBuilder.build(); + // I'm not sure we can reach here if nothing is updated, but adding an empty row breaks things + // and it costs us nothing to be prudent here. + if (row.isEmpty()) + return; + + DecoratedKey partitionKey = makeCurrentPartitionKey(); + PartitionUpdate update = updates.get(partitionKey); + if (update == null) + { + // We can't really know which columns of the view will be updated nor how many rows will be updated for this key, + // so we rely on hopefully sane defaults. + update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.partitionColumns(), 4); + updates.put(partitionKey, update); + } + update.add(row); + } + + private DecoratedKey makeCurrentPartitionKey() + { + ByteBuffer rawKey = viewMetadata.partitionKeyColumns().size() == 1 + ? 
currentViewEntryPartitionKey[0] + : CompositeType.build(currentViewEntryPartitionKey); + + return viewMetadata.decorateKey(rawKey); + } + + private ByteBuffer getValueForPK(ColumnDefinition column, Row row) + { + switch (column.kind) + { + case PARTITION_KEY: + return basePartitionKey[column.position()]; + case CLUSTERING: + return row.clustering().get(column.position()); + default: + // This shouldn't NPE as we shouldn't get there if the value can be null (or there is a bug in updateAction()) + return row.getCell(column).value(); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/test/unit/org/apache/cassandra/db/rows/RowsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/rows/RowsTest.java index 00ab6ca,b47bea2..ba03478 --- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@@ -235,11 -235,12 +235,12 @@@ public class RowsTes originalBuilder.addPrimaryKeyLivenessInfo(liveness); DeletionTime complexDeletion = new DeletionTime(ts-1, now); originalBuilder.addComplexDeletion(m, complexDeletion); - List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, secondToTs(now), BB1), - BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)), - BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2))); + List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(v, secondToTs(now), BB1), + BufferCell.live(m, secondToTs(now), BB1, CellPath.create(BB1)), + BufferCell.live(m, secondToTs(now), BB2, CellPath.create(BB2))); expectedCells.forEach(originalBuilder::addCell); - Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false); + // We need to use ts-1 so the deletion doesn't shadow what we've created + Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false); originalBuilder.addRowDeletion(rowDeletion); RowBuilder builder = new RowBuilder(); @@@ -263,11 -264,12 +264,12 @@@ builder.addPrimaryKeyLivenessInfo(liveness); DeletionTime complexDeletion = new DeletionTime(ts-1, now); builder.addComplexDeletion(m, complexDeletion); - List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, ts, BB1), - BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)), - BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2))); + List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(v, ts, BB1), + BufferCell.live(m, ts, BB1, CellPath.create(BB1)), + BufferCell.live(m, ts, BB2, CellPath.create(BB2))); expectedCells.forEach(builder::addCell); - Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false); + // We need to use ts-1 so the deletion doesn't shadow what we've created + Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false); builder.addRowDeletion(rowDeletion); StatsCollector collector = new StatsCollector();