Avoid filtering sstables based on generation when ViewBuilder restarts. Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be96c284 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be96c284 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be96c284 Branch: refs/heads/cassandra-3.11 Commit: be96c2840b0a6b269e22bde246b84e8ef4aeef69 Parents: 449400b Author: T Jake Luciani <j...@apache.org> Authored: Mon Apr 3 13:56:04 2017 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Apr 4 12:46:42 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 4 +- src/java/org/apache/cassandra/db/view/View.java | 5 ++ .../apache/cassandra/db/view/ViewBuilder.java | 52 +++++++++-------- .../org/apache/cassandra/cql3/ViewTest.java | 59 ++++++++++++++++++++ 4 files changed, 95 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e63e266..1ca8733 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,7 @@ 3.11.0 * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820) * cdc column addition strikes again (CASSANDRA-13382) - * Fix static column indexes (CASSANDRA-13277) + * Fix static column indexes (CASSANDRA-13277) * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298) * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370) * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247) @@ -19,6 +19,8 @@ * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) * Address message coalescing regression (CASSANDRA-12676) Merged from 3.0: + * Fix view builder bug that can filter out data on restart (CASSANDRA-13405) + * Fix 2i page size calculation when there are no regular columns 
(CASSANDRA-13400) * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395) * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020) * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389) http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index 0b8de9e..e4c6e02 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ b/src/java/org/apache/cassandra/db/view/View.java @@ -197,7 +197,11 @@ public class View public ReadQuery getReadQuery() { if (query == null) + { query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds()); + logger.trace("View query: {}", rawSelect); + } + return query; } @@ -205,6 +209,7 @@ public class View { if (this.builder != null) { + logger.debug("Stopping current view builder due to schema change"); this.builder.stop(); this.builder = null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 9550e1e..8e647ea 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -72,8 +73,12 @@ public class ViewBuilder extends CompactionInfo.Holder private void 
buildKey(DecoratedKey key) { ReadQuery selectQuery = view.getReadQuery(); + if (!selectQuery.selectsKey(key)) + { + logger.trace("Skipping {}, view query filters", key); return; + } int nowInSec = FBUtilities.nowInSeconds(); SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec); @@ -96,55 +101,46 @@ public class ViewBuilder extends CompactionInfo.Holder public void run() { + logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name); logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name); UUID localHostId = SystemKeyspace.getLocalHostId(); String ksname = baseCfs.metadata.ksName, viewName = view.name; if (SystemKeyspace.isViewBuilt(ksname, viewName)) { + logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name); if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName)) updateDistributed(ksname, viewName, localHostId); return; } Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName); + final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName); Token lastToken; Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function; if (buildStatus == null) { - baseCfs.forceBlockingFlush(); - function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); - int generation = Integer.MIN_VALUE; - - try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs) - { - for (SSTableReader reader : temp) - { - generation = Math.max(reader.descriptor.generation, generation); - } - } - - SystemKeyspace.beginViewBuild(ksname, viewName, generation); + logger.debug("Starting new view build. 
flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name); lastToken = null; + + //We don't track the generation number anymore since if a rebuild is stopped and + //restarted the max generation filter may yield no sstables due to compactions. + //We only care about max generation *during* a build, not across builds. + //see CASSANDRA-13405 + SystemKeyspace.beginViewBuild(ksname, viewName, 0); } else { - function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>() - { - @Nullable - public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view) - { - Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view); - if (readers != null) - return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left); - return null; - } - }; lastToken = buildStatus.right; + logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name); } + baseCfs.forceBlockingFlush(); + function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); + prevToken = lastToken; + long keysBuilt = 0; try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs; ReducingKeyIterator iter = new ReducingKeyIterator(sstables)) { @@ -160,6 +156,7 @@ public class ViewBuilder extends CompactionInfo.Holder if (range.contains(token)) { buildKey(key); + ++keysBuilt; if (prevToken == null || prevToken.compareTo(token) != 0) { @@ -168,15 +165,21 @@ public class ViewBuilder extends CompactionInfo.Holder } } } + lastToken = null; } } if (!isStopped) { + logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt); SystemKeyspace.finishViewBuildStatus(ksname, viewName); updateDistributed(ksname, viewName, localHostId); } + else + { + logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, 
keysBuilt); + } } catch (Exception e) { @@ -222,6 +225,7 @@ public class ViewBuilder extends CompactionInfo.Holder if (lastToken == null || range.contains(lastToken)) rangesLeft = 0; } + return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index ac065e6..6f6e04d 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -21,8 +21,11 @@ package org.apache.cassandra.cql3; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.Uninterruptibles; + import junit.framework.Assert; import org.junit.After; import org.junit.Before; @@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.FBUtilities; @@ -1234,4 +1238,59 @@ public class ViewTest extends CQLTester { } } + + @Test + public void testViewBuilderResume() throws Throwable + { + createTable("CREATE TABLE %s (" + + "k int, " + + "c int, " + + "val text, " + + "PRIMARY KEY(k,c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + CompactionManager.instance.setCoreCompactorThreads(1); + CompactionManager.instance.setMaximumCompactorThreads(1); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + 
cfs.disableAutoCompaction(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + for (int i = 0; i < 1024; i++) + execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i); + + cfs.forceBlockingFlush(); + + createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); + + cfs.enableAutoCompaction(); + List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs); + + //Force a second MV on the same base table, which will restart the first MV builder... + createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)"); + + + //Compact the base table + FBUtilities.waitOnFutures(futures); + + while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test")) + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L)); + } }