Author: jbellis
Date: Thu Sep 22 20:07:28 2011
New Revision: 1174360

URL: http://svn.apache.org/viewvc?rev=1174360&view=rev
Log:
optimize single-source case for MergeIterator
patch by jbellis; tested by brandonwilliams for CASSANDRA-3234

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1174360&r1=1174359&r2=1174360&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Thu Sep 22 20:07:28 2011 @@ -89,10 +89,10 @@ public class ParallelCompactionIterable private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow> { - private final MergeIterator<RowContainer, CompactedRowContainer> reducer; + private final CloseableIterator<CompactedRowContainer> reducer; private final CompactionController controller; - public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> reducer, CompactionController controller) + public Unwrapper(CloseableIterator<CompactedRowContainer> reducer, CompactionController controller) { this.reducer = reducer; this.controller = controller; Modified: 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1174360&r1=1174359&r2=1174360&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Sep 22 20:07:28 2011 @@ -93,7 +93,7 @@ public class QueryFilter Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator); // define a 'reduced' iterator that merges columns w/ the same name, which // greatly simplifies computing liveColumns in the presence of tombstones. - Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>() + MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>() { ColumnFamily curCF = returnCF.cloneMeShallow(); @@ -111,7 +111,7 @@ public class QueryFilter // consumers make of the result (for instance CFS.getColumnFamily() call removeDeleted() on the // result which removes column; which shouldn't be done on the original super column). assert current instanceof SuperColumn; - curCF.addColumn(((SuperColumn)current).cloneMe()); + curCF.addColumn(((SuperColumn) current).cloneMe()); } else { @@ -129,16 +129,17 @@ public class QueryFilter // time of the cf, if that is greater. 
long deletedAt = c.getMarkedForDeleteAt(); if (returnCF.getMarkedForDeleteAt() > deletedAt) - ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); + ((SuperColumn) c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); - c = filter.filterSuperColumn((SuperColumn)c, gcBefore); - ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be + c = filter.filterSuperColumn((SuperColumn) c, gcBefore); + ((SuperColumn) c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be } - curCF.clear(); + curCF.clear(); return c; } - }); + }; + Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer); topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore); } Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1174360&r1=1174359&r2=1174360&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Sep 22 20:07:28 2011 @@ -24,16 +24,15 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.IMergeIterator; import org.apache.cassandra.utils.MergeIterator; public class ReducingKeyIterator implements CloseableIterator<DecoratedKey> { - private final MergeIterator<DecoratedKey,DecoratedKey> mi; + private 
final IMergeIterator<DecoratedKey,DecoratedKey> mi; public ReducingKeyIterator(Collection<SSTableReader> sstables) { Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1174360&r1=1174359&r2=1174360&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Sep 22 20:07:28 2011 @@ -86,7 +86,8 @@ public class RangeSliceResponseResolver iters.add(new RowIterator(reply.rows.iterator(), response.getFrom())); } // for each row, compute the combination of all different versions seen, and repair incomplete versions - MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, pairComparator, new Reducer()); + // TODO do we need to call close? 
+ CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer()); List<Row> resolvedRows = new ArrayList<Row>(n); while (iter.hasNext()) Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java?rev=1174360&r1=1174359&r2=1174360&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java Thu Sep 22 20:07:28 2011 @@ -20,50 +20,30 @@ package org.apache.cassandra.utils; import java.io.IOException; import java.io.IOError; -import java.util.ArrayDeque; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; +import java.util.*; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Ordering; /** Merges sorted input iterators which individually contain unique items. */ -public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements CloseableIterator<Out> +public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out> { - public final Comparator<In> comp; + protected final Reducer<In,Out> reducer; protected final List<? extends CloseableIterator<In>> iterators; - // a queue for return: all candidates must be open and have at least one item - protected final PriorityQueue<Candidate<In>> queue; - protected MergeIterator(List<? extends CloseableIterator<In>> iters, Comparator<In> comp) + protected MergeIterator(List<? 
extends CloseableIterator<In>> iters, Reducer<In, Out> reducer) { this.iterators = iters; - this.comp = comp; - this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size())); - for (CloseableIterator<In> iter : iters) - { - Candidate<In> candidate = new Candidate<In>(iter, comp); - if (!candidate.advance()) - // was empty - continue; - this.queue.add(candidate); - } - } - - public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters) - { - return get(iters, (Comparator<E>)Ordering.natural()); - } - - public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters, Comparator<E> comp) - { - return new OneToOne<E>(iters, comp); + this.reducer = reducer; } - public static <In,Out> MergeIterator<In,Out> get(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer) + public static <In, Out> IMergeIterator<In, Out> get(final List<? extends CloseableIterator<In>> sources, + Comparator<In> comparator, + final Reducer<In, Out> reducer) { - return new ManyToOne<In,Out>(iters, comp, reducer); + assert !sources.isEmpty(); + if (sources.size() == 1) + return new OneToOne<In, Out>(sources, reducer); + return new ManyToOne<In, Out>(sources, comparator, reducer); } public Iterable<? extends CloseableIterator<In>> iterators() @@ -71,23 +51,6 @@ public abstract class MergeIterator<In,O return iterators; } - /** - * Consumes sorted items from the queue: should only remove items from the queue, - * not add them. - */ - protected abstract Out consume(); - - /** - * Returns consumed items to the queue. - */ - protected abstract void advance(); - - protected final Out computeNext() - { - advance(); - return consume(); - } - public void close() { for (CloseableIterator<In> iterator : this.iterators) @@ -103,47 +66,38 @@ public abstract class MergeIterator<In,O } } - /** A MergeIterator that returns a single value for each one consumed. 
*/ - private static final class OneToOne<E> extends MergeIterator<E,E> - { - // the last returned candidate, so that we can lazily call 'advance()' - protected Candidate<E> candidate; - public OneToOne(List<? extends CloseableIterator<E>> iters, Comparator<E> comp) - { - super(iters, comp); - } - - protected final E consume() - { - candidate = queue.poll(); - if (candidate == null) - return endOfData(); - return candidate.item; - } - - protected final void advance() - { - if (candidate != null && candidate.advance()) - // has more items - queue.add(candidate); - } - } - /** A MergeIterator that consumes multiple input values per output value. */ private static final class ManyToOne<In,Out> extends MergeIterator<In,Out> { - protected final Reducer<In,Out> reducer; + public final Comparator<In> comp; + // a queue for return: all candidates must be open and have at least one item + protected final PriorityQueue<Candidate<In>> queue; // a stack of the last consumed candidates, so that we can lazily call 'advance()' // TODO: if we had our own PriorityQueue implementation we could stash items // at the end of its array, so we wouldn't need this storage protected final ArrayDeque<Candidate<In>> candidates; public ManyToOne(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer) { - super(iters, comp); - this.reducer = reducer; + super(iters, reducer); + this.comp = comp; + this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size())); + for (CloseableIterator<In> iter : iters) + { + Candidate<In> candidate = new Candidate<In>(iter, comp); + if (!candidate.advance()) + // was empty + continue; + this.queue.add(candidate); + } this.candidates = new ArrayDeque<Candidate<In>>(queue.size()); } + protected final Out computeNext() + { + advance(); + return consume(); + } + /** Consume values by sending them to the reducer while they are equal. 
*/ protected final Out consume() { @@ -177,17 +131,13 @@ public abstract class MergeIterator<In,O private final CloseableIterator<In> iter; private final Comparator<In> comp; private In item; + public Candidate(CloseableIterator<In> iter, Comparator<In> comp) { this.iter = iter; this.comp = comp; } - public In item() - { - return item; - } - /** @return True if our iterator had an item, and it is now available */ protected boolean advance() { @@ -221,4 +171,24 @@ public abstract class MergeIterator<In,O */ protected void onKeyChange() {} } + + private static class OneToOne<In, Out> extends MergeIterator<In, Out> + { + private final CloseableIterator<In> source; + + public OneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer) + { + super(sources, reducer); + source = sources.get(0); + } + + protected Out computeNext() + { + if (!source.hasNext()) + return endOfData(); + reducer.onKeyChange(); + reducer.reduce(source.next()); + return reducer.getReduced(); + } + } } Modified: cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1174360&r1=1174359&r2=1174360&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (original) +++ cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Sep 22 20:07:28 2011 @@ -19,9 +19,7 @@ package org.apache.cassandra.utils; import java.util.Arrays; -import java.util.Comparator; import java.util.Iterator; -import java.util.List; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; @@ -45,16 +43,6 @@ public class MergeIteratorTest d = new CLI(); } - @Test - public void testOneToOne() throws Exception - { - 
MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d), - Ordering.<String>natural()); - assert Iterators.elementsEqual(all, smi); - smi.close(); - assert a.closed && b.closed && c.closed && d.closed; - } - /** Test that duplicate values are concatted. */ @Test public void testManyToOne() throws Exception @@ -74,7 +62,7 @@ public class MergeIteratorTest return tmp; } }; - MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d), + IMergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d), Ordering.<String>natural(), reducer); assert Iterators.elementsEqual(cat, smi);