Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 0c2eaa9cb -> 06cd494c1
Add batch remove iterator to ABSC patch by Jimmy Mårdell; reviewed by Richard Low for CASSANDRA-8414 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc5fb19e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc5fb19e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc5fb19e Branch: refs/heads/cassandra-2.1 Commit: cc5fb19e5c864110b798375f43ec1597904d03ab Parents: ae380da Author: Jimmy Mårdell <ya...@spotify.com> Authored: Thu Jan 22 02:16:10 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Jan 22 02:16:10 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/ArrayBackedSortedColumns.java | 89 ++++++++++++++++++- .../org/apache/cassandra/db/ColumnFamily.java | 32 +++++++ .../apache/cassandra/db/ColumnFamilyStore.java | 6 +- .../cassandra/utils/BatchRemoveIterator.java | 32 +++++++ .../db/ArrayBackedSortedColumnsTest.java | 93 ++++++++++++++++++++ 6 files changed, 249 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6604783..0d08cce 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 2.0.13: * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645) + * Add batch remove iterator to ABSC (CASSANDRA-8414) 2.0.12: http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index 389e0f8..8d553be 100644 --- 
a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.Allocator; +import org.apache.cassandra.utils.BatchRemoveIterator; /** * A ColumnFamily backed by an ArrayList. @@ -54,14 +55,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns { super(metadata); this.reversed = reversed; - this.columns = new ArrayList<Column>(); + this.columns = new ArrayList<>(); } private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed) { super(metadata); this.reversed = reversed; - this.columns = new ArrayList<Column>(columns); + this.columns = new ArrayList<>(columns); } public ColumnFamily.Factory getFactory() @@ -292,6 +293,90 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns return new SlicesIterator(columns, getComparator(), slices, !reversed); } + @Override + public BatchRemoveIterator<Column> batchRemoveIterator() + { + return new BatchRemoveIterator<Column>() + { + private Iterator<Column> iter = iterator(); + private BitSet removedIndexes = new BitSet(columns.size()); + private int idx = -1; + private boolean shouldCallNext = true; + private boolean isCommitted = false; + private boolean removedAnything = false; + + public void commit() + { + if (isCommitted) + throw new IllegalStateException(); + isCommitted = true; + + if (!removedAnything) + return; + + // the lowest index both not visited and known to be not removed + int keepIdx = removedIndexes.nextClearBit(0); + // the running total of kept items + int resultLength = 0; + // start from the first not-removed cell, and shift left. 
+ int removeIdx = removedIndexes.nextSetBit(keepIdx + 1); + while (removeIdx >= 0) + { + int length = removeIdx - keepIdx; + if (length > 0) + { + copy(keepIdx, resultLength, length); + resultLength += length; + } + keepIdx = removedIndexes.nextClearBit(removeIdx + 1); + if (keepIdx < 0) + keepIdx = columns.size(); + removeIdx = removedIndexes.nextSetBit(keepIdx + 1); + } + // Copy everything after the last deleted column + int length = columns.size() - keepIdx; + if (length > 0) + { + copy(keepIdx, resultLength, length); + resultLength += length; + } + + columns.subList(resultLength, columns.size()).clear(); + } + + private void copy(int src, int dst, int len) + { + // [src, src+len) and [dst, dst+len) might overlap, but it's okay because we're going from left to right + assert dst <= src : "dst must not be greater than src"; + + if (dst < src) + Collections.copy(columns.subList(dst, dst + len), columns.subList(src, src + len)); + } + + public boolean hasNext() + { + return iter.hasNext(); + } + + public Column next() + { + idx++; + shouldCallNext = false; + return iter.next(); + } + + public void remove() + { + if (shouldCallNext) + throw new IllegalStateException(); + + removedIndexes.set(reversed ? 
columns.size() - idx - 1 : idx); + removedAnything = true; + shouldCallNext = true; + } + }; + } + private static class SlicesIterator extends AbstractIterator<Column> { private final List<Column> list; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index 7edf825..19f8c16 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -532,6 +532,38 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry return ByteBuffer.wrap(out.getData(), 0, out.getLength()); } + + /** + * @return an iterator where the removes are carried out once everything has been iterated + */ + public BatchRemoveIterator<Column> batchRemoveIterator() + { + // Default implementation is the ordinary iterator + return new BatchRemoveIterator<Column>() + { + private final Iterator<Column> iter = iterator(); + + public void commit() + { + } + + public boolean hasNext() + { + return iter.hasNext(); + } + + public Column next() + { + return iter.next(); + } + + public void remove() + { + iter.remove(); + } + }; + } + public abstract static class Factory <T extends ColumnFamily> { /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index e936473..34d3f1d 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -953,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public static long 
removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { - Iterator<Column> iter = cf.iterator(); + BatchRemoveIterator<Column> iter = cf.batchRemoveIterator(); DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester(); boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty(); long removedBytes = 0; @@ -971,6 +971,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean removedBytes += c.dataSize(); } } + iter.commit(); return removedBytes; } @@ -993,10 +994,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (cf == null || cf.metadata.getDroppedColumns().isEmpty()) return; - Iterator<Column> iter = cf.iterator(); + BatchRemoveIterator<Column> iter = cf.batchRemoveIterator(); while (iter.hasNext()) if (isDroppedColumn(iter.next(), metadata)) iter.remove(); + iter.commit(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java new file mode 100644 index 0000000..4377426 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/BatchRemoveIterator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.Iterator; + +/** + * Iterator that allows us to more efficiently remove many items + */ +public interface BatchRemoveIterator<T> extends Iterator<T> +{ + /** + * Commits the remove operations in this batch iterator. After this no more + * deletes can be made. Any further calls to remove() or commit() will throw IllegalStateException. + */ + void commit(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc5fb19e/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java index 06e2e75..90cd70f 100644 --- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java +++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java @@ -28,10 +28,12 @@ import org.junit.Test; import static org.junit.Assert.*; import com.google.common.base.Functions; +import com.google.common.collect.Sets; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.utils.BatchRemoveIterator; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.utils.HeapAllocator; @@ -193,4 +195,95 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader iter.remove(); assertTrue(!iter.hasNext()); 
} + + @Test(expected = IllegalStateException.class) + public void testBatchRemoveTwice() + { + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false); + map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance); + map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance); + + BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); + batchIter.next(); + batchIter.remove(); + batchIter.remove(); + } + + @Test(expected = IllegalStateException.class) + public void testBatchCommitTwice() + { + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false); + map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance); + map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance); + + BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); + batchIter.next(); + batchIter.remove(); + batchIter.commit(); + batchIter.commit(); + } + + @Test + public void testBatchRemove() + { + testBatchRemoveInternal(false); + testBatchRemoveInternal(true); + } + + public void testBatchRemoveInternal(boolean reversed) + { + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed); + int[] values = new int[]{ 1, 2, 3, 5 }; + + for (int i = 0; i < values.length; ++i) + map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? 
values.length - 1 - i : i])), HeapAllocator.instance); + + BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); + batchIter.next(); + batchIter.remove(); + batchIter.next(); + batchIter.remove(); + + assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0)); + + batchIter.commit(); + + assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0)); + } + + @Test + public void testBatchRemoveCopy() + { + // Test delete some random columns and check the result + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false); + int n = 127; + int[] values = new int[n]; + for (int i = 0; i < n; i++) values[i] = i; + Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112); + + for (int value : values) + map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance); + + BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); + while (batchIter.hasNext()) + if (toRemove.contains(batchIter.next().name().getInt(0))) + batchIter.remove(); + + batchIter.commit(); + + int expected = 0; + + while (toRemove.contains(expected)) + expected++; + + for (Column column : map) + { + assertEquals(expected, column.name().getInt(0)); + expected++; + while (toRemove.contains(expected)) + expected++; + } + + assertEquals(expected, n); + } }