Repository: cassandra
Updated Branches:
  refs/heads/trunk 974d8fc09 -> 8ef71f3f2


Throttle base partitions during MV repair streaming to prevent OOM

Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-13299


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8ef71f3f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8ef71f3f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8ef71f3f

Branch: refs/heads/trunk
Commit: 8ef71f3f29fb040cce18ba158ff5f289b388c30b
Parents: 974d8fc
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
Authored: Fri Aug 11 13:04:28 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Sep 28 05:45:25 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/ThrottledUnfilteredIterator.java    | 251 ++++++++
 .../cassandra/streaming/StreamReceiveTask.java  |  36 +-
 .../cassandra/utils/AbstractIterator.java       |   7 +-
 .../rows/ThrottledUnfilteredIteratorTest.java   | 613 +++++++++++++++++++
 5 files changed, 890 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97e6b03..081ed72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
  * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
  * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
  * Add extra information to SASI timeout exception (CASSANDRA-13677)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
new file mode 100644
index 0000000..dd33b1e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A utility class to split a given {@link UnfilteredRowIterator} into smaller chunks, each
+ * having at most {@link #throttle} + 1 unfiltereds.
+ *
+ * Only the first output contains the partition-level info: {@link UnfilteredRowIterator#partitionLevelDeletion}
+ * and {@link UnfilteredRowIterator#staticRow}.
+ *
+ * Besides splitting, this iterator also ensures that each chunk does not finish with an open tombstone marker,
+ * by closing any open tombstone marker and re-opening it on the next chunk.
+ *
+ * Each output {@link UnfilteredRowIterator} is only valid until the next call to {@link #next()}.
+ *
+ * A subsequent {@link #next} call will exhaust the previously returned iterator before computing the next one,
+ * effectively skipping unfiltereds up to the throttle size.
+ *
+ * Closing this iterator closes the underlying iterator.
+ *
+ */
+public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowIterator> implements CloseableIterator<UnfilteredRowIterator>
+{
+    private final UnfilteredRowIterator origin;
+    private final int throttle;
+
+    // internal mutable state
+    private UnfilteredRowIterator throttledItr;
+
+    // extra unfiltereds from previous iteration
+    private Iterator<Unfiltered> overflowed = Collections.emptyIterator();
+
+    @VisibleForTesting
+    ThrottledUnfilteredIterator(UnfilteredRowIterator origin, int throttle)
+    {
+        assert origin != null;
+        assert throttle > 1 : "Throttle size must be higher than 1 to properly support open and close tombstone boundaries.";
+        this.origin = origin;
+        this.throttle = throttle;
+        this.throttledItr = null;
+    }
+
+    @Override
+    protected UnfilteredRowIterator computeNext()
+    {
+        // exhaust previous throttled iterator
+        while (throttledItr != null && throttledItr.hasNext())
+            throttledItr.next();
+
+        if (!origin.hasNext())
+            return endOfData();
+
+        throttledItr = new WrappingUnfilteredRowIterator(origin)
+        {
+            private int count = 0;
+            private boolean isFirst = throttledItr == null;
+
+            // current batch's openMarker. if it's generated in previous batch,
+            // it must be consumed as first element of current batch
+            private RangeTombstoneMarker openMarker;
+
+            // current batch's closeMarker.
+            // it must be consumed as last element of current batch
+            private RangeTombstoneMarker closeMarker = null;
+
+            @Override
+            public boolean hasNext()
+            {
+                return (withinLimit() && wrapped.hasNext()) || closeMarker != null;
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                if (closeMarker != null)
+                {
+                    assert count == throttle;
+                    Unfiltered toReturn = closeMarker;
+                    closeMarker = null;
+                    return toReturn;
+                }
+
+                Unfiltered next;
+                assert withinLimit();
+                // at the beginning of the batch, there might be unfiltereds remaining from the previous iteration
+                if (overflowed.hasNext())
+                    next = overflowed.next();
+                else
+                    next = wrapped.next();
+                recordNext(next);
+                return next;
+            }
+
+            private void recordNext(Unfiltered unfiltered)
+            {
+                count++;
+                if (unfiltered.isRangeTombstoneMarker())
+                    updateMarker((RangeTombstoneMarker) unfiltered);
+                // when we reach the throttle with a remaining openMarker, we need to create a corresponding closeMarker.
+                if (count == throttle && openMarker != null)
+                {
+                    assert wrapped.hasNext();
+                    closeOpenMarker(wrapped.next());
+                }
+            }
+
+            private boolean withinLimit()
+            {
+                return count < throttle;
+            }
+
+            private void updateMarker(RangeTombstoneMarker marker)
+            {
+                openMarker = marker.isOpen(isReverseOrder()) ? marker : null;
+            }
+
+            /**
+             * There are 3 cases for next:
+             * 1. If it's a boundary marker, we split it: a closeMarker for the current batch, and an
+             *    openMarker for the next batch.
+             * 2. If it's a bound marker, it must be a closeMarker.
+             * 3. If it's a Row, we create a corresponding closeMarker for the current batch, and an
+             *    openMarker for the next batch followed by the current Row.
+             */
+            private void closeOpenMarker(Unfiltered next)
+            {
+                assert openMarker != null;
+
+                if (next.isRangeTombstoneMarker())
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
+                    // if it's boundary, create closeMarker for current batch and openMarker for next batch
+                    if (marker.isBoundary())
+                    {
+                        RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
+                        closeMarker = boundary.createCorrespondingCloseMarker(isReverseOrder());
+                        overflowed = Collections.singleton((Unfiltered) boundary.createCorrespondingOpenMarker(isReverseOrder())).iterator();
+                    }
+                    else
+                    {
+                        // if it's bound, it must be closeMarker.
+                        assert marker.isClose(isReverseOrder());
+                        updateMarker(marker);
+                        closeMarker = marker;
+                    }
+                }
+                else
+                {
+                    // it's a Row, need to create closeMarker for current batch and openMarker for next batch
+                    DeletionTime openDeletion = openMarker.openDeletionTime(isReverseOrder());
+                    ByteBuffer[] buffers = next.clustering().getRawValues();
+                    closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isReverseOrder(), buffers, openDeletion);
+
+                    // for next batch
+                    overflowed = Arrays.asList(RangeTombstoneBoundMarker.inclusiveOpen(isReverseOrder(),
+                                                                                       buffers,
+                                                                                       openDeletion), next).iterator();
+                }
+            }
+
+            @Override
+            public DeletionTime partitionLevelDeletion()
+            {
+                return isFirst ? wrapped.partitionLevelDeletion() : DeletionTime.LIVE;
+            }
+
+            @Override
+            public Row staticRow()
+            {
+                return isFirst ? wrapped.staticRow() : Rows.EMPTY_STATIC_ROW;
+            }
+
+            @Override
+            public void close()
+            {
+                // no op
+            }
+        };
+        return throttledItr;
+    }
+
+    public void close()
+    {
+        if (origin != null)
+            origin.close();
+    }
+
+    /**
+     * Splits a {@link UnfilteredPartitionIterator} into {@link UnfilteredRowIterator} batches with size no higher
+     * than <b>maxBatchSize</b>.
+     */
+    public static CloseableIterator<UnfilteredRowIterator> throttle(UnfilteredPartitionIterator partitionIterator, int maxBatchSize)
+    {
+        return new AbstractIterator<UnfilteredRowIterator>()
+        {
+            ThrottledUnfilteredIterator current = null;
+
+            protected UnfilteredRowIterator computeNext()
+            {
+                if (current != null && !current.hasNext())
+                {
+                    current.close();
+                    current = null;
+                }
+
+                if (current == null && partitionIterator.hasNext())
+                {
+                    current = new ThrottledUnfilteredIterator(partitionIterator.next(), maxBatchSize);
+                    assert current.hasNext() : "UnfilteredPartitionIterator should not contain empty partitions";
+                }
+
+                if (current != null && current.hasNext())
+                    return current.next();
+
+                return endOfData();
+            }
+
+            public void close()
+            {
+                if (current != null)
+                    current.close();
+            }
+        };
+    }
+}
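
For reference, a minimal sketch of how a caller might consume the throttled
output (illustrative only, not part of the patch; `partitions` stands for any
open UnfilteredPartitionIterator, and process() is a hypothetical consumer):

    // Split every partition into batches of at most 100 unfiltereds.
    try (CloseableIterator<UnfilteredRowIterator> batches =
             ThrottledUnfilteredIterator.throttle(partitions, 100))
    {
        while (batches.hasNext())
        {
            // Each batch is only valid until the next call to batches.next(),
            // and only a partition's first batch carries the partition-level
            // deletion and the static row.
            UnfilteredRowIterator batch = batches.next();
            process(batch); // hypothetical consumer
        }
    }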

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6aa70ad..988bc9e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -28,7 +28,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -37,6 +36,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.Bounds;
@@ -45,6 +45,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -58,6 +59,8 @@ public class StreamReceiveTask extends StreamTask
 
     private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));
 
+    private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
+
     // number of files to receive
     private final int totalFiles;
     // total size of files to receive
@@ -174,29 +177,28 @@ public class StreamReceiveTask extends StreamTask
             return cfs.metadata().params.cdc;
         }
 
-        Mutation createMutation(ColumnFamilyStore cfs, UnfilteredRowIterator rowIterator)
-        {
-            return new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata())));
-        }
-
         private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
             boolean hasCdc = hasCDC(cfs);
+            ColumnFilter filter = ColumnFilter.all(cfs.metadata());
             for (SSTableReader reader : readers)
             {
                 Keyspace ks = Keyspace.open(reader.getKeyspaceName());
-                try (ISSTableScanner scanner = reader.getScanner())
+                // When doing mutation-based repair we split each partition into smaller batches
+                // (MAX_ROWS_PER_BATCH) to avoid OOMing and generating heap pressure
+                try (ISSTableScanner scanner = reader.getScanner();
+                     CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
                 {
-                    while (scanner.hasNext())
+                    while (throttledPartitions.hasNext())
                     {
-                        try (UnfilteredRowIterator rowIterator = scanner.next())
-                        {
-                            // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
-                            // before transaction is done.
-                            //
-                            // If the CFS has CDC, however, these updates need to be written to the CommitLog
-                            // so they get archived into the cdc_raw folder
-                            ks.apply(createMutation(cfs, rowIterator), hasCdc, true, false);
-                        }
+                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
+                        // before transaction is done.
+                        //
+                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                        // so they get archived into the cdc_raw folder
+                        ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
+                                 hasCdc,
+                                 true,
+                                 false);
                     }
                 }
             }
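
The batch size used above is read from the new system property
cassandra.repair.mutation_repair_rows_per_batch (default 100, as declared
earlier in this file), so it can be tuned per node at JVM startup, e.g.:

    -Dcassandra.repair.mutation_repair_rows_per_batch=50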

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AbstractIterator.java b/src/java/org/apache/cassandra/utils/AbstractIterator.java
index dd3d73c..7dd32b8 100644
--- a/src/java/org/apache/cassandra/utils/AbstractIterator.java
+++ b/src/java/org/apache/cassandra/utils/AbstractIterator.java
@@ -23,7 +23,7 @@ import java.util.NoSuchElementException;
 
 import com.google.common.collect.PeekingIterator;
 
-public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterator<V>
+public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterator<V>, CloseableIterator<V>
 {
 
     private static enum State { MUST_FETCH, HAS_NEXT, DONE, FAILED }
@@ -80,4 +80,9 @@ public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterato
     {
         throw new UnsupportedOperationException();
     }
+
+    public void close()
+    {
+        //no-op
+    }
 }
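
Since AbstractIterator now implements CloseableIterator (which extends
AutoCloseable, as its use in try-with-resources in StreamReceiveTask above
relies on), anonymous subclasses inherit a no-op close() and can be managed
with try-with-resources directly. A minimal sketch, not from the patch:

    try (AbstractIterator<Integer> it = new AbstractIterator<Integer>()
    {
        private int next = 0;
        protected Integer computeNext()
        {
            // yields 0, 1, 2 and then signals exhaustion
            return next < 3 ? next++ : endOfData();
        }
    })
    {
        while (it.hasNext())
            System.out.println(it.next());
    } // the inherited no-op close() runs here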

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
new file mode 100644
index 0000000..2d2cce0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import static org.apache.cassandra.SchemaLoader.standardCFMD;
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import com.google.common.collect.Iterators;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.AbstractReadCommandBuilder;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ThrottledUnfilteredIteratorTest extends CQLTester
+{
+    private static final String KSNAME = "ThrottledUnfilteredIteratorTest";
+    private static final String CFNAME = "StandardInteger1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KSNAME,
+                                    KeyspaceParams.simple(1),
+                                    standardCFMD(KSNAME, CFNAME, 1, UTF8Type.instance, Int32Type.instance, Int32Type.instance));
+    }
+
+    static final TableMetadata metadata;
+    static final ColumnMetadata v1Metadata;
+    static final ColumnMetadata v2Metadata;
+
+    static
+    {
+        metadata = TableMetadata.builder("", "")
+                                .addPartitionKeyColumn("pk", Int32Type.instance)
+                                .addClusteringColumn("ck1", Int32Type.instance)
+                                .addClusteringColumn("ck2", Int32Type.instance)
+                                .addRegularColumn("v1", Int32Type.instance)
+                                .addRegularColumn("v2", Int32Type.instance)
+                                .build();
+        v1Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(0);
+        v2Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(1);
+    }
+
+    @Test
+    public void complexThrottleWithTombstoneTest() throws Throwable
+    {
+        // create cell tombstone, range tombstone, partition deletion
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY (pk, ck1, ck2))");
+
+        for (int ck1 = 1; ck1 <= 150; ck1++)
+            for (int ck2 = 1; ck2 <= 150; ck2++)
+            {
+                int timestamp = ck1, v1 = ck1, v2 = ck2;
+                execute("INSERT INTO %s(pk,ck1,ck2,v1,v2) VALUES(1,?,?,?,?) 
using timestamp "
+                        + timestamp, ck1, ck2, v1, v2);
+            }
+
+        for (int ck1 = 1; ck1 <= 100; ck1++)
+            for (int ck2 = 1; ck2 <= 100; ck2++)
+            {
+                if (ck1 % 2 == 0 || ck1 % 3 == 0) // range tombstone
+                    execute("DELETE FROM %s USING TIMESTAMP 170 WHERE pk=1 AND 
ck1=?", ck1);
+                else if (ck1 == ck2) // row tombstone
+                    execute("DELETE FROM %s USING TIMESTAMP 180 WHERE pk=1 AND 
ck1=? AND ck2=?", ck1, ck2);
+                else if (ck1 == ck2 - 1) // cell tombstone
+                    execute("DELETE v2 FROM %s USING TIMESTAMP 190 WHERE pk=1 
AND ck1=? AND ck2=?", ck1, ck2);
+            }
+
+        // range deletion
+        execute("DELETE FROM %s USING TIMESTAMP 150 WHERE pk=1 AND ck1 > 100 
AND ck1 < 120");
+        execute("DELETE FROM %s USING TIMESTAMP 150 WHERE pk=1 AND ck1 = 50 
AND ck2 < 120");
+        // partition deletion
+        execute("DELETE FROM %s USING TIMESTAMP 160 WHERE pk=1");
+
+        // flush and generate 1 sstable
+        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        cfs.forceBlockingFlush();
+        cfs.disableAutoCompaction();
+        cfs.forceMajorCompaction();
+
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader reader = cfs.getLiveSSTables().iterator().next();
+
+        try (ISSTableScanner scanner = reader.getScanner())
+        {
+            try (UnfilteredRowIterator rowIterator = scanner.next())
+            {
+                // only 1 partition data
+                assertFalse(scanner.hasNext());
+                List<Unfiltered> expectedUnfiltereds = new ArrayList<>();
+                rowIterator.forEachRemaining(expectedUnfiltereds::add);
+
+                // test different throttle
+                for (Integer throttle : Arrays.asList(2, 3, 4, 5, 11, 41, 99, 1000, 10001))
+                {
+                    try (ISSTableScanner scannerForThrottle = reader.getScanner())
+                    {
+                        assertTrue(scannerForThrottle.hasNext());
+                        try (UnfilteredRowIterator rowIteratorForThrottle = scannerForThrottle.next())
+                        {
+                            assertFalse(scannerForThrottle.hasNext());
+                            verifyThrottleIterator(expectedUnfiltereds,
+                                                   rowIteratorForThrottle,
+                                                   new ThrottledUnfilteredIterator(rowIteratorForThrottle, throttle),
+                                                   throttle);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void verifyThrottleIterator(List<Unfiltered> expectedUnfiltereds,
+                                        UnfilteredRowIterator rowIteratorForThrottle,
+                                        ThrottledUnfilteredIterator throttledIterator,
+                                        int throttle)
+    {
+        List<Unfiltered> output = new ArrayList<>();
+
+        boolean isRevered = rowIteratorForThrottle.isReverseOrder();
+        boolean isFirst = true;
+
+        while (throttledIterator.hasNext())
+        {
+            UnfilteredRowIterator splittedIterator = throttledIterator.next();
+            assertMetadata(rowIteratorForThrottle, splittedIterator, isFirst);
+
+            List<Unfiltered> splittedUnfiltereds = new ArrayList<>();
+
+            splittedIterator.forEachRemaining(splittedUnfiltereds::add);
+
+            int remain = expectedUnfiltereds.size() - output.size();
+            int expectedSize = remain >= throttle ? throttle : remain;
+            if (splittedUnfiltereds.size() != expectedSize)
+            {
+                assertEquals(expectedSize + 1, splittedUnfiltereds.size());
+                // the extra unfiltered must be a close bound marker
+                Unfiltered last = splittedUnfiltereds.get(expectedSize);
+                assertTrue(last.isRangeTombstoneMarker());
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) last;
+                assertFalse(marker.isBoundary());
+                assertTrue(marker.isClose(isRevered));
+            }
+            output.addAll(splittedUnfiltereds);
+            if (isFirst)
+                isFirst = false;
+        }
+        int index = 0;
+        RangeTombstoneMarker openMarker = null;
+        for (int i = 0; i < expectedUnfiltereds.size(); i++)
+        {
+            Unfiltered expected = expectedUnfiltereds.get(i);
+            Unfiltered data = output.get(i);
+
+            // verify that all tombstone are paired
+            if (data.isRangeTombstoneMarker())
+            {
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) data;
+                if (marker.isClose(isRevered))
+                {
+                    assertNotNull(openMarker);
+                    openMarker = null;
+                }
+                if (marker.isOpen(isRevered))
+                {
+                    assertNull(openMarker);
+                    openMarker = marker;
+                }
+            }
+            if (expected.equals(data))
+            {
+                index++;
+            }
+            else // because of created closeMarker and openMarker
+            {
+                assertNotNull(openMarker);
+                DeletionTime openDeletionTime = openMarker.openDeletionTime(isRevered);
+                // only boundary or row will create extra closeMarker and openMarker
+                if (expected.isRangeTombstoneMarker())
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker) expected;
+                    assertTrue(marker.isBoundary());
+                    RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
+                    assertEquals(boundary.createCorrespondingCloseMarker(isRevered), data);
+                    assertEquals(boundary.createCorrespondingOpenMarker(isRevered), output.get(index + 1));
+                    assertEquals(openDeletionTime, boundary.endDeletionTime());
+
+                    openMarker = boundary.createCorrespondingOpenMarker(isRevered);
+                }
+                else
+                {
+                    ByteBuffer[] byteBuffers = expected.clustering().getRawValues();
+                    RangeTombstoneBoundMarker closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isRevered,
+                                                                                                     byteBuffers,
+                                                                                                     openDeletionTime);
+
+                    RangeTombstoneBoundMarker nextOpenMarker = RangeTombstoneBoundMarker.inclusiveOpen(isRevered,
+                                                                                                       byteBuffers,
+                                                                                                       openDeletionTime);
+                    assertEquals(closeMarker, data);
+                    assertEquals(nextOpenMarker, output.get(index + 1));
+
+                    openMarker = nextOpenMarker;
+                }
+                index += 2;
+            }
+        }
+        assertNull(openMarker);
+        assertEquals(output.size(), index);
+    }
+
+    @Test
+    public void simpleThrottleTest()
+    {
+        simpleThrottleTest(false);
+    }
+
+    @Test
+    public void skipTest()
+    {
+        simpleThrottleTest(true);
+    }
+
+    public void simpleThrottleTest(boolean skipOdd)
+    {
+        // all live rows with partition deletion
+        ThrottledUnfilteredIterator throttledIterator;
+        UnfilteredRowIterator origin;
+
+        List<Row> rows = new ArrayList<>();
+        int rowCount = 1111;
+
+        for (int i = 0; i < rowCount; i++)
+            rows.add(createRow(i, createCell(v1Metadata, i), createCell(v2Metadata, i)));
+
+        // testing different throttle limit
+        for (int throttle = 2; throttle < 1200; throttle += 21)
+        {
+            origin = rows(metadata.regularAndStaticColumns(),
+                          1,
+                          new DeletionTime(0, 100),
+                          Rows.EMPTY_STATIC_ROW,
+                          rows.toArray(new Row[0]));
+            throttledIterator = new ThrottledUnfilteredIterator(origin, throttle);
+
+            int splittedCount = (int) Math.ceil(rowCount*1.0/throttle);
+            for (int i = 1; i <= splittedCount; i++)
+            {
+                UnfilteredRowIterator splitted = throttledIterator.next();
+                assertMetadata(origin, splitted, i == 1);
+                // no op
+                splitted.close();
+
+                int start = (i - 1) * throttle;
+                int end = i == splittedCount ? rowCount : i * throttle;
+                // verify batch content; in skip mode only every other batch is
+                // consumed, exercising the skipping of unconsumed batches
+                if (!skipOdd || (i % 2) == 0)
+                {
+                    assertRows(splitted, rows.subList(start, end).toArray(new Row[0]));
+                }
+            }
+            assertTrue(!throttledIterator.hasNext());
+        }
+    }
+
+    @Test
+    public void throttledPartitionIteratorTest()
+    {
+        // all live rows with partition deletion
+        CloseableIterator<UnfilteredRowIterator> throttledIterator;
+        UnfilteredPartitionIterator origin;
+
+        SortedMap<Integer, List<Row>> partitions = new TreeMap<>();
+        int partitionCount = 13;
+        int baseRowsPerPartition = 1111;
+
+        for (int i = 1; i <= partitionCount; i++)
+        {
+            ArrayList<Row> rows = new ArrayList<>();
+            for (int j = 0; j < (baseRowsPerPartition + i); j++)
+                rows.add(createRow(i, createCell(v1Metadata, j), createCell(v2Metadata, j)));
+            partitions.put(i, rows);
+        }
+
+        // testing different throttle limit
+        for (int throttle = 2; throttle < 1200; throttle += 21)
+        {
+            origin = partitions(metadata.regularAndStaticColumns(),
+                                new DeletionTime(0, 100),
+                                Rows.EMPTY_STATIC_ROW,
+                                partitions);
+            throttledIterator = ThrottledUnfilteredIterator.throttle(origin, throttle);
+
+            int currentPartition = 0;
+            int rowsInPartition = 0;
+            int expectedSplitCount = 0;
+            int currentSplit = 1;
+            while (throttledIterator.hasNext())
+            {
+                UnfilteredRowIterator splitted = throttledIterator.next();
+                if (currentSplit > expectedSplitCount)
+                {
+                    currentPartition++;
+                    rowsInPartition = partitions.get(currentPartition).size();
+                    expectedSplitCount = (int) Math.ceil(rowsInPartition * 1.0 / throttle);
+                    currentSplit = 1;
+                }
+                UnfilteredRowIterator current = rows(metadata.regularAndStaticColumns(),
+                                                     currentPartition,
+                                                     new DeletionTime(0, 100),
+                                                     Rows.EMPTY_STATIC_ROW,
+                                                     partitions.get(currentPartition).toArray(new Row[0]));
+                assertMetadata(current, splitted, currentSplit == 1);
+                // no op
+                splitted.close();
+
+                int start = (currentSplit - 1) * throttle;
+                int end = currentSplit == expectedSplitCount ? rowsInPartition : currentSplit * throttle;
+                assertRows(splitted, partitions.get(currentPartition).subList(start, end).toArray(new Row[0]));
+                currentSplit++;
+            }
+        }
+
+
+        origin = partitions(metadata.regularAndStaticColumns(),
+                            new DeletionTime(0, 100),
+                            Rows.EMPTY_STATIC_ROW,
+                            partitions);
+        try
+        {
+            try (CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(origin, 10))
+            {
+                int i = 0;
+                while (throttled.hasNext())
+                {
+                    assertEquals(dk(1), throttled.next().partitionKey());
+                    if (i++ == 10)
+                    {
+                        throw new RuntimeException("Dummy exception");
+                    }
+                }
+                fail("Should not reach here");
+            }
+        }
+        catch (RuntimeException rte)
+        {
+            int iteratedPartitions = 2;
+            while (iteratedPartitions <= partitionCount)
+            {
+                // check that original iterator was not closed
+                assertTrue(origin.hasNext());
+                // check it's possible to fetch second partition from original iterator
+                assertEquals(dk(iteratedPartitions++), origin.next().partitionKey());
+            }
+        }
+
+    }
+
+    private void assertMetadata(UnfilteredRowIterator origin, UnfilteredRowIterator splitted, boolean isFirst)
+    {
+        assertEquals(splitted.columns(), origin.columns());
+        assertEquals(splitted.partitionKey(), origin.partitionKey());
+        assertEquals(splitted.isReverseOrder(), origin.isReverseOrder());
+        assertEquals(splitted.metadata(), origin.metadata());
+        assertEquals(splitted.stats(), origin.stats());
+
+        if (isFirst)
+        {
+            assertEquals(origin.partitionLevelDeletion(), splitted.partitionLevelDeletion());
+            assertEquals(origin.staticRow(), splitted.staticRow());
+        }
+        else
+        {
+            assertEquals(DeletionTime.LIVE, splitted.partitionLevelDeletion());
+            assertEquals(Rows.EMPTY_STATIC_ROW, splitted.staticRow());
+        }
+    }
+
+    public static void assertRows(UnfilteredRowIterator iterator, Row... rows)
+    {
+        Iterator<Row> rowsIterator = Arrays.asList(rows).iterator();
+
+        while (iterator.hasNext() && rowsIterator.hasNext())
+            assertEquals(iterator.next(), rowsIterator.next());
+
+        assertTrue(iterator.hasNext() == rowsIterator.hasNext());
+    }
+
+    private static DecoratedKey dk(int pk)
+    {
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(pk), ByteBufferUtil.bytes(pk));
+    }
+
+    private static UnfilteredRowIterator rows(RegularAndStaticColumns columns,
+                                              int pk,
+                                              DeletionTime partitionDeletion,
+                                              Row staticRow,
+                                              Unfiltered... rows)
+    {
+        Iterator<Unfiltered> rowsIterator = Arrays.asList(rows).iterator();
+        return new AbstractUnfilteredRowIterator(metadata, dk(pk), partitionDeletion, columns, staticRow, false, EncodingStats.NO_STATS) {
+            protected Unfiltered computeNext()
+            {
+                return rowsIterator.hasNext() ? rowsIterator.next() : endOfData();
+            }
+        };
+    }
+
+    private static UnfilteredPartitionIterator partitions(RegularAndStaticColumns columns,
+                                                          DeletionTime partitionDeletion,
+                                                          Row staticRow,
+                                                          SortedMap<Integer, List<Row>> partitions)
+    {
+        Iterator<Map.Entry<Integer, List<Row>>> partitionIt = partitions.entrySet().iterator();
+        return new AbstractUnfilteredPartitionIterator() {
+            public boolean hasNext()
+            {
+                return partitionIt.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                Map.Entry<Integer, List<Row>> next = partitionIt.next();
+                Iterator<Row> rowsIterator = next.getValue().iterator();
+                return new AbstractUnfilteredRowIterator(metadata, dk(next.getKey()), partitionDeletion, columns, staticRow, false, EncodingStats.NO_STATS) {
+                    protected Unfiltered computeNext()
+                    {
+                        return rowsIterator.hasNext() ? rowsIterator.next() : endOfData();
+                    }
+                };
+            }
+
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+        };
+    }
+
+
+    private static Row createRow(int ck, Cell... columns)
+    {
+        return createRow(ck, ck, columns);
+    }
+
+    private static Row createRow(int ck1, int ck2, Cell... columns)
+    {
+        BTreeRow.Builder builder = new BTreeRow.Builder(true);
+        builder.newRow(Util.clustering(metadata.comparator, ck1, ck2));
+        for (Cell cell : columns)
+            builder.addCell(cell);
+        return builder.build();
+    }
+
+    private static Cell createCell(ColumnMetadata metadata, int v)
+    {
+        return createCell(metadata, v, 100L, BufferCell.NO_DELETION_TIME);
+    }
+
+    private static Cell createCell(ColumnMetadata metadata, int v, long timestamp, int localDeletionTime)
+    {
+        return new BufferCell(metadata,
+                              timestamp,
+                              BufferCell.NO_TTL,
+                              localDeletionTime,
+                              ByteBufferUtil.bytes(v),
+                              null);
+    }
+
+    @Test
+    public void testThrottledIteratorWithRangeDeletions() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME);
+
+        // Inserting data
+        String key = "k1";
+
+        UpdateBuilder builder;
+
+        builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
+        for (int i = 0; i < 40; i += 2)
+            builder.newRow(i).add("val", i);
+        builder.applyUnsafe();
+
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(10, 22).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+
+        builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(2);
+        for (int i = 1; i < 40; i += 2)
+            builder.newRow(i).add("val", i);
+        builder.applyUnsafe();
+
+        new RowUpdateBuilder(cfs.metadata(), 3, key).addRangeTombstone(19, 27).build().applyUnsafe();
+        // We don't flush, to test with a range tombstone both in the memtable and in an sstable
+
+        // Queries by name
+        int[] live = new int[]{ 4, 9, 11, 17, 28 };
+        int[] dead = new int[]{ 12, 19, 21, 24, 27 };
+
+        AbstractReadCommandBuilder.PartitionRangeBuilder cmdBuilder = Util.cmd(cfs);
+
+        ReadCommand cmd = cmdBuilder.build();
+
+        for (int batchSize = 2; batchSize <= 40; batchSize++)
+        {
+            List<UnfilteredRowIterator> unfilteredRowIterators = new LinkedList<>();
+
+            try (ReadExecutionController executionController = cmd.executionController();
+                 UnfilteredPartitionIterator iterator = cmd.executeLocally(executionController))
+            {
+                assertTrue(iterator.hasNext());
+                Iterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(iterator, batchSize);
+                while (throttled.hasNext())
+                {
+                    UnfilteredRowIterator next = throttled.next();
+                    ImmutableBTreePartition materializedPartition = ImmutableBTreePartition.create(next);
+                    int unfilteredCount = Iterators.size(materializedPartition.unfilteredIterator());
+
+                    System.out.println("batchsize " + batchSize + " unfilteredCount " + unfilteredCount + " materializedPartition " + materializedPartition);
+
+                    if (throttled.hasNext())
+                    {
+                        if (unfilteredCount != batchSize)
+                        {
+                            // when there is an extra unfiltered, it must be a close bound marker
+                            assertEquals(batchSize + 1, unfilteredCount);
+                            Unfiltered last = Iterators.getLast(materializedPartition.unfilteredIterator());
+                            assertTrue(last.isRangeTombstoneMarker());
+                            RangeTombstoneMarker marker = (RangeTombstoneMarker) last;
+                            assertFalse(marker.isBoundary());
+                            assertTrue(marker.isClose(false));
+                        }
+                    }
+                    else
+                    {
+                        //only last batch can be smaller than batchSize
+                        assertTrue(unfilteredCount <= batchSize + 1);
+                    }
+                    unfilteredRowIterators.add(materializedPartition.unfilteredIterator());
+                }
+                assertFalse(iterator.hasNext());
+            }
+
+            // Verify throttled data after merge
+            Partition partition = ImmutableBTreePartition.create(UnfilteredRowIterators.merge(unfilteredRowIterators, FBUtilities.nowInSeconds()));
+
+            int nowInSec = FBUtilities.nowInSeconds();
+
+            for (int i : live)
+                assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(ByteBufferUtil.bytes(i))).hasLiveData(nowInSec, cfs.metadata().enforceStrictLiveness()));
+            for (int i : dead)
+                assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(ByteBufferUtil.bytes(i))).hasLiveData(nowInSec, cfs.metadata().enforceStrictLiveness()));
+        }
+    }
+}
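
These unit tests can be run individually with the standard Cassandra ant
target (assuming a checked-out tree and the usual build setup):

    ant test -Dtest.name=ThrottledUnfilteredIteratorTest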

