Repository: cassandra
Updated Branches:
  refs/heads/trunk f9a1a80af -> 48815d4a1


fix SASI memtable switching on flush

patch by xedin; reviewed by beobal for CASSANDRA-11159


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48815d4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48815d4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48815d4a

Branch: refs/heads/trunk
Commit: 48815d4a182915e852888cb35273b8e896cea440
Parents: f9a1a80
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Thu Feb 11 18:54:04 2016 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Feb 16 13:06:39 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/lifecycle/Tracker.java  | 26 ++++++-
 .../apache/cassandra/index/sasi/SASIIndex.java  |  8 ++
 .../cassandra/index/sasi/conf/ColumnIndex.java  | 40 +++++++++-
 .../cassandra/index/sasi/conf/view/View.java    |  4 +-
 .../index/sasi/plan/QueryController.java        | 30 +++-----
 .../MemtableDiscardedNotification.java          | 30 ++++++++
 .../MemtableSwitchedNotification.java           | 30 ++++++++
 .../cassandra/index/sasi/SASIIndexTest.java     | 77 ++++++++++++++++++++
 9 files changed, 220 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c3bfdc3..f20e983 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * fix SASI memtable switching on flush (CASSANDRA-11159)
  * Remove duplicate offline compaction tracking (CASSANDRA-11148)
  * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130)
  * Support long name output for nodetool commands (CASSANDRA-7950)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java 
b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 4c73472..dd07b19 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -318,6 +318,8 @@ public class Tracker
         Pair<View, View> result = apply(View.switchMemtable(newMemtable));
         if (truncating)
             notifyRenewed(newMemtable);
+        else
+            notifySwitched(result.left.getCurrentMemtable());
 
         return result.left.getCurrentMemtable();
     }
@@ -349,6 +351,8 @@ public class Tracker
         // TODO: if we're invalidated, should we notifyadded AND removed, or 
just skip both?
         fail = notifyAdded(sstables, fail);
 
+        notifyDiscarded(memtable);
+
         if (!isDummy() && !cfstore.isValid())
             dropSSTables();
 
@@ -441,16 +445,30 @@ public class Tracker
             subscriber.handleNotification(notification, this);
     }
 
-    public void notifyRenewed(Memtable renewed)
+    public void notifyTruncated(long truncatedAt)
     {
-        INotification notification = new MemtableRenewedNotification(renewed);
+        INotification notification = new TruncationNotification(truncatedAt);
         for (INotificationConsumer subscriber : subscribers)
             subscriber.handleNotification(notification, this);
     }
 
-    public void notifyTruncated(long truncatedAt)
+    public void notifyRenewed(Memtable renewed)
+    {
+        notify(new MemtableRenewedNotification(renewed));
+    }
+
+    public void notifySwitched(Memtable previous)
+    {
+        notify(new MemtableSwitchedNotification(previous));
+    }
+
+    public void notifyDiscarded(Memtable discarded)
+    {
+        notify(new MemtableDiscardedNotification(discarded));
+    }
+
+    private void notify(INotification notification)
     {
-        INotification notification = new TruncationNotification(truncatedAt);
         for (INotificationConsumer subscriber : subscribers)
             subscriber.handleNotification(notification, this);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index d480b82..90cc72e 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -311,6 +311,14 @@ public class SASIIndex implements Index, 
INotificationConsumer
         {
             index.switchMemtable();
         }
+        else if (notification instanceof MemtableSwitchedNotification)
+        {
+            index.switchMemtable(((MemtableSwitchedNotification) 
notification).memtable);
+        }
+        else if (notification instanceof MemtableDiscardedNotification)
+        {
+            index.discardMemtable(((MemtableDiscardedNotification) 
notification).memtable);
+        }
     }
 
     public ColumnIndex getIndex()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java 
b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 1703bd4..76ab968 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -22,11 +22,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -39,6 +44,7 @@ import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
@@ -54,6 +60,8 @@ public class ColumnIndex
     private final Optional<IndexMetadata> config;
 
     private final AtomicReference<IndexMemtable> memtable;
+    private final ConcurrentMap<Memtable, IndexMemtable> pendingFlush = new 
ConcurrentHashMap<>();
+
     private final IndexMode mode;
 
     private final Component component;
@@ -92,17 +100,45 @@ public class ColumnIndex
 
     public long index(DecoratedKey key, Row row)
     {
-        return memtable.get().index(key, getValueOf(column, row, 
FBUtilities.nowInSeconds()));
+        return getCurrentMemtable().index(key, getValueOf(column, row, 
FBUtilities.nowInSeconds()));
     }
 
     public void switchMemtable()
     {
+        // discard current memtable with all of its data, useful on truncate
         memtable.set(new IndexMemtable(this));
     }
 
