Split materialized view mutations on build to prevent OOM

Patch by Carl Yeksigian; reviewed by Jake Luciani for CASSANDRA-12268
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76f17502 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76f17502 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76f17502 Branch: refs/heads/cassandra-3.X Commit: 76f175028544fe20f30ae873f23cba559097cef1 Parents: d5f2d0f Author: Carl Yeksigian <c...@apache.org> Authored: Wed Oct 12 12:24:19 2016 -0400 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Oct 12 12:24:19 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/view/TableViews.java | 91 +++++++++++++++++--- .../apache/cassandra/db/view/ViewBuilder.java | 14 +-- .../cassandra/db/view/ViewUpdateGenerator.java | 8 ++ 4 files changed, 95 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d797288..13800da 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.10 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268) * mx4j does not work in 3.0.8 (CASSANDRA-12274) * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740) * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java index 7feb67c..1a3cbb1 100644 --- a/src/java/org/apache/cassandra/db/view/TableViews.java +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@ -46,7 +46,8 @@ public 
class TableViews extends AbstractCollection<View> private final CFMetaData baseTableMetadata; // We need this to be thread-safe, but the number of times this is changed (when a view is created in the keyspace) - // massively exceeds the number of time it's read (for every mutation on the keyspace), so a copy-on-write list is the best option. + // is massively exceeded by the number of times it's read (for every mutation on the keyspace), so a copy-on-write + // list is the best option. private final List<View> views = new CopyOnWriteArrayList(); public TableViews(CFMetaData baseTableMetadata) @@ -137,7 +138,7 @@ public class TableViews extends AbstractCollection<View> UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command); UnfilteredRowIterator updates = update.unfilteredIterator()) { - mutations = generateViewUpdates(views, updates, existings, nowInSec); + mutations = Iterators.getOnlyElement(generateViewUpdates(views, updates, existings, nowInSec, false)); } Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS); @@ -145,6 +146,7 @@ public class TableViews extends AbstractCollection<View> StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete); } + /** * Given some updates on the base table of this object and the existing values for the rows affected by that update, generates the * mutation to be applied to the provided views. @@ -159,7 +161,11 @@ public class TableViews extends AbstractCollection<View> * @param nowInSec the current time in seconds. * @return the mutations to apply to the {@code views}. This can be empty. 
*/ - public Collection<Mutation> generateViewUpdates(Collection<View> views, UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec) + public Iterator<Collection<Mutation>> generateViewUpdates(Collection<View> views, + UnfilteredRowIterator updates, + UnfilteredRowIterator existings, + int nowInSec, + boolean separateUpdates) { assert updates.metadata().cfId.equals(baseTableMetadata.cfId); @@ -251,18 +257,75 @@ public class TableViews extends AbstractCollection<View> addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec); } } - while (updatesIter.hasNext()) + + if (separateUpdates) { - Unfiltered update = updatesIter.next(); - // If it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it for view updates - if (update.isRangeTombstoneMarker()) - continue; + final Collection<Mutation> firstBuild = buildMutations(baseTableMetadata, generators); + + return new Iterator<Collection<Mutation>>() + { + // If the previous values are already empty, this update must be either empty or exclusively appending. + // In the case we are exclusively appending, we need to drop the build that was passed in and try to build a + // new first update instead. + // If there are no other updates, next will be null and the iterator will be empty. + Collection<Mutation> next = firstBuild.isEmpty() + ? 
buildNext() + : firstBuild; + + private Collection<Mutation> buildNext() + { + while (updatesIter.hasNext()) + { + Unfiltered update = updatesIter.next(); + // If it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it for view updates + if (update.isRangeTombstoneMarker()) + continue; + + Row updateRow = (Row) update; + addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec); + + // If the updates have been filtered, then we won't have any mutations; we need to make sure that we + // only return if the mutations are empty. Otherwise, we continue to search for an update which is + // not filtered + Collection<Mutation> mutations = buildMutations(baseTableMetadata, generators); + if (!mutations.isEmpty()) + return mutations; + } + + return null; + } + + public boolean hasNext() + { + return next != null; + } + + public Collection<Mutation> next() + { + Collection<Mutation> mutations = next; + + next = buildNext(); - Row updateRow = (Row)update; - addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec); + assert !mutations.isEmpty() : "Expected mutations to be non-empty"; + return mutations; + } + }; } + else + { + while (updatesIter.hasNext()) + { + Unfiltered update = updatesIter.next(); + // If it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it for view updates + if (update.isRangeTombstoneMarker()) + continue; + + Row updateRow = (Row) update; + addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec); + } - return buildMutations(baseTableMetadata, generators); + return Iterators.singletonIterator(buildMutations(baseTableMetadata, generators)); + } } /** @@ -425,10 +488,13 @@ public class TableViews extends AbstractCollection<View> // One view is probably common enough and we can optimize a bit easily if (generators.size() == 1) { - 
Collection<PartitionUpdate> updates = generators.get(0).generateViewUpdates(); + ViewUpdateGenerator generator = generators.get(0); + Collection<PartitionUpdate> updates = generator.generateViewUpdates(); List<Mutation> mutations = new ArrayList<>(updates.size()); for (PartitionUpdate update : updates) mutations.add(new Mutation(update)); + + generator.clear(); return mutations; } @@ -446,6 +512,7 @@ public class TableViews extends AbstractCollection<View> } mutation.add(update); } + generator.clear(); } return mutations.values(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index b55eda0..37c0e7b 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.view; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -83,15 +84,15 @@ public class ViewBuilder extends CompactionInfo.Holder // and pretend that there is nothing pre-existing. 
UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false); - Collection<Mutation> mutations; try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command)) { - mutations = baseCfs.keyspace.viewManager.forTable(baseCfs.metadata).generateViewUpdates(Collections.singleton(view), data, empty, nowInSec); - } + Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager + .forTable(baseCfs.metadata) + .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true); - if (!mutations.isEmpty()) - StorageProxy.mutateMV(key.getKey(), mutations, true, noBase); + mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase)); + } } public void run() @@ -166,8 +167,7 @@ public class ViewBuilder extends CompactionInfo.Holder } if (!isStopped) - SystemKeyspace.finishViewBuildStatus(ksname, viewName); - + SystemKeyspace.finishViewBuildStatus(ksname, viewName); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 3bdc380..edb88d0 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -145,6 +145,14 @@ public class ViewUpdateGenerator } /** + * Clears the current state so that the generator may be reused. + */ + public void clear() + { + updates.clear(); + } + + /** * Compute which type of action needs to be performed to the view for a base table row * before and after an update. */