Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 828ca7cc9 -> 2f1ab4a42
  refs/heads/cassandra-3.11 e8053dd8b -> be96c2840
  refs/heads/trunk 633babf0f -> 522ddba27


Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f1ab4a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f1ab4a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f1ab4a4

Branch: refs/heads/cassandra-3.0
Commit: 2f1ab4a4248ac24c890e195cd5714ca54510c19a
Parents: 828ca7c
Author: T Jake Luciani <j...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <j...@apache.org>
Committed: Tue Apr 4 12:45:52 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 57 ++++++++++---------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 97 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c258203..d9a97ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.13
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
  * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 845a6ab..e471349 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -208,7 +208,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = 
getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()),
 FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -216,6 +220,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 37c0e7b..94314fd 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -74,8 +75,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     {
         AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = 
view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -97,49 +102,41 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", 
baseCfs.metadata.ksName, view.name);
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
+        {
+            logger.debug("View already marked built for {}.{}", 
baseCfs.metadata.ksName, view.name);
             return;
-
+        }
         Iterable<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = 
SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, 
Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = 
baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, 
generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", 
baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We don't track the generation number anymore since if a rebuild 
is stopped and
+            //restarted the max generation filter may yield no sstables due to 
compactions.
+            //We only care about max generation *during* a build, not across 
builds.
+            //see CASSANDRA-13405
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, 
Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> 
apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> 
ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base 
table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = 
org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = 
baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -154,6 +151,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || 
prevToken.compareTo(token) != 0)
                             {
@@ -162,12 +160,20 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
+            {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", 
ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+            }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} 
keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -198,6 +204,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, 
rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 2070bef..e595ebd 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertTrue;
@@ -1203,4 +1207,59 @@ public class ViewTest extends CQLTester
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM %s"), row(0, 1));
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * 
FROM mv"), row(1, 0));
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM 
%%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY 
(val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = 
CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the 
first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, 
c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY 
KEY (val,k,c)");
+
+
+        //Compact the base table
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }

Reply via email to