+    public void switchMemtable(Memtable parent)
+    {
+        pendingFlush.putIfAbsent(parent, memtable.getAndSet(new 
IndexMemtable(this)));
+    }
+
+    public void discardMemtable(Memtable parent)
+    {
+        pendingFlush.remove(parent);
+    }
+
+    @VisibleForTesting
+    public IndexMemtable getCurrentMemtable()
+    {
+        return memtable.get();
+    }
+
+    @VisibleForTesting
+    public Collection<IndexMemtable> getPendingMemtables()
+    {
+        return pendingFlush.values();
+    }
+
     public RangeIterator<Long, Token> searchMemtable(Expression e)
     {
-        return memtable.get().search(e);
+        RangeIterator.Builder<Long, Token> builder = new 
RangeUnionIterator.Builder<>();
+        builder.add(getCurrentMemtable().search(e));
+        for (IndexMemtable memtable : getPendingMemtables())
+            builder.add(memtable.search(e));
+
+        return builder.build();
     }
 
     public void update(Collection<SSTableReader> oldSSTables, 
Collection<SSTableReader> newSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java 
b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
index 505a4d7..1f68b0c 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
@@ -87,9 +87,9 @@ public class View implements Iterable<SSTableIndex>
             throw new IllegalStateException(String.format("mismatched sizes 
for intervals tree for keys vs terms: %d != %d", 
keyIntervalTree.intervalCount(), termTree.intervalCount()));
     }
 
-    public Set<SSTableIndex> match(final Set<SSTableReader> scope, Expression 
expression)
+    public Set<SSTableIndex> match(Expression expression)
     {
-        return Sets.filter(termTree.search(expression), index -> 
scope.contains(index.getSSTable()));
+        return termTree.search(expression);
     }
 
     public List<SSTableIndex> match(ByteBuffer minKey, ByteBuffer maxKey)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java 
b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index 8e10fd0..70de463 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -20,12 +20,12 @@ package org.apache.cassandra.index.sasi.plan;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.sasi.SASIIndex;
@@ -51,18 +51,16 @@ public class QueryController
 
     private final ColumnFamilyStore cfs;
     private final PartitionRangeReadCommand command;
+    private final DataRange range;
     private final Map<Collection<Expression>, List<RangeIterator<Long, 
Token>>> resources = new HashMap<>();
-    private final RefViewFragment scope;
-    private final Set<SSTableReader> sstables;
 
     public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand 
command, long timeQuotaMs)
     {
         this.cfs = cfs;
         this.command = command;
+        this.range = command.dataRange();
         this.executionQuota = TimeUnit.MILLISECONDS.toNanos(timeQuotaMs);
         this.executionStart = System.nanoTime();
-        this.scope = getSSTableScope(cfs, command);
-        this.sstables = new HashSet<>(scope.sstables);
     }
 
     public boolean isForThrift()
@@ -178,14 +176,7 @@ public class QueryController
 
     public void finish()
     {
-        try
-        {
-            resources.values().forEach(this::releaseIndexes);
-        }
-        finally
-        {
-            scope.release();
-        }
+        resources.values().forEach(this::releaseIndexes);
     }
 
     private Map<Expression, Set<SSTableIndex>> getView(OperationType op, 
Collection<Expression> expressions)
@@ -220,7 +211,7 @@ public class QueryController
             }
             else
             {
-                readers.addAll(view.match(sstables, e));
+                readers.addAll(applyScope(view.match(e)));
             }
 
             indexes.put(e, readers);
@@ -243,7 +234,7 @@ public class QueryController
             if (view == null)
                 continue;
 
-            Set<SSTableIndex> indexes = view.match(sstables, e);
+            Set<SSTableIndex> indexes = applyScope(view.match(e));
             if (primaryIndexes.size() > indexes.size())
             {
                 primaryIndexes = indexes;
@@ -254,8 +245,11 @@ public class QueryController
         return expression == null ? null : Pair.create(expression, 
primaryIndexes);
     }
 
-    private static RefViewFragment getSSTableScope(ColumnFamilyStore cfs, 
PartitionRangeReadCommand command)
+    private Set<SSTableIndex> applyScope(Set<SSTableIndex> indexes)
     {
-        return 
cfs.selectAndReference(org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL,
 command.dataRange().keyRange()));
