Avoid filtering sstables based on generation when ViewBuilder restarts Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f1ab4a4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f1ab4a4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f1ab4a4 Branch: refs/heads/cassandra-3.11 Commit: 2f1ab4a4248ac24c890e195cd5714ca54510c19a Parents: 828ca7c Author: T Jake Luciani <j...@apache.org> Authored: Mon Apr 3 13:56:04 2017 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Apr 4 12:45:52 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/view/View.java | 5 ++ .../apache/cassandra/db/view/ViewBuilder.java | 57 ++++++++++--------- .../org/apache/cassandra/cql3/ViewTest.java | 59 ++++++++++++++++++++ 4 files changed, 97 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c258203..d9a97ad 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.13 + * Fix view builder bug that can filter out data on restart (CASSANDRA-13405) * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400) * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395) * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index 845a6ab..e471349 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ b/src/java/org/apache/cassandra/db/view/View.java @@ -208,7 +208,11 @@ public class View public ReadQuery getReadQuery() { if (query == null) + { query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds()); + logger.trace("View query: {}", rawSelect); + } + return query; } @@ -216,6 +220,7 @@ public class View { if (this.builder != null) { + logger.debug("Stopping current view builder due to schema change"); this.builder.stop(); this.builder = null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 37c0e7b..94314fd 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -74,8 +75,12 @@ public class ViewBuilder extends CompactionInfo.Holder { AtomicLong noBase = new AtomicLong(Long.MAX_VALUE); ReadQuery selectQuery = view.getReadQuery(); + if (!selectQuery.selectsKey(key)) + { + logger.trace("Skipping {}, view query filters", key); return; + } int nowInSec = FBUtilities.nowInSeconds(); SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); @@ -97,49 +102,41 @@ public class ViewBuilder extends CompactionInfo.Holder public void run() { + logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name); String ksname = baseCfs.metadata.ksName, viewName = view.name; if (SystemKeyspace.isViewBuilt(ksname, viewName)) + { + logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name); return; - + } Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName); + final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName); Token lastToken; Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function; if (buildStatus == null) { - baseCfs.forceBlockingFlush(); - function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); - int generation = Integer.MIN_VALUE; - - try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs) - { - for (SSTableReader reader : temp) - { - generation = Math.max(reader.descriptor.generation, generation); - } - } - - SystemKeyspace.beginViewBuild(ksname, viewName, generation); + logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name); lastToken = null; + + //We don't track the generation number anymore since if a rebuild is stopped and + //restarted the max generation filter may yield no sstables due to compactions. + //We only care about max generation *during* a build, not across builds. + //see CASSANDRA-13405 + SystemKeyspace.beginViewBuild(ksname, viewName, 0); } else { - function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>() - { - @Nullable - public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view) - { - Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view); - if (readers != null) - return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left); - return null; - } - }; lastToken = buildStatus.right; + logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name); } + baseCfs.forceBlockingFlush(); + function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); + prevToken = lastToken; + long keysBuilt = 0; try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs; ReducingKeyIterator iter = new ReducingKeyIterator(sstables)) { @@ -154,6 +151,7 @@ public class ViewBuilder extends CompactionInfo.Holder if (range.contains(token)) { buildKey(key); + ++keysBuilt; if (prevToken == null || prevToken.compareTo(token) != 0) { @@ -162,12 +160,20 @@ public class ViewBuilder extends CompactionInfo.Holder } } } + lastToken = null; } } if (!isStopped) + { + logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt); SystemKeyspace.finishViewBuildStatus(ksname, viewName); + } + else + { + logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt); + } } catch (Exception e) { @@ -198,6 +204,7 @@ public class ViewBuilder extends CompactionInfo.Holder if (lastToken == null || range.contains(lastToken)) rangesLeft = 0; } + return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index 2070bef..e595ebd 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -21,8 +21,11 @@ package org.apache.cassandra.cql3; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.Uninterruptibles; + import junit.framework.Assert; import org.junit.After; import org.junit.Before; @@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertTrue; @@ -1203,4 +1207,59 @@ public class ViewTest extends CQLTester assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1)); assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv"), row(1, 0)); } + + @Test + public void testViewBuilderResume() throws Throwable + { + createTable("CREATE TABLE %s (" + + "k int, " + + "c int, " + + "val text, " + + "PRIMARY KEY(k,c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + CompactionManager.instance.setCoreCompactorThreads(1); + CompactionManager.instance.setMaximumCompactorThreads(1); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + cfs.disableAutoCompaction(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); + + cfs.enableAutoCompaction(); + List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs); + + //Force a second MV on the same base table, which will restart the first MV builder... + createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); + + + //Compact the base table + FBUtilities.waitOnFutures(futures); + + while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test")) + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L)); + } }