Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be96c284
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be96c284
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be96c284

Branch: refs/heads/cassandra-3.11
Commit: be96c2840b0a6b269e22bde246b84e8ef4aeef69
Parents: 449400b
Author: T Jake Luciani <j...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <j...@apache.org>
Committed: Tue Apr 4 12:46:42 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 52 +++++++++--------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e63e266..1ca8733 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
 3.11.0
  * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
  * cdc column addition strikes again (CASSANDRA-13382)
- * Fix static column indexes (CASSANDRA-13277) 
+ * Fix static column indexes (CASSANDRA-13277)
  * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
  * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
  * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns 
(CASSANDRA-13247)
@@ -19,6 +19,8 @@
  * NoReplicationTokenAllocator should work with zero replication factor 
(CASSANDRA-12983)
  * Address message coalescing regression (CASSANDRA-12676)
 Merged from 3.0:
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
+ * Fix 2i page size calculation when there are no regular columns 
(CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data 
(CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled 
(CASSANDRA-13020)
  * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors 
(CASSANDRA-13389)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java 
b/src/java/org/apache/cassandra/db/view/View.java
index 0b8de9e..e4c6e02 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -197,7 +197,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = 
getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()),
 FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -205,6 +209,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java 
b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 9550e1e..8e647ea 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -72,8 +73,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     private void buildKey(DecoratedKey key)
     {
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = 
view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -96,55 +101,46 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", 
baseCfs.metadata.ksName, view.name);
         logger.trace("Running view builder for {}.{}", 
baseCfs.metadata.ksName, view.name);
         UUID localHostId = SystemKeyspace.getLocalHostId();
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
         {
+            logger.debug("View already marked built for {}.{}", 
baseCfs.metadata.ksName, view.name);
             if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
                 updateDistributed(ksname, viewName, localHostId);
             return;
         }
 
         Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = 
SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, 
Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = 
baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, 
generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", 
baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We no longer track the generation number: if a build is stopped 
and then
+            //restarted, a max generation filter may yield no sstables, since 
compactions may have replaced every sstable at or below the recorded generation.
+            //We only care about max generation *during* a build, not across 
builds.
+            //See CASSANDRA-13405.
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, 
Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> 
apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> 
ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base 
table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = 
baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -160,6 +156,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || 
prevToken.compareTo(token) != 0)
                             {
@@ -168,15 +165,21 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
             {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", 
ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
                 updateDistributed(ksname, viewName, localHostId);
             }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} 
keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -222,6 +225,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, 
rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java 
b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index ac065e6..6f6e04d 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -1234,4 +1238,59 @@ public class ViewTest extends CQLTester
         {
         }
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM 
%%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY 
(val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = 
CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the 
first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, 
c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY 
KEY (val,k,c)");
+
+
+        //Wait for the background compaction of the base table to finish
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }

Reply via email to