+        return Sets.filter(indexes, index -> {
+            SSTableReader sstable = index.getSSTable();
+            return range.startKey().compareTo(sstable.last) <= 0 && 
(range.stopKey().isMinimum() || sstable.first.compareTo(range.stopKey()) <= 0);
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java
 
b/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java
new file mode 100644
index 0000000..778cad0
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/notifications/MemtableDiscardedNotification.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.db.Memtable;
+
+public class MemtableDiscardedNotification implements INotification
+{
+    public final Memtable memtable;
+
+    public MemtableDiscardedNotification(Memtable discarded)
+    {
+        this.memtable = discarded;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java 
b/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java
new file mode 100644
index 0000000..946de4e
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/notifications/MemtableSwitchedNotification.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.db.Memtable;
+
+public class MemtableSwitchedNotification implements INotification
+{
+    public final Memtable memtable;
+
+    public MemtableSwitchedNotification(Memtable switched)
+    {
+        this.memtable = switched;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48815d4a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java 
b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index a88e594..c9d66f7 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -50,6 +50,8 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException;
+import org.apache.cassandra.index.sasi.memory.IndexMemtable;
+import org.apache.cassandra.index.sasi.plan.QueryController;
 import org.apache.cassandra.index.sasi.plan.QueryPlan;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -1938,6 +1940,81 @@ public class SASIIndexTest
             QueryProcessor.executeOnceInternal(String.format("TRUNCATE TABLE 
%s.%s", KS_NAME, table));
     }
 
+    @Test
+    public void testIndexMemtableSwitching()
+    {
+        // write some data but don't flush
+        ColumnFamilyStore store = loadData(new HashMap<String, Pair<String, 
Integer>>()
+        {{
+            put("key1", Pair.create("Pavel", 14));
+        }}, false);
+
+        ColumnIndex index = ((SASIIndex) 
store.indexManager.getIndexByName("first_name")).getIndex();
+        IndexMemtable beforeFlushMemtable = index.getCurrentMemtable();
+
+        PartitionRangeReadCommand command = new 
PartitionRangeReadCommand(store.metadata,
+                                                                          
FBUtilities.nowInSeconds(),
+                                                                          
ColumnFilter.all(store.metadata),
+                                                                          
RowFilter.NONE,
+                                                                          
DataLimits.NONE,
+                                                                          
DataRange.allData(store.getPartitioner()),
+                                                                          
Optional.empty());
+
+        QueryController controller = new QueryController(store, command, 
Integer.MAX_VALUE);
+        org.apache.cassandra.index.sasi.plan.Expression expression =
+                new 
org.apache.cassandra.index.sasi.plan.Expression(controller, index)
+                                                    
.add(Operator.LIKE_MATCHES, UTF8Type.instance.fromString("Pavel"));
+
+        Assert.assertTrue(beforeFlushMemtable.search(expression).getCount() > 
0);
+
+        store.forceBlockingFlush();
+
+        IndexMemtable afterFlushMemtable = index.getCurrentMemtable();
+
+        Assert.assertNotSame(afterFlushMemtable, beforeFlushMemtable);
+        Assert.assertNull(afterFlushMemtable.search(expression));
+        Assert.assertEquals(0, index.getPendingMemtables().size());
+
+        loadData(new HashMap<String, Pair<String, Integer>>()
+        {{
+            put("key2", Pair.create("Sam", 15));
+        }}, false);
+
+        expression = new 
org.apache.cassandra.index.sasi.plan.Expression(controller, index)
+                        .add(Operator.LIKE_MATCHES, 
UTF8Type.instance.fromString("Sam"));
+
+        beforeFlushMemtable = index.getCurrentMemtable();
+        Assert.assertTrue(beforeFlushMemtable.search(expression).getCount() > 
0);
+
+        // let's emulate switching memtable and see if we can still read-data 
in "pending"
+        
index.switchMemtable(store.getTracker().getView().getCurrentMemtable());
+
+        Assert.assertNotSame(index.getCurrentMemtable(), beforeFlushMemtable);
+        Assert.assertEquals(1, index.getPendingMemtables().size());
+
+        Assert.assertTrue(index.searchMemtable(expression).getCount() > 0);
+
+        // emulate "everything is flushed" notification
+        
index.discardMemtable(store.getTracker().getView().getCurrentMemtable());
+
+        Assert.assertEquals(0, index.getPendingMemtables().size());
+        Assert.assertNull(index.searchMemtable(expression));
+
+        // test discarding data from memtable
+        loadData(new HashMap<String, Pair<String, Integer>>()
+        {{
+            put("key3", Pair.create("Jonathan", 16));
+        }}, false);
+
+        expression = new 
org.apache.cassandra.index.sasi.plan.Expression(controller, index)
+                .add(Operator.LIKE_MATCHES, 
UTF8Type.instance.fromString("Jonathan"));
+
+        Assert.assertTrue(index.searchMemtable(expression).getCount() > 0);
+
+        index.switchMemtable();
+        Assert.assertNull(index.searchMemtable(expression));
+    }
+
     private static ColumnFamilyStore loadData(Map<String, Pair<String, 
Integer>> data, boolean forceFlush)
     {
         return loadData(data, System.currentTimeMillis(), forceFlush);

Reply via email to