Repository: cassandra Updated Branches: refs/heads/trunk 974d8fc09 -> 8ef71f3f2
Throttle base partitions during MV repair streaming to prevent OOM Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-13299 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8ef71f3f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8ef71f3f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8ef71f3f Branch: refs/heads/trunk Commit: 8ef71f3f29fb040cce18ba158ff5f289b388c30b Parents: 974d8fc Author: Zhao Yang <zhaoyangsingap...@gmail.com> Authored: Fri Aug 11 13:04:28 2017 +0800 Committer: Paulo Motta <pa...@apache.org> Committed: Thu Sep 28 05:45:25 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/rows/ThrottledUnfilteredIterator.java | 251 ++++++++ .../cassandra/streaming/StreamReceiveTask.java | 36 +- .../cassandra/utils/AbstractIterator.java | 7 +- .../rows/ThrottledUnfilteredIteratorTest.java | 613 +++++++++++++++++++ 5 files changed, 890 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 97e6b03..081ed72 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299) * Use compaction threshold for STCS in L0 (CASSANDRA-13861) * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703) * Add extra information to SASI timeout exception (CASSANDRA-13677) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java 
b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java new file mode 100644 index 0000000..dd33b1e --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.CloseableIterator; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A utility class to split the given {@link UnfilteredRowIterator} into smaller chunks each + * having at most {@link #throttle} + 1 unfiltereds. + * + * Only the first output contains partition level info: {@link UnfilteredRowIterator#partitionLevelDeletion} + * and {@link UnfilteredRowIterator#staticRow}. + * + * Besides splitting, this iterator will also ensure each chunk does not finish with an open tombstone marker, + * by closing any opened tombstone markers and re-opening on the next chunk. 
+ * + * The lifecycle of each output {@link UnfilteredRowIterator} only lasts until the next call to {@link #next()}. + * + * A subsequent {@link #next} call will exhaust the previously returned iterator before computing the next, + * effectively skipping unfiltereds up to the throttle size. + * + * Closing this iterator will close the underlying iterator. + * + */ +public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowIterator> implements CloseableIterator<UnfilteredRowIterator> +{ + private final UnfilteredRowIterator origin; + private final int throttle; + + // internal mutable state + private UnfilteredRowIterator throttledItr; + + // extra unfiltereds from previous iteration + private Iterator<Unfiltered> overflowed = Collections.emptyIterator(); + + @VisibleForTesting + ThrottledUnfilteredIterator(UnfilteredRowIterator origin, int throttle) + { + assert origin != null; + assert throttle > 1 : "Throttle size must be higher than 1 to properly support open and close tombstone boundaries."; + this.origin = origin; + this.throttle = throttle; + this.throttledItr = null; + } + + @Override + protected UnfilteredRowIterator computeNext() + { + // exhaust previous throttled iterator + while (throttledItr != null && throttledItr.hasNext()) + throttledItr.next(); + + if (!origin.hasNext()) + return endOfData(); + + throttledItr = new WrappingUnfilteredRowIterator(origin) + { + private int count = 0; + private boolean isFirst = throttledItr == null; + + // current batch's openMarker. if it's generated in previous batch, + // it must be consumed as first element of current batch + private RangeTombstoneMarker openMarker; + + // current batch's closeMarker. 
+ // it must be consumed as last element of current batch + private RangeTombstoneMarker closeMarker = null; + + @Override + public boolean hasNext() + { + return (withinLimit() && wrapped.hasNext()) || closeMarker != null; + } + + @Override + public Unfiltered next() + { + if (closeMarker != null) + { + assert count == throttle; + Unfiltered toReturn = closeMarker; + closeMarker = null; + return toReturn; + } + + Unfiltered next; + assert withinLimit(); + // in the beginning of the batch, there might be remaining unfiltereds from previous iteration + if (overflowed.hasNext()) + next = overflowed.next(); + else + next = wrapped.next(); + recordNext(next); + return next; + } + + private void recordNext(Unfiltered unfiltered) + { + count++; + if (unfiltered.isRangeTombstoneMarker()) + updateMarker((RangeTombstoneMarker) unfiltered); + // when reach throttle with a remaining openMarker, we need to create corresponding closeMarker. + if (count == throttle && openMarker != null) + { + assert wrapped.hasNext(); + closeOpenMarker(wrapped.next()); + } + } + + private boolean withinLimit() + { + return count < throttle; + } + + private void updateMarker(RangeTombstoneMarker marker) + { + openMarker = marker.isOpen(isReverseOrder()) ? marker : null; + } + + /** + * There are 3 cases for next: 1. if it's a boundary marker, we split it into a closeMarker for the current batch and an + * openMarker for the next batch. 2. if it's a bound marker, it must be a closeMarker. 3. if it's a Row, create the + * corresponding closeMarker for the current batch, and create the next openMarker for the next batch including the current + * Row. 
+ */ + private void closeOpenMarker(Unfiltered next) + { + assert openMarker != null; + + if (next.isRangeTombstoneMarker()) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) next; + // if it's boundary, create closeMarker for current batch and openMarker for next batch + if (marker.isBoundary()) + { + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker; + closeMarker = boundary.createCorrespondingCloseMarker(isReverseOrder()); + overflowed = Collections.singleton((Unfiltered)boundary.createCorrespondingOpenMarker(isReverseOrder())).iterator(); + } + else + { + // if it's bound, it must be closeMarker. + assert marker.isClose(isReverseOrder()); + updateMarker(marker); + closeMarker = marker; + } + } + else + { + // it's Row, need to create closeMarker for current batch and openMarker for next batch + DeletionTime openDeletion = openMarker.openDeletionTime(isReverseOrder()); + ByteBuffer[] buffers = next.clustering().getRawValues(); + closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isReverseOrder(), buffers, openDeletion); + + // for next batch + overflowed = Arrays.asList(RangeTombstoneBoundMarker.inclusiveOpen(isReverseOrder(), + buffers, + openDeletion), next).iterator(); + } + } + + @Override + public DeletionTime partitionLevelDeletion() + { + return isFirst ? wrapped.partitionLevelDeletion() : DeletionTime.LIVE; + } + + @Override + public Row staticRow() + { + return isFirst ? 
wrapped.staticRow() : Rows.EMPTY_STATIC_ROW; + } + + @Override + public void close() + { + // no op + } + }; + return throttledItr; + } + + public void close() + { + if (origin != null) + origin.close(); + } + + /** + * Splits a {@link UnfilteredPartitionIterator} in {@link UnfilteredRowIterator} batches with size no higher + * than <b>maxBatchSize</b> + */ + public static CloseableIterator<UnfilteredRowIterator> throttle(UnfilteredPartitionIterator partitionIterator, int maxBatchSize) + { + return new AbstractIterator<UnfilteredRowIterator>() + { + ThrottledUnfilteredIterator current = null; + + protected UnfilteredRowIterator computeNext() + { + if (current != null && !current.hasNext()) + { + current.close(); + current = null; + } + + if (current == null && partitionIterator.hasNext()) + { + current = new ThrottledUnfilteredIterator(partitionIterator.next(), maxBatchSize); + assert current.hasNext() : "UnfilteredPartitionIterator should not contain empty partitions"; + } + + if (current != null && current.hasNext()) + return current.next(); + + return endOfData(); + } + + public void close() + { + if (current != null) + current.close(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 6aa70ad..988bc9e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -28,7 +28,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ 
-37,6 +36,7 @@ import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.view.View; import org.apache.cassandra.dht.Bounds; @@ -45,6 +45,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Refs; @@ -58,6 +59,8 @@ public class StreamReceiveTask extends StreamTask private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask")); + private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100); + // number of files to receive private final int totalFiles; // total size of files to receive @@ -174,29 +177,28 @@ public class StreamReceiveTask extends StreamTask return cfs.metadata().params.cdc; } - Mutation createMutation(ColumnFamilyStore cfs, UnfilteredRowIterator rowIterator) - { - return new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata()))); - } - private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) { boolean hasCdc = hasCDC(cfs); + ColumnFilter filter = ColumnFilter.all(cfs.metadata()); for (SSTableReader reader : readers) { Keyspace ks = Keyspace.open(reader.getKeyspaceName()); - try (ISSTableScanner scanner = reader.getScanner()) + // When doing mutation-based repair we split each 
partition into smaller batches + // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH)) { - while (scanner.hasNext()) + while (throttledPartitions.hasNext()) { - try (UnfilteredRowIterator rowIterator = scanner.next()) - { - // MV *can* be applied unsafe if there's no CDC on the CFS as we flush - // before transaction is done. - // - // If the CFS has CDC, however, these updates need to be written to the CommitLog - // so they get archived into the cdc_raw folder - ks.apply(createMutation(cfs, rowIterator), hasCdc, true, false); - } + // MV *can* be applied unsafe if there's no CDC on the CFS as we flush + // before transaction is done. + // + // If the CFS has CDC, however, these updates need to be written to the CommitLog + // so they get archived into the cdc_raw folder + ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)), + hasCdc, + true, + false); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/utils/AbstractIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/AbstractIterator.java b/src/java/org/apache/cassandra/utils/AbstractIterator.java index dd3d73c..7dd32b8 100644 --- a/src/java/org/apache/cassandra/utils/AbstractIterator.java +++ b/src/java/org/apache/cassandra/utils/AbstractIterator.java @@ -23,7 +23,7 @@ import java.util.NoSuchElementException; import com.google.common.collect.PeekingIterator; -public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterator<V> +public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterator<V>, CloseableIterator<V> { private static enum State { MUST_FETCH, HAS_NEXT, DONE, FAILED } @@ -80,4 +80,9 @@ public 
abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterato { throw new UnsupportedOperationException(); } + + public void close() + { + //no-op + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java new file mode 100644 index 0000000..2d2cce0 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.rows; + +import static org.apache.cassandra.SchemaLoader.standardCFMD; +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import com.google.common.collect.Iterators; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.Util; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.AbstractReadCommandBuilder; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import 
org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; + +public class ThrottledUnfilteredIteratorTest extends CQLTester +{ + private static final String KSNAME = "ThrottledUnfilteredIteratorTest"; + private static final String CFNAME = "StandardInteger1"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KSNAME, + KeyspaceParams.simple(1), + standardCFMD(KSNAME, CFNAME, 1, UTF8Type.instance, Int32Type.instance, Int32Type.instance)); + } + + static final TableMetadata metadata; + static final ColumnMetadata v1Metadata; + static final ColumnMetadata v2Metadata; + + static + { + metadata = TableMetadata.builder("", "") + .addPartitionKeyColumn("pk", Int32Type.instance) + .addClusteringColumn("ck1", Int32Type.instance) + .addClusteringColumn("ck2", Int32Type.instance) + .addRegularColumn("v1", Int32Type.instance) + .addRegularColumn("v2", Int32Type.instance) + .build(); + v1Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(0); + v2Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(1); + } + + @Test + public void complexThrottleWithTombstoneTest() throws Throwable + { + // create cell tombstone, range tombstone, partition deletion + createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY (pk, ck1, ck2))"); + + for (int ck1 = 1; ck1 <= 150; ck1++) + for (int ck2 = 1; ck2 <= 150; ck2++) + { + int timestamp = ck1, v1 = ck1, v2 = ck2; + execute("INSERT INTO %s(pk,ck1,ck2,v1,v2) VALUES(1,?,?,?,?) 
using timestamp " + + timestamp, ck1, ck2, v1, v2); + } + + for (int ck1 = 1; ck1 <= 100; ck1++) + for (int ck2 = 1; ck2 <= 100; ck2++) + { + if (ck1 % 2 == 0 || ck1 % 3 == 0) // range tombstone + execute("DELETE FROM %s USING TIMESTAMP 170 WHERE pk=1 AND ck1=?", ck1); + else if (ck1 == ck2) // row tombstone + execute("DELETE FROM %s USING TIMESTAMP 180 WHERE pk=1 AND ck1=? AND ck2=?", ck1, ck2); + else if (ck1 == ck2 - 1) // cell tombstone + execute("DELETE v2 FROM %s USING TIMESTAMP 190 WHERE pk=1 AND ck1=? AND ck2=?", ck1, ck2); + } + + // range deletion + execute("DELETE FROM %s USING TIMESTAMP 150 WHERE pk=1 AND ck1 > 100 AND ck1 < 120"); + execute("DELETE FROM %s USING TIMESTAMP 150 WHERE pk=1 AND ck1 = 50 AND ck2 < 120"); + // partition deletion + execute("DELETE FROM %s USING TIMESTAMP 160 WHERE pk=1"); + + // flush and generate 1 sstable + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + cfs.forceBlockingFlush(); + cfs.disableAutoCompaction(); + cfs.forceMajorCompaction(); + + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader reader = cfs.getLiveSSTables().iterator().next(); + + try (ISSTableScanner scanner = reader.getScanner()) + { + try (UnfilteredRowIterator rowIterator = scanner.next()) + { + // only 1 partition data + assertFalse(scanner.hasNext()); + List<Unfiltered> expectedUnfiltereds = new ArrayList<>(); + rowIterator.forEachRemaining(expectedUnfiltereds::add); + + // test different throttle + for (Integer throttle : Arrays.asList(2, 3, 4, 5, 11, 41, 99, 1000, 10001)) + { + try (ISSTableScanner scannerForThrottle = reader.getScanner()) + { + assertTrue(scannerForThrottle.hasNext()); + try (UnfilteredRowIterator rowIteratorForThrottle = scannerForThrottle.next()) + { + assertFalse(scannerForThrottle.hasNext()); + verifyThrottleIterator(expectedUnfiltereds, + rowIteratorForThrottle, + new ThrottledUnfilteredIterator(rowIteratorForThrottle, throttle), + throttle); + } + } + } + } + } + } + + 
private void verifyThrottleIterator(List<Unfiltered> expectedUnfiltereds, + UnfilteredRowIterator rowIteratorForThrottle, + ThrottledUnfilteredIterator throttledIterator, + int throttle) + { + List<Unfiltered> output = new ArrayList<>(); + + boolean isRevered = rowIteratorForThrottle.isReverseOrder(); + boolean isFirst = true; + + while (throttledIterator.hasNext()) + { + UnfilteredRowIterator splittedIterator = throttledIterator.next(); + assertMetadata(rowIteratorForThrottle, splittedIterator, isFirst); + + List<Unfiltered> splittedUnfiltereds = new ArrayList<>(); + + splittedIterator.forEachRemaining(splittedUnfiltereds::add); + + int remain = expectedUnfiltereds.size() - output.size(); + int expectedSize = remain >= throttle ? throttle : remain; + if (splittedUnfiltereds.size() != expectedSize) + { + assertEquals(expectedSize + 1, splittedUnfiltereds.size()); + // the extra unfilter must be close bound marker + Unfiltered last = splittedUnfiltereds.get(expectedSize); + assertTrue(last.isRangeTombstoneMarker()); + RangeTombstoneMarker marker = (RangeTombstoneMarker) last; + assertFalse(marker.isBoundary()); + assertTrue(marker.isClose(isRevered)); + } + output.addAll(splittedUnfiltereds); + if (isFirst) + isFirst = false; + } + int index = 0; + RangeTombstoneMarker openMarker = null; + for (int i = 0; i < expectedUnfiltereds.size(); i++) + { + Unfiltered expected = expectedUnfiltereds.get(i); + Unfiltered data = output.get(i); + + // verify that all tombstone are paired + if (data.isRangeTombstoneMarker()) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) data; + if (marker.isClose(isRevered)) + { + assertNotNull(openMarker); + openMarker = null; + } + if (marker.isOpen(isRevered)) + { + assertNull(openMarker); + openMarker = marker; + } + } + if (expected.equals(data)) + { + index++; + } + else // because of created closeMarker and openMarker + { + assertNotNull(openMarker); + DeletionTime openDeletionTime = openMarker.openDeletionTime(isRevered); + // 
only boundary or row will create extra closeMarker and openMarker + if (expected.isRangeTombstoneMarker()) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) expected; + assertTrue(marker.isBoundary()); + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker; + assertEquals(boundary.createCorrespondingCloseMarker(isRevered), data); + assertEquals(boundary.createCorrespondingOpenMarker(isRevered), output.get(index + 1)); + assertEquals(openDeletionTime, boundary.endDeletionTime()); + + openMarker = boundary.createCorrespondingOpenMarker(isRevered); + } + else + { + ByteBuffer[] byteBuffers = expected.clustering().getRawValues(); + RangeTombstoneBoundMarker closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isRevered, + byteBuffers, + openDeletionTime); + + RangeTombstoneBoundMarker nextOpenMarker = RangeTombstoneBoundMarker.inclusiveOpen(isRevered, + byteBuffers, + openDeletionTime); + assertEquals(closeMarker, data); + assertEquals(nextOpenMarker, output.get(index + 1)); + + openMarker = nextOpenMarker; + } + index += 2; + } + } + assertNull(openMarker); + assertEquals(output.size(), index); + } + + @Test + public void simpleThrottleTest() + { + simpleThrottleTest(false); + } + + @Test + public void skipTest() + { + simpleThrottleTest(true); + } + + public void simpleThrottleTest(boolean skipOdd) + { + // all live rows with partition deletion + ThrottledUnfilteredIterator throttledIterator; + UnfilteredRowIterator origin; + + List<Row> rows = new ArrayList<>(); + int rowCount = 1111; + + for (int i = 0; i < rowCount; i++) + rows.add(createRow(i, createCell(v1Metadata, i), createCell(v2Metadata, i))); + + // testing different throttle limit + for (int throttle = 2; throttle < 1200; throttle += 21) + { + origin = rows(metadata.regularAndStaticColumns(), + 1, + new DeletionTime(0, 100), + Rows.EMPTY_STATIC_ROW, + rows.toArray(new Row[0])); + throttledIterator = new ThrottledUnfilteredIterator(origin, throttle); + + int splittedCount 
= (int) Math.ceil(rowCount*1.0/throttle); + for (int i = 1; i <= splittedCount; i++) + { + UnfilteredRowIterator splitted = throttledIterator.next(); + assertMetadata(origin, splitted, i == 1); + // no op + splitted.close(); + + int start = (i - 1) * throttle; + int end = i == splittedCount ? rowCount : i * throttle; + if (skipOdd && (i % 2) == 0) + { + assertRows(splitted, rows.subList(start, end).toArray(new Row[0])); + } + } + assertTrue(!throttledIterator.hasNext()); + } + } + + @Test + public void throttledPartitionIteratorTest() + { + // all live rows with partition deletion + CloseableIterator<UnfilteredRowIterator> throttledIterator; + UnfilteredPartitionIterator origin; + + SortedMap<Integer, List<Row>> partitions = new TreeMap<>(); + int partitionCount = 13; + int baseRowsPerPartition = 1111; + + for (int i = 1; i <= partitionCount; i++) + { + ArrayList<Row> rows = new ArrayList<>(); + for (int j = 0; j < (baseRowsPerPartition + i); j++) + rows.add(createRow(i, createCell(v1Metadata, j), createCell(v2Metadata, j))); + partitions.put(i, rows); + } + + // testing different throttle limit + for (int throttle = 2; throttle < 1200; throttle += 21) + { + origin = partitions(metadata.regularAndStaticColumns(), + new DeletionTime(0, 100), + Rows.EMPTY_STATIC_ROW, + partitions); + throttledIterator = ThrottledUnfilteredIterator.throttle(origin, throttle); + + int currentPartition = 0; + int rowsInPartition = 0; + int expectedSplitCount = 0; + int currentSplit = 1; + while (throttledIterator.hasNext()) + { + UnfilteredRowIterator splitted = throttledIterator.next(); + if (currentSplit > expectedSplitCount) + { + currentPartition++; + rowsInPartition = partitions.get(currentPartition).size(); + expectedSplitCount = (int) Math.ceil(rowsInPartition * 1.0 / throttle); + currentSplit = 1; + } + UnfilteredRowIterator current = rows(metadata.regularAndStaticColumns(), + currentPartition, + new DeletionTime(0, 100), + Rows.EMPTY_STATIC_ROW, + 
partitions.get(currentPartition).toArray(new Row[0])); + assertMetadata(current, splitted, currentSplit == 1); + // no op + splitted.close(); + + int start = (currentSplit - 1) * throttle; + int end = currentSplit == expectedSplitCount ? rowsInPartition : currentSplit * throttle; + assertRows(splitted, partitions.get(currentPartition).subList(start, end).toArray(new Row[0])); + currentSplit++; + } + } + + + origin = partitions(metadata.regularAndStaticColumns(), + new DeletionTime(0, 100), + Rows.EMPTY_STATIC_ROW, + partitions); + try + { + try (CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(origin, 10)) + { + int i = 0; + while (throttled.hasNext()) + { + assertEquals(dk(1), throttled.next().partitionKey()); + if (i++ == 10) + { + throw new RuntimeException("Dummy exception"); + } + } + fail("Should not reach here"); + } + } + catch (RuntimeException rte) + { + int iteratedPartitions = 2; + while (iteratedPartitions <= partitionCount) + { + // check that original iterator was not closed + assertTrue(origin.hasNext()); + // check it's possible to fetch second partition from original iterator + assertEquals(dk(iteratedPartitions++), origin.next().partitionKey()); + } + } + + } + + private void assertMetadata(UnfilteredRowIterator origin, UnfilteredRowIterator splitted, boolean isFirst) + { + assertEquals(splitted.columns(), origin.columns()); + assertEquals(splitted.partitionKey(), origin.partitionKey()); + assertEquals(splitted.isReverseOrder(), origin.isReverseOrder()); + assertEquals(splitted.metadata(), origin.metadata()); + assertEquals(splitted.stats(), origin.stats()); + + if (isFirst) + { + assertEquals(origin.partitionLevelDeletion(), splitted.partitionLevelDeletion()); + assertEquals(origin.staticRow(), splitted.staticRow()); + } + else + { + assertEquals(DeletionTime.LIVE, splitted.partitionLevelDeletion()); + assertEquals(Rows.EMPTY_STATIC_ROW, splitted.staticRow()); + } + } + + public static void 
assertRows(UnfilteredRowIterator iterator, Row... rows) + { + Iterator<Row> rowsIterator = Arrays.asList(rows).iterator(); + + while (iterator.hasNext() && rowsIterator.hasNext()) + assertEquals(iterator.next(), rowsIterator.next()); + + assertTrue(iterator.hasNext() == rowsIterator.hasNext()); + } + + private static DecoratedKey dk(int pk) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(pk), ByteBufferUtil.bytes(pk)); + } + + private static UnfilteredRowIterator rows(RegularAndStaticColumns columns, + int pk, + DeletionTime partitionDeletion, + Row staticRow, + Unfiltered... rows) + { + Iterator<Unfiltered> rowsIterator = Arrays.asList(rows).iterator(); + return new AbstractUnfilteredRowIterator(metadata, dk(pk), partitionDeletion, columns, staticRow, false, EncodingStats.NO_STATS) { + protected Unfiltered computeNext() + { + return rowsIterator.hasNext() ? rowsIterator.next() : endOfData(); + } + }; + } + + private static UnfilteredPartitionIterator partitions(RegularAndStaticColumns columns, + DeletionTime partitionDeletion, + Row staticRow, + SortedMap<Integer, List<Row>> partitions) + { + Iterator<Map.Entry<Integer, List<Row>>> partitionIt = partitions.entrySet().iterator(); + return new AbstractUnfilteredPartitionIterator() { + public boolean hasNext() + { + return partitionIt.hasNext(); + } + + public UnfilteredRowIterator next() + { + Map.Entry<Integer, List<Row>> next = partitionIt.next(); + Iterator<Row> rowsIterator = next.getValue().iterator(); + return new AbstractUnfilteredRowIterator(metadata, dk(next.getKey()), partitionDeletion, columns, staticRow, false, EncodingStats.NO_STATS) { + protected Unfiltered computeNext() + { + return rowsIterator.hasNext() ? rowsIterator.next() : endOfData(); + } + }; + } + + public TableMetadata metadata() + { + return metadata; + } + }; + } + + + private static Row createRow(int ck, Cell... 
columns) + { + return createRow(ck, ck, columns); + } + + private static Row createRow(int ck1, int ck2, Cell... columns) + { + BTreeRow.Builder builder = new BTreeRow.Builder(true); + builder.newRow(Util.clustering(metadata.comparator, ck1, ck2)); + for (Cell cell : columns) + builder.addCell(cell); + return builder.build(); + } + + private static Cell createCell(ColumnMetadata metadata, int v) + { + return createCell(metadata, v, 100L, BufferCell.NO_DELETION_TIME); + } + + private static Cell createCell(ColumnMetadata metadata, int v, long timestamp, int localDeletionTime) + { + return new BufferCell(metadata, + timestamp, + BufferCell.NO_TTL, + localDeletionTime, + ByteBufferUtil.bytes(v), + null); + } + + @Test + public void testThrottledIteratorWithRangeDeletions() throws Exception + { + Keyspace keyspace = Keyspace.open(KSNAME); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME); + + // Inserting data + String key = "k1"; + + UpdateBuilder builder; + + builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0); + for (int i = 0; i < 40; i += 2) + builder.newRow(i).add("val", i); + builder.applyUnsafe(); + + new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(10, 22).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + + builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(2); + for (int i = 1; i < 40; i += 2) + builder.newRow(i).add("val", i); + builder.applyUnsafe(); + + new RowUpdateBuilder(cfs.metadata(), 3, key).addRangeTombstone(19, 27).build().applyUnsafe(); + // We don't flush to test with both a range tomsbtone in memtable and in sstable + + // Queries by name + int[] live = new int[]{ 4, 9, 11, 17, 28 }; + int[] dead = new int[]{ 12, 19, 21, 24, 27 }; + + AbstractReadCommandBuilder.PartitionRangeBuilder cmdBuilder = Util.cmd(cfs); + + ReadCommand cmd = cmdBuilder.build(); + + for (int batchSize = 2; batchSize <= 40; batchSize++) + { + List<UnfilteredRowIterator> unfilteredRowIterators = new 
LinkedList<>(); + + try (ReadExecutionController executionController = cmd.executionController(); + UnfilteredPartitionIterator iterator = cmd.executeLocally(executionController)) + { + assertTrue(iterator.hasNext()); + Iterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(iterator, batchSize); + while (throttled.hasNext()) + { + UnfilteredRowIterator next = throttled.next(); + ImmutableBTreePartition materializedPartition = ImmutableBTreePartition.create(next); + int unfilteredCount = Iterators.size(materializedPartition.unfilteredIterator()); + + System.out.println("batchsize " + batchSize + " unfilteredCount " + unfilteredCount + " materializedPartition " + materializedPartition); + + if (throttled.hasNext()) + { + if (unfilteredCount != batchSize) + { + //when there is extra unfiltered, it must be close bound marker + assertEquals(batchSize + 1, unfilteredCount); + Unfiltered last = Iterators.getLast(materializedPartition.unfilteredIterator()); + assertTrue(last.isRangeTombstoneMarker()); + RangeTombstoneMarker marker = (RangeTombstoneMarker) last; + assertFalse(marker.isBoundary()); + assertTrue(marker.isClose(false)); + } + } + else + { + //only last batch can be smaller than batchSize + assertTrue(unfilteredCount <= batchSize + 1); + } + unfilteredRowIterators.add(materializedPartition.unfilteredIterator()); + } + assertFalse(iterator.hasNext()); + } + + // Verify throttled data after merge + Partition partition = ImmutableBTreePartition.create(UnfilteredRowIterators.merge(unfilteredRowIterators, FBUtilities.nowInSeconds())); + + int nowInSec = FBUtilities.nowInSeconds(); + + for (int i : live) + assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(ByteBufferUtil.bytes((i)))).hasLiveData(nowInSec, cfs.metadata().enforceStrictLiveness())); + for (int i : dead) + assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(ByteBufferUtil.bytes((i)))).hasLiveData(nowInSec, 
cfs.metadata().enforceStrictLiveness())); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org