http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java new file mode 100644 index 0000000..2ddc6ca --- /dev/null +++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +public abstract class AbstractReadCommandBuilder +{ + protected final ColumnFamilyStore cfs; + protected int nowInSeconds; + + private int cqlLimit = -1; + private int pagingLimit = -1; + protected boolean reversed = false; + + protected Set<ColumnIdentifier> columns; + protected final RowFilter filter = RowFilter.create(); + + private Slice.Bound lowerClusteringBound; + private Slice.Bound upperClusteringBound; + + private NavigableSet<Clustering> clusterings; + + // Use Util.cmd() instead of this ctor directly + AbstractReadCommandBuilder(ColumnFamilyStore cfs) + { + this.cfs = cfs; + this.nowInSeconds = FBUtilities.nowInSeconds(); + } + + public AbstractReadCommandBuilder withNowInSeconds(int nowInSec) + { + this.nowInSeconds = nowInSec; + return this; + } + + public AbstractReadCommandBuilder fromIncl(Object... values) + { + assert lowerClusteringBound == null && clusterings == null; + this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values); + return this; + } + + public AbstractReadCommandBuilder fromExcl(Object... values) + { + assert lowerClusteringBound == null && clusterings == null; + this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values); + return this; + } + + public AbstractReadCommandBuilder toIncl(Object... values) + { + assert upperClusteringBound == null && clusterings == null; + this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values); + return this; + } + + public AbstractReadCommandBuilder toExcl(Object... 
values) + { + assert upperClusteringBound == null && clusterings == null; + this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values); + return this; + } + + public AbstractReadCommandBuilder includeRow(Object... values) + { + assert lowerClusteringBound == null && upperClusteringBound == null; + + if (this.clusterings == null) + this.clusterings = new TreeSet<>(cfs.metadata.comparator); + + this.clusterings.add(cfs.metadata.comparator.make(values)); + return this; + } + + public AbstractReadCommandBuilder reverse() + { + this.reversed = true; + return this; + } + + public AbstractReadCommandBuilder withLimit(int newLimit) + { + this.cqlLimit = newLimit; + return this; + } + + public AbstractReadCommandBuilder withPagingLimit(int newLimit) + { + this.pagingLimit = newLimit; + return this; + } + + public AbstractReadCommandBuilder columns(String... columns) + { + if (this.columns == null) + this.columns = new HashSet<>(); + + for (String column : columns) + this.columns.add(ColumnIdentifier.getInterned(column, true)); + return this; + } + + private ByteBuffer bb(Object value, AbstractType<?> type) + { + return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value); + } + + private AbstractType<?> forValues(AbstractType<?> collectionType) + { + assert collectionType instanceof CollectionType; + CollectionType ct = (CollectionType)collectionType; + switch (ct.kind) + { + case LIST: + case MAP: + return ct.valueComparator(); + case SET: + return ct.nameComparator(); + } + throw new AssertionError(); + } + + private AbstractType<?> forKeys(AbstractType<?> collectionType) + { + assert collectionType instanceof CollectionType; + CollectionType ct = (CollectionType)collectionType; + switch (ct.kind) + { + case LIST: + case MAP: + return ct.nameComparator(); + } + throw new AssertionError(); + } + + public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value) + { + ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true)); + assert def != null; + + AbstractType<?> type = def.type; + if (op == Operator.CONTAINS) + type = forValues(type); + else if (op == Operator.CONTAINS_KEY) + type = forKeys(type); + + this.filter.add(def, op, bb(value, type)); + return this; + } + + protected ColumnFilter makeColumnFilter() + { + if (columns == null || columns.isEmpty()) + return ColumnFilter.all(cfs.metadata); + + ColumnFilter.Builder filter = ColumnFilter.selectionBuilder(); + for (ColumnIdentifier column : columns) + filter.add(cfs.metadata.getColumnDefinition(column)); + return filter.build(); + } + + protected ClusteringIndexFilter makeFilter() + { + if (clusterings != null) + { + return new ClusteringIndexNamesFilter(clusterings, reversed); + } + else + { + Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound, + upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound); + return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed); + } + } + + protected DataLimits makeLimits() + { + DataLimits limits = cqlLimit < 0 ? 
DataLimits.NONE : DataLimits.cqlLimits(cqlLimit); + if (pagingLimit >= 0) + limits = limits.forPaging(pagingLimit); + return limits; + } + + public abstract ReadCommand build(); + + public static class SinglePartitionBuilder extends AbstractReadCommandBuilder + { + private final DecoratedKey partitionKey; + + public SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key) + { + super(cfs); + this.partitionKey = key; + } + + @Override + public ReadCommand build() + { + return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter()); + } + } + + public static class SinglePartitionSliceBuilder extends AbstractReadCommandBuilder + { + private final DecoratedKey partitionKey; + private Slices.Builder sliceBuilder; + + public SinglePartitionSliceBuilder(ColumnFamilyStore cfs, DecoratedKey key) + { + super(cfs); + this.partitionKey = key; + sliceBuilder = new Slices.Builder(cfs.getComparator()); + } + + public SinglePartitionSliceBuilder addSlice(Slice slice) + { + sliceBuilder.add(slice); + return this; + } + + @Override + protected ClusteringIndexFilter makeFilter() + { + return new ClusteringIndexSliceFilter(sliceBuilder.build(), reversed); + } + + @Override + public ReadCommand build() + { + return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter()); + } + } + + public static class PartitionRangeBuilder extends AbstractReadCommandBuilder + { + private DecoratedKey startKey; + private boolean startInclusive; + private DecoratedKey endKey; + private boolean endInclusive; + + public PartitionRangeBuilder(ColumnFamilyStore cfs) + { + super(cfs); + } + + public PartitionRangeBuilder fromKeyIncl(Object... values) + { + assert startKey == null; + this.startInclusive = true; + this.startKey = makeKey(cfs.metadata, values); + return this; + } + + public PartitionRangeBuilder fromKeyExcl(Object... values) + { + assert startKey == null; + this.startInclusive = false; + this.startKey = makeKey(cfs.metadata, values); + return this; + } + + public PartitionRangeBuilder toKeyIncl(Object... values) + { + assert endKey == null; + this.endInclusive = true; + this.endKey = makeKey(cfs.metadata, values); + return this; + } + + public PartitionRangeBuilder toKeyExcl(Object... values) + { + assert endKey == null; + this.endInclusive = false; + this.endKey = makeKey(cfs.metadata, values); + return this; + } + + @Override + public ReadCommand build() + { + PartitionPosition start = startKey; + if (start == null) + { + start = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); + startInclusive = false; + } + PartitionPosition end = endKey; + if (end == null) + { + end = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); + endInclusive = true; + } + + AbstractBounds<PartitionPosition> bounds; + if (startInclusive && endInclusive) + bounds = new Bounds<>(start, end); + else if (startInclusive && !endInclusive) + bounds = new IncludingExcludingBounds<>(start, end); + else if (!startInclusive && endInclusive) + bounds = new Range<>(start, end); + else + bounds = new ExcludingBounds<>(start, end); + + return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter())); + } + + static DecoratedKey makeKey(CFMetaData metadata, Object... 
partitionKey) + { + if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey) + return (DecoratedKey)partitionKey[0]; + + ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); + return StorageService.getPartitioner().decorateKey(key); + } + } +}
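A rough sketch of how the builder above might be driven, assuming a hypothetical table with a text partition key, an int clustering column "c1", and an int regular column "v1" (none of these names are part of this patch); per the class comment, callers are expected to go through Util.cmd() in the test tree rather than these constructors directly:

    import org.apache.cassandra.cql3.Operator;
    import org.apache.cassandra.db.*;
    import org.apache.cassandra.utils.ByteBufferUtil;

    // Illustrative only; keyspace/table/column names are hypothetical.
    ColumnFamilyStore cfs = Keyspace.open("ks").getColumnFamilyStore("tbl");
    DecoratedKey key = cfs.partitioner.decorateKey(ByteBufferUtil.bytes("k1"));

    // Single-partition read: clustering values in [0, 10), two selected columns,
    // returned in reverse clustering order, capped at 100 CQL rows.
    ReadCommand cmd = new AbstractReadCommandBuilder.SinglePartitionBuilder(cfs, key)
                      .fromIncl(0).toExcl(10)
                      .columns("c1", "v1")
                      .withLimit(100)
                      .reverse()
                      .build();

    // Range read across partitions, filtered server-side on a regular column.
    ReadCommand scan = new AbstractReadCommandBuilder.PartitionRangeBuilder(cfs)
                       .fromKeyIncl("a").toKeyExcl("z")
                       .filterOn("v1", Operator.EQ, 42)
                       .build();

Note that the builder lives in src/java rather than the test tree: MaterializedView (further down in this commit) reuses SinglePartitionSliceBuilder to assemble the slice reads it needs when generating view tombstones.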
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 6ac132c..24da365 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -53,6 +53,7 @@ import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.view.MaterializedViewManager; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; @@ -159,6 +160,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private final AtomicInteger fileIndexGenerator = new AtomicInteger(0); public final SecondaryIndexManager indexManager; + public final MaterializedViewManager materializedViewManager; /* These are locally held copies to be changed from the config during runtime */ private volatile DefaultInteger minCompactionThreshold; @@ -195,6 +197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean indexManager.reload(); + materializedViewManager.reload(); // If the CF comparator has changed, we need to change the memtable, // because the old one still aliases the previous comparator. if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator) @@ -331,6 +334,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean this.partitioner = partitioner; this.directories = directories; this.indexManager = new SecondaryIndexManager(this); + this.materializedViewManager = new MaterializedViewManager(this); this.metric = new TableMetrics(this); fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; @@ -451,6 +455,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SystemKeyspace.removeTruncationRecord(metadata.cfId); data.dropSSTables(); indexManager.invalidate(); + materializedViewManager.invalidate(); invalidateCaches(); } @@ -572,8 +577,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // must be called after all sstables are loaded since row cache merges all row versions - public void initRowCache() + public void init() { + materializedViewManager.init(); + if (!isRowCacheEnabled()) return; @@ -1806,7 +1813,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean cfs.data.reset(); return null; } - }, true); + }, true, false); } } @@ -1834,19 +1841,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // flush the CF being truncated before forcing the new segment forceBlockingFlush(); + materializedViewManager.forceBlockingFlush(); + // sleep a little to make sure that our truncatedAt comes after any sstable // that was part of the flushed we forced; otherwise on a tie, it won't get deleted. 
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); } else { - // just nuke the memtable data w/o writing to disk first - synchronized (data) - { - final Flush flush = new Flush(true); - flushExecutor.execute(flush); - postFlushExecutor.submit(flush.postFlush); - } + dumpMemtable(); + materializedViewManager.dumpMemtables(); } Runnable truncateRunnable = new Runnable() @@ -1866,17 +1870,32 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (SecondaryIndex index : indexManager.getIndexes()) index.truncateBlocking(truncatedAt); + materializedViewManager.truncateBlocking(truncatedAt); + SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); logger.debug("cleaning out row cache"); invalidateCaches(); } }; - runWithCompactionsDisabled(Executors.callable(truncateRunnable), true); + runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); logger.debug("truncate complete"); } - public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation) + /** + * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable. + */ + public void dumpMemtable() + { + synchronized (data) + { + final Flush flush = new Flush(true); + flushExecutor.execute(flush); + postFlushExecutor.submit(flush.postFlush); + } + } + + public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews) { // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly, // and so we only run one major compaction at a time @@ -1884,17 +1903,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { logger.debug("Cancelling in-progress compactions for {}", metadata.cfName); - Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes(); - for (ColumnFamilyStore cfs : selfWithIndexes) + Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews + ? 
Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs()) + : concatWithIndexes(); + + for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) cfs.getCompactionStrategyManager().pause(); try { // interrupt in-progress compactions - CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation); - CompactionManager.instance.waitForCessation(selfWithIndexes); + CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation); + CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs); // doublecheck that we finished, instead of timing out - for (ColumnFamilyStore cfs : selfWithIndexes) + for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) { if (!cfs.getTracker().getCompacting().isEmpty()) { @@ -1916,7 +1938,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } finally { - for (ColumnFamilyStore cfs : selfWithIndexes) + for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) cfs.getCompactionStrategyManager().resume(); } } @@ -1936,7 +1958,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } }; - return runWithCompactionsDisabled(callable, false); + return runWithCompactionsDisabled(callable, false, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index f37ce66..78b593b 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -23,13 +23,15 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; +import java.util.concurrent.locks.Lock; import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.commitlog.CommitLog; @@ -37,14 +39,17 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.view.MaterializedViewManager; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.metrics.KeyspaceMetrics; @@ -70,7 +75,10 @@ public class Keyspace } private volatile KeyspaceMetadata metadata; - public final OpOrder writeOrder = new OpOrder(); + + //OpOrder is defined 
globally since we need to order writes across + //Keyspaces in the case of MaterializedViews (batchlog of MV mutations) + public static final OpOrder writeOrder = new OpOrder(); /* ColumnFamilyStore per column family */ private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>(); @@ -122,7 +130,7 @@ public class Keyspace // keyspace has to be constructed and in the cache before cacheRow can be called for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores()) - cfs.initRowCache(); + cfs.init(); } } } @@ -352,10 +360,14 @@ public class Keyspace // CFS being created for the first time, either on server startup or new CF being added. // We don't worry about races here; startup is safe, and adding multiple idential CFs // simultaneously is a "don't do that" scenario. - ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables)); + ColumnFamilyStore newCfs = ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables); + + ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, newCfs); // CFS mbean instantiation will error out before we hit this, but in case that changes... if (oldCfs != null) throw new IllegalStateException("added multiple mappings for cf id " + cfId); + + newCfs.init(); } else { @@ -380,11 +392,41 @@ public class Keyspace * @param writeCommitLog false to disable commitlog append entirely * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") */ - public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) + public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes) { if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); + Lock lock = null; + boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false); + + if (requiresViewUpdate) + { + lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey()); + + if (lock == null) + { + if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + { + logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); + Tracing.trace("Could not acquire MV lock"); + throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + } + else + { + //This MV update can't happen right now. 
So rather than keep this thread busy, + // we will re-apply ourselves to the queue and try again later + StageManager.getStage(Stage.MUTATION).execute(() -> { + if (writeCommitLog) + mutation.apply(); + else + mutation.applyUnsafe(); + }); + + return; + } + } + } int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group opGroup = writeOrder.start()) { @@ -401,10 +443,26 @@ public class Keyspace ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId); if (cfs == null) { - logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId); + logger.error("Attempting to mutate non-existent table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName); continue; } + if (requiresViewUpdate) + { + try + { + Tracing.trace("Create materialized view mutations from replica"); + cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd); + } + catch (Exception e) + { + if (!(e instanceof WriteTimeoutException)) + logger.warn("Encountered exception when creating materialized view mutations", e); + + JVMStabilityInspector.inspectThrowable(e); + } + } + Tracing.trace("Adding to {} memtable", upd.metadata().cfName); SecondaryIndexManager.Updater updater = updateIndexes ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec) : @@ -412,6 +470,11 @@ public class Keyspace cfs.apply(upd, updater, opGroup, replayPosition); } } + finally + { + if (lock != null) + lock.unlock(); + } } public AbstractReplicationStrategy getReplicationStrategy() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 3d49ca6..ace114b 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -54,6 +54,8 @@ public class Mutation implements IMutation // map of column family id to mutations for that column family.
private final Map<UUID, PartitionUpdate> modifications; + // Time at which this mutation was instantiated + public final long createdAt = System.currentTimeMillis(); public Mutation(String keyspaceName, DecoratedKey key) { this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 3baa93e..640e45f 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -22,6 +22,7 @@ import java.io.IOError; import java.io.IOException; import java.net.InetAddress; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; @@ -47,10 +48,17 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> replyTo = InetAddress.getByAddress(from); } + try + { message.payload.apply(); WriteResponse response = new WriteResponse(); Tracing.trace("Enqueuing response to {}", replyTo); MessagingService.instance().sendReply(response.createMessage(), id, replyTo); + } + catch (WriteTimeoutException wto) + { + Tracing.trace("Payload application resulted in WriteTimeout, not replying"); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 7bfd552..7ea946b 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -101,6 +101,8 @@ public final class SystemKeyspace public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; + public static final String MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS = "materializedviews_builds_in_progress"; + public static final String BUILT_MATERIALIZEDVIEWS = "built_materializedviews"; @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces"; @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies"; @@ -261,6 +263,24 @@ public final class SystemKeyspace + "ranges set<blob>," + "PRIMARY KEY ((keyspace_name)))"); + public static final CFMetaData MaterializedViewsBuildsInProgress = + compile(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS, + "materialized views builds current progress", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "last_token varchar," + + "generation_number int," + + "PRIMARY KEY ((keyspace_name), view_name))"); + + public static final CFMetaData BuiltMaterializedViews = + compile(BUILT_MATERIALIZEDVIEWS, + "built materialized views", + "CREATE TABLE \"%s\" (" + + "keyspace_name text," + + "view_name text," + + "PRIMARY KEY ((keyspace_name), view_name))"); + @Deprecated public static final CFMetaData LegacyKeyspaces = compile(LEGACY_KEYSPACES, @@ -401,6 +421,8 @@ public final class SystemKeyspace SSTableActivity, SizeEstimates, AvailableRanges, + MaterializedViewsBuildsInProgress, + 
BuiltMaterializedViews, LegacyKeyspaces, LegacyColumnfamilies, LegacyColumns, @@ -493,6 +515,82 @@ public final class SystemKeyspace return CompactionHistoryTabularData.from(queryResultSet); } + public static boolean isViewBuilt(String keyspaceName, String viewName) + { + String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?"; + UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName); + return !result.isEmpty(); + } + + public static void setMaterializedViewBuilt(String keyspaceName, String viewName) + { + String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)"; + executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName); + forceBlockingFlush(BUILT_MATERIALIZEDVIEWS); + } + + + public static void setMaterializedViewRemoved(String keyspaceName, String viewName) + { + String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?"; + executeInternal(String.format(buildReq, NAME, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName); + forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS); + + String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?"; + executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName); + forceBlockingFlush(BUILT_MATERIALIZEDVIEWS); + } + + public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber) + { + executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), + ksname, + viewName, + generationNumber); + } + + public static void finishMaterializedViewBuildStatus(String ksname, String viewName) + { + // We flush the built view first, because if we fail now, we'll restart at the last place we checkpointed + // the materialized view build. + // If we flush the delete first, we'll have to restart from the beginning. + // Also, if recording the built view succeeded but deleting the in-progress entry failed, we will be able to skip the + // materialized view build check next boot. + setMaterializedViewBuilt(ksname, viewName); + forceBlockingFlush(BUILT_MATERIALIZEDVIEWS); + executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName); + forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS); + } + + public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token) + { + String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)"; + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); + } + + public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName) + { + String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ?
AND view_name = ?"; + UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName); + if (queryResultSet == null || queryResultSet.isEmpty()) + return null; + + UntypedResultSet.Row row = queryResultSet.one(); + + Integer generation = null; + Token lastToken = null; + if (row.has("generation_number")) + generation = row.getInt("generation_number"); + if (row.has("last_token")) + { + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + lastToken = factory.fromString(row.getString("last_token")); + } + + return Pair.create(generation, lastToken); + } + public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/WriteType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java index 4f4c88d..20fb6a9 100644 --- a/src/java/org/apache/cassandra/db/WriteType.java +++ b/src/java/org/apache/cassandra/db/WriteType.java @@ -24,5 +24,6 @@ public enum WriteType UNLOGGED_BATCH, COUNTER, BATCH_LOG, - CAS; + CAS, + MATERIALIZED_VIEW; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index bf412d8..3dd6f38 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -58,6 +58,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.SecondaryIndexBuilder; +import org.apache.cassandra.db.view.MaterializedViewBuilder; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; @@ -1365,6 +1366,31 @@ public class CompactionManager implements CompactionManagerMBean } } + public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder) + { + Runnable runnable = new Runnable() + { + public void run() + { + metrics.beginCompaction(builder); + try + { + builder.run(); + } + finally + { + metrics.finishCompaction(builder); + } + } + }; + if (executor.isShutdown()) + { + logger.info("Compaction executor has shut down, not submitting materialized view build"); + return null; + } + + return executor.submit(runnable); + } public int getActiveCompactions() { return CompactionMetrics.getCompactions().size(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 766eb1b..5e15f33 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -426,7 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer return tasks; } } - }, false); + }, false, false); } public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index 5b6ce05..f8f016c 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -35,7 +35,8 @@ public enum OperationType VERIFY("Verify"), FLUSH("Flush"), STREAM("Stream"), - WRITE("Write"); + WRITE("Write"), + VIEW_BUILD("Materialized view build"); public final String type; public final String fileName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedView.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java new file mode 100644 index 0000000..082c71d --- /dev/null +++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java @@ -0,0 +1,691 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.view; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.MaterializedViewDefinition; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.statements.CFProperties; +import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder; +import org.apache.cassandra.db.CBuilder; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadOrderGroup; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeBackedRow; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.ColumnData; +import org.apache.cassandra.db.rows.ComplexColumnData; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.service.pager.QueryPager; + +/** + * A Materialized View copies data from a base table into a view table which can be queried independently from the + * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure + * that if a view needs to be updated, the updates are properly created and fed into the view. + * + * This class does the job of translating the base row to the view row. + * + * It handles reading existing state and figuring out what tombstones need to be generated. + * + * createMutations below is the "main method" + * + */ +public class MaterializedView +{ + /** + * The columns should all be updated together, so we use this object as a group. + */ + private static class MVColumns + { + //These are the base column definitions in terms of the *view's* partitioning. + //Meaning we can see (for example) that the partition key of the view contains a clustering key +//from the base table.
public final List<ColumnDefinition> partitionDefs; + public final List<ColumnDefinition> primaryKeyDefs; + public final List<ColumnDefinition> baseComplexColumns; + + private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns) + { + this.partitionDefs = partitionDefs; + this.primaryKeyDefs = primaryKeyDefs; + this.baseComplexColumns = baseComplexColumns; + } + } + + public final String name; + + private final ColumnFamilyStore baseCfs; + private ColumnFamilyStore _viewCfs = null; + + private MVColumns columns; + + private final boolean viewHasAllPrimaryKeys; + private final boolean includeAll; + private MaterializedViewBuilder builder; + + public MaterializedView(MaterializedViewDefinition definition, + ColumnFamilyStore baseCfs) + { + this.baseCfs = baseCfs; + + name = definition.viewName; + includeAll = definition.includeAll; + + viewHasAllPrimaryKeys = updateDefinition(definition); + } + + /** + * Lazily fetch the CFS instance for the view. + * We do this lazily to avoid initialization issues. + * + * @return The view's CFS instance + */ + public ColumnFamilyStore getViewCfs() + { + if (_viewCfs == null) + _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name)); + + return _viewCfs; + } + + + /** + * Look up column definitions in the base table that correspond to the view columns (should be 1:1) + * + * Notify the caller whether all primary keys in the view are also primary keys in the base. We do this to simplify + * tombstone checks. + * + * @param columns a list of columns to look up in the base table + * @param definitions lists to populate for the base table definitions + * @return true if all view PKs are also base PKs + */ + private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions) + { + boolean allArePrimaryKeys = true; + for (ColumnIdentifier identifier : columns) + { + ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier); + assert cdef != null : "Could not resolve column " + identifier.toString(); + + for (List<ColumnDefinition> list : definitions) + { + list.add(cdef); + } + + allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn(); + } + + return allArePrimaryKeys; + } + + /** + * This updates the columns stored which are dependent on the base CFMetaData. + * + * @return true if the view contains only columns which are part of the base's primary key; false if there is at + * least one column which is not.
+ */ + public boolean updateDefinition(MaterializedViewDefinition definition) + { + List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size()); + List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size() + + definition.clusteringColumns.size()); + List<ColumnDefinition> baseComplexColumns = new ArrayList<>(); + + // We only add the partition columns to the partitions list, but both partition columns and clustering + // columns are added to the primary keys list + boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs); + boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs); + + for (ColumnDefinition cdef : baseCfs.metadata.allColumns()) + { + if (cdef.isComplex()) + { + baseComplexColumns.add(cdef); + } + } + + this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns); + + return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns; + } + + /** + * Check to see if the update could possibly modify a view. Cases where the view may be updated are: + * <ul> + * <li>View selects all columns</li> + * <li>Update contains any range tombstones</li> + * <li>Update touches one of the columns included in the view</li> + * </ul> + * + * If the update contains any range tombstones, there is a possibility that it will not touch a range that is + * currently included in the view. + * + * @return true if {@param partition} modifies a column included in the view + */ + public boolean updateAffectsView(AbstractThreadUnsafePartition partition) + { + // If we are including all of the columns, then any update will be included + if (includeAll) + return true; + + // If there are range tombstones, tombstones will also need to be generated for the materialized view + // This requires a query of the base rows and generating tombstones for all of those values + if (!partition.deletionInfo().isLive()) + return true; + + // Check whether the update touches any of the columns included in the view + for (Row row : partition) + { + for (ColumnData data : row) + { + if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null) + return true; + } + } + + return false; + } + + /** + * Creates the clustering columns for the view based on the specified row and resolver policy + * + * @param temporalRow The current row + * @param resolver The policy to use when selecting which versions of cells to use + * @return The clustering object to use for the view + */ + private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver) + { + CFMetaData viewCfm = getViewCfs().metadata; + int numViewClustering = viewCfm.clusteringColumns().size(); + CBuilder clustering = CBuilder.create(getViewCfs().getComparator()); + for (int i = 0; i < numViewClustering; i++) + { + ColumnDefinition definition = viewCfm.clusteringColumns().get(i); + clustering.add(temporalRow.clusteringValue(definition, resolver)); + } + + return clustering.build(); + } + + /** + * @return PartitionUpdate containing a range tombstone for a base partition key and TemporalRow.
+ */ + private PartitionUpdate createTombstone(TemporalRow temporalRow, + DecoratedKey partitionKey, + DeletionTime deletionTime, + TemporalRow.Resolver resolver, + int nowInSec) + { + CFMetaData viewCfm = getViewCfs().metadata; + Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); + builder.newRow(viewClustering(temporalRow, resolver)); + builder.addRowDeletion(deletionTime); + return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); + } + + /** + * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier. + */ + private PartitionUpdate createComplexTombstone(TemporalRow temporalRow, + DecoratedKey partitionKey, + ColumnDefinition deletedColumn, + DeletionTime deletionTime, + TemporalRow.Resolver resolver, + int nowInSec) + { + + CFMetaData viewCfm = getViewCfs().metadata; + Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); + builder.newRow(viewClustering(temporalRow, resolver)); + builder.addComplexDeletion(deletedColumn, deletionTime); + return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); + } + + /** + * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from + * the TemporalRow and its Resolver + */ + private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver) + { + List<ColumnDefinition> partitionDefs = this.columns.partitionDefs; + Object[] partitionKey = new Object[partitionDefs.size()]; + + for (int i = 0; i < partitionKey.length; i++) + { + ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver); + + if (value == null) + return null; + + partitionKey[i] = value; + } + + return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata + .getKeyValidatorAsClusteringComparator() + .make(partitionKey))); + } + + /** + * @return PartitionUpdate which contains the tombstone for the referenced TemporalRow, or null if not necessary. + * A TemporalRow can reference at most one view row; there will be at most one row to be tombstoned, so only one + * update is necessary + */ + private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow) + { + // Primary Key and Clustering columns do not generate tombstones + if (viewHasAllPrimaryKeys) + return null; + + boolean hasUpdate = false; + List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs; + for (ColumnDefinition viewPartitionKeys : primaryKeyDefs) + { + if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null) + hasUpdate = true; + } + + if (!hasUpdate) + return null; + + TemporalRow.Resolver resolver = TemporalRow.earliest; + return createTombstone(temporalRow, + viewPartitionKey(temporalRow, resolver), + new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec), + resolver, + temporalRow.nowInSec); + } + + /** + * @return PartitionUpdate which is the transformed base table update for the materialized view.
+ */ + private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow) + { + TemporalRow.Resolver resolver = TemporalRow.latest; + + DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver); + ColumnFamilyStore viewCfs = getViewCfs(); + + if (partitionKey == null) + { + // Not having a partition key means we aren't updating anything + return null; + } + + Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec); + + CBuilder clustering = CBuilder.create(viewCfs.getComparator()); + for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++) + { + clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver)); + } + regularBuilder.newRow(clustering.build()); + regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata, + temporalRow.viewClusteringTimestamp(), + temporalRow.viewClusteringTtl(), + temporalRow.viewClusteringLocalDeletionTime())); + + for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns()) + { + if (columnDefinition.isPrimaryKeyColumn()) + continue; + + for (Cell cell : temporalRow.values(columnDefinition, resolver)) + { + regularBuilder.addCell(cell); + } + } + + return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build()); + } + + /** + * @param partition Update which possibly contains deletion info for which to generate view tombstones. + * @return View Tombstones which delete all of the rows which have been removed from the base table with + * {@param partition} + */ + private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition) + { + final TemporalRow.Resolver resolver = TemporalRow.earliest; + + DeletionInfo deletionInfo = partition.deletionInfo(); + + List<Mutation> mutations = new ArrayList<>(); + + // Check the complex columns to see if there are any which may have tombstones we need to create for the view + if (!columns.baseComplexColumns.isEmpty()) + { + for (Row row : partition) + { + if (!row.hasComplexDeletion()) + continue; + + TemporalRow temporalRow = rowSet.getClustering(row.clustering()); + + assert temporalRow != null; + + for (ColumnDefinition definition : columns.baseComplexColumns) + { + ComplexColumnData columnData = row.getComplexColumnData(definition); + + if (columnData != null) + { + DeletionTime time = columnData.complexDeletion(); + if (!time.isLive()) + { + DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver); + if (targetKey != null) + mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec))); + } + } + } + } + } + + ReadCommand command = null; + + if (!deletionInfo.isLive()) + { + // We have to generate tombstones for all of the affected rows, but we don't have the information in order + // to create them. This requires that we perform a read for the entire range that is being tombstoned, and + // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an + // entire partition of data which is not distributed on a single partition node. 
+ DecoratedKey dk = rowSet.dk; + + if (deletionInfo.hasRanges()) + { + SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk); + Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false); + while (tombstones.hasNext()) + { + RangeTombstone tombstone = tombstones.next(); + + builder.addSlice(tombstone.deletedSlice()); + } + + command = builder.build(); + } + else + { + command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk); + } + } + + if (command == null) + { + SinglePartitionSliceBuilder builder = null; + for (Row row : partition) + { + if (!row.deletion().isLive()) + { + if (builder == null) + builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); + builder.addSlice(Slice.make(row.clustering())); + } + } + + if (builder != null) + command = builder.build(); + } + + if (command != null) + { + QueryPager pager = command.getPager(null); + + // Add all of the rows which were recovered from the query to the row set + while (!pager.isExhausted()) + { + try (ReadOrderGroup orderGroup = pager.startOrderGroup(); + PartitionIterator iter = pager.fetchPageInternal(128, orderGroup)) + { + if (!iter.hasNext()) + break; + + try (RowIterator rowIterator = iter.next()) + { + while (rowIterator.hasNext()) + { + Row row = rowIterator.next(); + rowSet.addRow(row, false); + } + } + } + } + + // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone + // for the view. + for (TemporalRow temporalRow : rowSet) + { + DeletionTime deletionTime = temporalRow.deletionTime(partition); + if (!deletionTime.isLive()) + { + DecoratedKey value = viewPartitionKey(temporalRow, resolver); + if (value != null) + { + PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec); + if (update != null) + mutations.add(new Mutation(update)); + } + } + } + } + + return !mutations.isEmpty() ? mutations : null; + } + + /** + * Read and update temporal rows in the set which have corresponding values stored on the local node + */ + private void readLocalRows(TemporalRow.Set rowSet) + { + SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); + + for (TemporalRow temporalRow : rowSet) + builder.addSlice(temporalRow.baseSlice()); + + QueryPager pager = builder.build().getPager(null); + + while (!pager.isExhausted()) + { + try (ReadOrderGroup orderGroup = pager.startOrderGroup(); + PartitionIterator iter = pager.fetchPageInternal(128, orderGroup)) + { + while (iter.hasNext()) + { + try (RowIterator rows = iter.next()) + { + while (rows.hasNext()) + { + rowSet.addRow(rows.next(), false); + } + } + } + } + } + } + + /** + * @return Set of rows which are contained in the partition update {@param partition} + */ + private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition) + { + Set<ColumnIdentifier> columns = new HashSet<>(); + for (ColumnDefinition def : this.columns.primaryKeyDefs) + columns.add(def.name); + + TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key); + for (Row row : partition) + rowSet.addRow(row, true); + + return rowSet; + } + + /** + * @param isBuilding If the view is currently being built, we do not query the values which are already stored, + * since all of the update will already be present in the base table. 
+ * @return View mutations which represent the changes necessary as long as previously created mutations for the view + * have been applied successfully. This is based solely on the changes that are necessary given the current + * state of the base table and the newly applying partition data. + */ + public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding) + { + if (!updateAffectsView(partition)) + return null; + + TemporalRow.Set rowSet = separateRows(key, partition); + + // If we are building the view, we do not want to add old values; they will always be the same + if (!isBuilding) + readLocalRows(rowSet); + + Collection<Mutation> mutations = null; + for (TemporalRow temporalRow : rowSet) + { + // If we are building, there is no need to check for partition tombstones; those values will not be present + // in the partition data + if (!isBuilding) + { + PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow); + if (partitionTombstone != null) + { + if (mutations == null) mutations = new LinkedList<>(); + mutations.add(new Mutation(partitionTombstone)); + } + } + + PartitionUpdate insert = createUpdatesForInserts(temporalRow); + if (insert != null) + { + if (mutations == null) mutations = new LinkedList<>(); + mutations.add(new Mutation(insert)); + } + } + + if (!isBuilding) + { + Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition); + if (deletion != null && !deletion.isEmpty()) + { + if (mutations == null) mutations = new LinkedList<>(); + mutations.addAll(deletion); + } + } + + return mutations; + } + + public synchronized void build() + { + if (this.builder != null) + { + this.builder.stop(); + this.builder = null; + } + + this.builder = new MaterializedViewBuilder(baseCfs, this); + CompactionManager.instance.submitMaterializedViewBuilder(builder); + } + + /** + * @return CFMetaData which represents the definition given + */ + public static CFMetaData getCFMetaData(MaterializedViewDefinition definition, + CFMetaData baseCf, + CFProperties properties) + { + CFMetaData.Builder viewBuilder = CFMetaData.Builder + .createView(baseCf.ksName, definition.viewName); + + ColumnDefinition nonPkTarget = null; + + for (ColumnIdentifier targetIdentifier : definition.partitionColumns) + { + ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier); + if (!target.isPartitionKey()) + nonPkTarget = target; + + viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type)); + } + + Collection<ColumnDefinition> included = new ArrayList<>(); + for(ColumnIdentifier identifier : definition.included) + { + ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier); + assert cfDef != null; + included.add(cfDef); + } + + boolean includeAll = included.isEmpty(); + + for (ColumnIdentifier ident : definition.clusteringColumns) + { + ColumnDefinition column = baseCf.getColumnDefinition(ident); + viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type)); + } + + for (ColumnDefinition column : baseCf.partitionColumns().regulars.columns) + { + if (column != nonPkTarget && (includeAll || included.contains(column))) + { + viewBuilder.addRegularColumn(column.name, column.type); + } + } + + //Add any extra clustering columns + for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns())) + { + if ( (!definition.partitionColumns.contains(column.name) && 
+                 !definition.clusteringColumns.contains(column.name)) &&
+                (includeAll || included.contains(column)))
+            {
+                viewBuilder.addRegularColumn(column.name, column.type);
+            }
+        }
+
+        CFMetaData cfm = viewBuilder.build();
+        properties.properties.applyToCFMetadata(cfm);
+
+        return cfm;
+    }
+}
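Both read paths above (createForDeletionInfo and readLocalRows) drain a QueryPager the same way: fetch a page inside a ReadOrderGroup, walk each partition's RowIterator, and repeat until the pager is exhausted. The stand-alone sketch below isolates that idiom for reference; the class and method names (PagingSketch, consumePartition) are illustrative, and the 128-row page size simply matches the value used in the patch, while the API calls are the ones the patch itself uses.

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadOrderGroup;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.utils.FBUtilities;

final class PagingSketch
{
    // Drains a full-partition read page by page, mirroring the loops above.
    static void consumePartition(CFMetaData metadata, DecoratedKey key)
    {
        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(metadata, FBUtilities.nowInSeconds(), key)
                                                     .getPager(null);
        while (!pager.isExhausted())
        {
            // Each page is fetched under its own order group, exactly as in the patch.
            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
                 PartitionIterator page = pager.fetchPageInternal(128, orderGroup))
            {
                if (!page.hasNext())
                    break; // an empty page means there is nothing left to read

                try (RowIterator rows = page.next())
                {
                    while (rows.hasNext())
                    {
                        Row row = rows.next();
                        // ... hand the row to the caller, e.g. rowSet.addRow(row, false) ...
                    }
                }
            }
        }
    }
}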
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
new file mode 100644
index 0000000..e8842ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class MaterializedViewBuilder extends CompactionInfo.Holder
+{
+    private final ColumnFamilyStore baseCfs;
+    private final MaterializedView view;
+    private final UUID compactionId;
+    private volatile Token prevToken = null;
+
+    private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
+
+    private volatile boolean isStopped = false;
+
+    public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
+    {
+        this.baseCfs = baseCfs;
+        this.view = view;
+        compactionId = UUIDGen.getTimeUUID();
+    }
+
+    private void buildKey(DecoratedKey key)
+    {
+        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+
+        while (!pager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                 PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
+            {
+                if (!partitionIterator.hasNext())
+                    return;
+
+                try (RowIterator rowIterator = partitionIterator.next())
+                {
+                    Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
+
+                    if (mutations != null)
+                    {
+                        try
+                        {
+                            StorageProxy.mutateMV(key.getKey(), mutations);
+                            break;
+                        }
+                        catch (WriteTimeoutException ex)
+                        {
+                            NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
+                                        .warn("Encountered write timeout when building materialized view {}; the entries were stored in the batchlog and will be replayed at another time", view.name);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void run()
+    {
+        String ksname = baseCfs.metadata.ksName, viewName = view.name;
+
+        if (SystemKeyspace.isViewBuilt(ksname, viewName))
+            return;
+
+        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+        final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
+        Token lastToken;
+        Function<View, Iterable<SSTableReader>> function;
+        if (buildStatus == null)
+        {
+            baseCfs.forceBlockingFlush();
+            function = View.select(SSTableSet.CANONICAL);
+            int generation = Integer.MIN_VALUE;
+
+            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
+            {
+                for (SSTableReader reader : temp)
+                {
+                    generation = Math.max(reader.descriptor.generation, generation);
+                }
+            }
+
+            SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
+            lastToken = null;
+        }
+        else
+        {
+            function = new Function<View, Iterable<SSTableReader>>()
+            {
+                @Nullable
+                public Iterable<SSTableReader> apply(View view)
+                {
+                    Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
+                    if (readers != null)
+                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
+                    return null;
+                }
+            };
+            lastToken = buildStatus.right;
+        }
+
+        prevToken = lastToken;
+        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
+             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+        {
+            while (!isStopped && iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                Token token = key.getToken();
+                if (lastToken == null || lastToken.compareTo(token) < 0)
+                {
+                    for (Range<Token> range : ranges)
+                    {
+                        if (range.contains(token))
+                        {
+                            buildKey(key);
+
+                            if (prevToken == null || prevToken.compareTo(token) != 0)
+                            {
+                                SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
+                                prevToken = token;
+                            }
+                        }
+                    }
+                    lastToken = null;
+                }
+            }
+
+            SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
+        }
+        catch (Exception e)
+        {
+            final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
+                                                         5,
+                                                         TimeUnit.MINUTES);
+            logger.warn("Materialized view build failed to complete; sleeping 5 minutes before restarting", e);
+        }
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        long rangesLeft = 0, rangesTotal = 0;
+        Token lastToken = prevToken;
+
+        // This approximation is coarse: we have no way to compute what fraction of a single range has been covered,
+        // so we simply count how many of the node's local ranges come after the last range we have seen (token order
+        // tells us whether a range has been seen yet) and compare that against the node's total number of ranges.
+        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
+        {
+            rangesLeft++;
+            rangesTotal++;
+            // This resets rangesLeft, so that by the end of the loop the number of ranges left is less than the
+            // total number of ranges.
+            if (lastToken == null || range.contains(lastToken))
+                rangesLeft = 0;
+        }
+        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+    }
+
+    public void stop()
+    {
+        isStopped = true;
+    }
+}
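The resume logic in run() above skips every key whose token is at or before the checkpoint stored in SystemKeyspace, and advances the checkpoint once per distinct token rather than once per key. A minimal stand-alone sketch of just that skip-and-checkpoint arithmetic, with Long standing in for Token and range ownership ignored; all names here are illustrative, not part of the patch.

import java.util.List;

final class ResumeSketch
{
    // Mirrors run()'s skip-and-checkpoint arithmetic; returns the token that
    // would be checkpointed for the next restart.
    static Long buildFrom(List<Long> tokensInKeyOrder, Long checkpoint)
    {
        Long prevToken = checkpoint;
        for (Long token : tokensInKeyOrder)
        {
            // Mirrors `lastToken == null || lastToken.compareTo(token) < 0`:
            // keys at or before the checkpoint were built by a previous run.
            if (checkpoint != null && token.compareTo(checkpoint) <= 0)
                continue;

            // ... buildKey(key) would run here ...

            // Mirrors `prevToken == null || prevToken.compareTo(token) != 0`:
            // checkpoint once per distinct token, not once per key.
            if (prevToken == null || prevToken.compareTo(token) != 0)
                prevToken = token;
        }
        return prevToken;
    }
}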
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
new file mode 100644
index 0000000..7f97728
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Manages {@link MaterializedView}s for a single {@link ColumnFamilyStore}. All of the materialized views for that
+ * table are created when this manager is initialized.
+ *
+ * The main purposes of the manager are to provide a single location where updates are vetted to see whether they
+ * affect any views ({@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}), to provide locks that prevent
+ * concurrent updates from creating incoherent updates in the view ({@link MaterializedViewManager#acquireLockFor(ByteBuffer)}),
+ * and to effect change on the views.
+ */
+public class MaterializedViewManager
+{
+    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
+
+    private final ConcurrentNavigableMap<String, MaterializedView> viewsByName;
+
+    private final ColumnFamilyStore baseCfs;
+
+    public MaterializedViewManager(ColumnFamilyStore baseCfs)
+    {
+        this.viewsByName = new ConcurrentSkipListMap<>();
+
+        this.baseCfs = baseCfs;
+    }
+
+    public Iterable<MaterializedView> allViews()
+    {
+        return viewsByName.values();
+    }
+
+    public Iterable<ColumnFamilyStore> allViewsCfs()
+    {
+        List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
+        for (MaterializedView view : allViews())
+            viewColumnFamilies.add(view.getViewCfs());
+        return viewColumnFamilies;
+    }
+
+    public void init()
+    {
+        reload();
+    }
+
+    public void invalidate()
+    {
+        for (MaterializedView view : allViews())
+            removeMaterializedView(view.name);
+    }
+
+    public void reload()
+    {
+        Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>();
+        for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews())
+        {
+            newViewsByName.put(definition.viewName, definition);
+        }
+
+        for (String viewName : viewsByName.keySet())
+        {
+            if (!newViewsByName.containsKey(viewName))
+                removeMaterializedView(viewName);
+        }
+
+        for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet())
+        {
+            if (!viewsByName.containsKey(entry.getKey()))
+                addMaterializedView(entry.getValue());
+        }
+
+        for (MaterializedView view : allViews())
+        {
+            view.build();
+            // We provide the new definition from the base metadata
+            view.updateDefinition(newViewsByName.get(view.name));
+        }
+    }
+
+    public void buildAllViews()
+    {
+        for (MaterializedView view : allViews())
+            view.build();
+    }
+
+    public void removeMaterializedView(String name)
+    {
+        MaterializedView view = viewsByName.remove(name);
+
+        if (view == null)
+            return;
+
+        SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name);
+    }
+
+    public void addMaterializedView(MaterializedViewDefinition definition)
+    {
+        MaterializedView view = new MaterializedView(definition, baseCfs);
+
+        viewsByName.put(definition.viewName, view);
+    }
+
+    /**
+     * Calculates and pushes updates to the views' replicas. The replicas are determined by
+     * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+     */
+    public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+    {
+        // This happens when we are replaying from the commitlog. In that case, we have already sent this commit off
+        // to the view node.
+        if (!StorageService.instance.isJoined()) return;
+
+        List<Mutation> mutations = null;
+        for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
+        {
+            Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false);
+            if (viewMutations != null && !viewMutations.isEmpty())
+            {
+                if (mutations == null)
+                    mutations = Lists.newLinkedList();
+                mutations.addAll(viewMutations);
+            }
+        }
+        if (mutations != null)
+        {
+            StorageProxy.mutateMV(key, mutations);
+        }
+    }
+
+    public boolean updateAffectsView(PartitionUpdate upd)
+    {
+        for (MaterializedView view : allViews())
+        {
+            if (view.updateAffectsView(upd))
+                return true;
+        }
+        return false;
+    }
+
+    public static Lock acquireLockFor(ByteBuffer key)
+    {
+        Lock lock = LOCKS.get(key);
+
+        if (lock.tryLock())
+            return lock;
+
+        return null;
+    }
+
+    public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean ignoreRf1)
+    {
+        for (IMutation mutation : mutations)
+        {
+            for (PartitionUpdate cf : mutation.getPartitionUpdates())
+            {
+                Keyspace keyspace = Keyspace.open(cf.metadata().ksName);
+
+                if (ignoreRf1 && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+                    continue;
+
+                MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager;
+                if (viewManager.updateAffectsView(cf))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    public void forceBlockingFlush()
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+            viewCfs.forceBlockingFlush();
+    }
+
+    public void dumpMemtables()
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+            viewCfs.dumpMemtable();
+    }
+
+    public void truncateBlocking(long truncatedAt)
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+        {
+            ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
+            SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
+        }
+    }
+}
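Note that acquireLockFor returns the striped lock only when tryLock succeeds, so a caller must treat a null result as contention on that partition key's stripe. A hypothetical caller might retry as sketched below; the spin-and-yield retry policy is purely illustrative and is not necessarily what Cassandra's write path does with a null result.

import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;

final class LockUsageSketch
{
    // Runs the given action while holding the stripe lock for a partition key.
    static void withViewLock(ByteBuffer key, Runnable action)
    {
        Lock lock;
        while ((lock = MaterializedViewManager.acquireLockFor(key)) == null)
        {
            // null means another writer currently holds this stripe; back off and retry.
            Thread.yield();
        }
        try
        {
            action.run();
        }
        finally
        {
            lock.unlock();
        }
    }
}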