Author: jbellis Date: Thu Sep 22 20:49:17 2011 New Revision: 1174382 URL: http://svn.apache.org/viewvc?rev=1174382&view=rev Log: merge #3234 from 1.0.0
Modified: cassandra/branches/cassandra-1.0/ (props changed) cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/contrib/ (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/AbstractColumnContainer.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Column.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumn.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumnContainer.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowIteratorFactory.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/MergeIterator.java cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Propchange: cassandra/branches/cassandra-1.0/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0:1125021-1130369 /cassandra/branches/cassandra-0.8.1:1101014-1125018 /cassandra/branches/cassandra-1.0:1167106,1167185 -/cassandra/branches/cassandra-1.0.0:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020 /cassandra/trunk:1167085-1167102,1169870 Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Sep 22 20:49:17 2011 @@ -15,8 +15,9 @@ * Base choice of random or "balanced" token on bootstrap on whether schema definitions were found (CASSANDRA-3219) * Fixes for LeveledCompactionStrategy score computation, prioritization, - and scheduling (CASSANDRA-3224) + scheduling, and performance (CASSANDRA-3224, 3234) * parallelize sstable open at server startup (CASSANDRA-2988) + * fix handling of exceptions writing to OutboundTcpConnection (CASSANDRA-3235) 1.0.0-beta1 Propchange: cassandra/branches/cassandra-1.0/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018 /cassandra/branches/cassandra-1.0/contrib:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/contrib:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0/contrib:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020 /cassandra/trunk/contrib:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Sep 22 20:49:17 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1173617,1173663 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1174379 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1167102,1169870 Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/AbstractColumnContainer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Thu Sep 22 20:49:17 2011 @@ -244,4 +244,16 @@ public abstract class AbstractColumnCont this.localDeletionTime = localDeletionTime; } } + + public boolean hasExpiredTombstones(int gcBefore) + { + if (isMarkedForDelete() && getLocalDeletionTime() < gcBefore) + return true; + + for (IColumn column : columns) + if (column.hasExpiredTombstones(gcBefore)) + return true; + + return false; + } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Column.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Column.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Column.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Column.java Thu Sep 22 20:49:17 2011 @@ -280,5 +280,10 @@ public class Column implements IColumn if (valueValidator != null) valueValidator.validate(value()); } + + public boolean hasExpiredTombstones(int gcBefore) + { + return isMarkedForDelete() && getLocalDeletionTime() < gcBefore; + } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Sep 22 20:49:17 2011 @@ -812,8 +812,10 @@ public class ColumnFamilyStore implement private static void removeDeletedStandard(ColumnFamily cf, int gcBefore) { - for (IColumn c : cf) + Iterator<IColumn> iter = cf.iterator(); + while (iter.hasNext()) { + IColumn c = iter.next(); ByteBuffer cname = c.name(); // remove columns if // (a) the column itself is tombstoned or @@ -821,7 +823,7 @@ public class ColumnFamilyStore implement if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore) || c.timestamp() <= cf.getMarkedForDeleteAt()) { - cf.remove(cname); + iter.remove(); } } } @@ -836,15 +838,17 @@ public class ColumnFamilyStore implement { SuperColumn c = (SuperColumn)iter.next(); long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt()); - for (IColumn subColumn : c.getSubColumns()) + Iterator<IColumn> subIter = c.getSubColumns().iterator(); + while (subIter.hasNext()) { + IColumn subColumn = subIter.next(); // remove subcolumns if // (a) the subcolumn itself is tombstoned or // (b) the supercolumn is tombstoned and the subcolumn is not newer than it if (subColumn.timestamp() <= minTimestamp || (subColumn.isMarkedForDelete() && subColumn.getLocalDeletionTime() <= gcBefore)) { - c.remove(subColumn.name()); + subIter.remove(); } } if (c.getSubColumns().isEmpty() && c.getLocalDeletionTime() <= gcBefore) Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumn.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumn.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumn.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumn.java Thu Sep 22 20:49:17 2011 @@ -72,6 +72,11 @@ public interface IColumn boolean isLive(); /** + * @return true if the column or any its subcolumns expired before @param gcBefore + */ + public boolean hasExpiredTombstones(int gcBefore); + + /** * For a standard column, this is the same as timestamp(). * For a super column, this is the max column timestamp of the sub columns. */ Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumnContainer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumnContainer.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/IColumnContainer.java Thu Sep 22 20:49:17 2011 @@ -44,7 +44,7 @@ public interface IColumnContainer public boolean isMarkedForDelete(); public long getMarkedForDeleteAt(); - public int getLocalDeletionTime(); + public boolean hasExpiredTombstones(int gcBefore); public AbstractType getComparator(); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowIteratorFactory.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowIteratorFactory.java Thu Sep 22 20:49:17 2011 @@ -100,6 +100,11 @@ public class RowIteratorFactory private DecoratedKey key; private ColumnFamily returnCF; + public boolean trivialReduceIsTrivial() + { + return false; + } + @Override protected void onKeyChange() { Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java Thu Sep 22 20:49:17 2011 @@ -92,4 +92,9 @@ public abstract class AbstractCompaction * @return the number of background tasks estimated to still be needed for this columnfamilystore */ public abstract int getEstimatedRemainingTasks(); + + /** + * @return size in bytes of the largest sstables for this strategy + */ + public abstract long getMaxSSTableSize(); } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java Thu Sep 22 20:49:17 2011 @@ -88,6 +88,11 @@ public class CompactionIterable extends { protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>(); + public boolean trivialReduceIsTrivial() + { + return false; + } + public void reduce(IColumnIterator current) { rows.add((SSTableIdentityIterator) current); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Sep 22 20:49:17 2011 @@ -120,10 +120,11 @@ public class CompactionTask extends Abst long startTime = System.currentTimeMillis(); long totalkeysWritten = 0; - // TODO the int cast here is potentially buggy - int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(toCompact)); + long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact)); + long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize()); + long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); if (logger.isDebugEnabled()) - logger.debug("Expected bloom filter size : " + expectedBloomFilterSize); + logger.debug("Expected bloom filter size : " + keysPerSSTable); AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction() ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller) @@ -152,7 +153,7 @@ public class CompactionTask extends Abst return 0; } - SSTableWriter writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact); + SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact); writers.add(writer); while (nni.hasNext()) { @@ -179,7 +180,7 @@ public class CompactionTask extends Abst SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact)); cachedKeyMap.put(toIndex, cachedKeys); sstables.add(toIndex); - writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact); + writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact); writers.add(writer); cachedKeys = new HashMap<DecoratedKey, Long>(); } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Thu Sep 22 20:49:17 2011 @@ -215,6 +215,11 @@ public class LazilyCompactedRow extends int size = 0; long maxTimestampSeen = Long.MIN_VALUE; + public boolean trivialReduceIsTrivial() + { + return true; + } + public void reduce(IColumn current) { container.addColumn(current); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Thu Sep 22 20:49:17 2011 @@ -28,11 +28,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; @@ -47,7 +45,7 @@ public class LeveledCompactionStrategy e private LeveledManifest manifest; private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb"; - private final int maxSSTableSize; + private final int maxSSTableSizeInMB; private final AtomicReference<LeveledCompactionTask> task = new AtomicReference<LeveledCompactionTask>(); public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) @@ -70,12 +68,12 @@ public class LeveledCompactionStrategy e } } } - maxSSTableSize = configuredMaxSSTableSize; + maxSSTableSizeInMB = configuredMaxSSTableSize; cfs.getDataTracker().subscribe(this); logger.info(this + " subscribed to the data tracker."); - manifest = LeveledManifest.create(cfs, this.maxSSTableSize); + manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB); logger.debug("Created {}", manifest); // override min/max for this strategy cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE); @@ -119,7 +117,7 @@ public class LeveledCompactionStrategy e return Collections.emptyList(); } - LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize); + LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSizeInMB); return task.compareAndSet(currentTask, newTask) ? Collections.<AbstractCompactionTask>singletonList(newTask) : Collections.<AbstractCompactionTask>emptyList(); @@ -156,6 +154,11 @@ public class LeveledCompactionStrategy e } } + public long getMaxSSTableSize() + { + return maxSSTableSizeInMB * 1024 * 1024; + } + @Override public String toString() { Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Thu Sep 22 20:49:17 2011 @@ -89,10 +89,10 @@ public class ParallelCompactionIterable private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow> { - private final MergeIterator<RowContainer, CompactedRowContainer> reducer; + private final CloseableIterator<CompactedRowContainer> reducer; private final CompactionController controller; - public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> reducer, CompactionController controller) + public Unwrapper(CloseableIterator<CompactedRowContainer> reducer, CompactionController controller) { this.reducer = reducer; this.controller = controller; @@ -148,6 +148,11 @@ public class ParallelCompactionIterable private final ThreadPoolExecutor executor; private int row = 0; + public boolean trivialReduceIsTrivial() + { + return false; + } + private Reducer() { super(); @@ -224,6 +229,7 @@ public class ParallelCompactionIterable } else { + // addAll is ok even if cf is an ArrayBackedSortedColumns cf.addAll(thisCF, HeapAllocator.instance); } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Thu Sep 22 20:49:17 2011 @@ -66,7 +66,27 @@ public class PrecompactedRow extends Abs public static ColumnFamily removeDeletedAndOldShards(DecoratedKey<?> key, CompactionController controller, ColumnFamily cf) { - return removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf); + // avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS + // gets behind and has hundreds of overlapping L0 sstables. Essentially, this method is an + // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf), + // taking this into account. + Boolean shouldPurge = null; + + if (cf.hasExpiredTombstones(controller.gcBefore)) + shouldPurge = controller.shouldPurge(key); + ColumnFamily compacted = shouldPurge != null && shouldPurge + ? ColumnFamilyStore.removeDeleted(cf, controller.gcBefore) + : cf; + + if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative()) + { + if (shouldPurge == null) + shouldPurge = controller.shouldPurge(key); + if (shouldPurge) + CounterColumn.removeOldShards(compacted, controller.gcBefore); + } + + return compacted; } public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, CompactionController controller, ColumnFamily cf) @@ -105,6 +125,7 @@ public class PrecompactedRow extends Abs } else { + // addAll is ok even if cf is an ArrayBackedSortedColumns cf.addAll(thisCF, HeapAllocator.instance); } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java Thu Sep 22 20:49:17 2011 @@ -176,6 +176,11 @@ public class SizeTieredCompactionStrateg return minSSTableSize; } + public long getMaxSSTableSize() + { + return Long.MAX_VALUE; + } + public String toString() { return String.format("SizeTieredCompactionStrategy[%s/%s]", Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Sep 22 20:49:17 2011 @@ -93,10 +93,15 @@ public class QueryFilter Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator); // define a 'reduced' iterator that merges columns w/ the same name, which // greatly simplifies computing liveColumns in the presence of tombstones. - Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>() + MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>() { ColumnFamily curCF = returnCF.cloneMeShallow(); + public boolean trivialReduceIsTrivial() + { + return true; + } + protected boolean isEqual(IColumn o1, IColumn o2) { return o1.name().equals(o2.name()); @@ -111,7 +116,7 @@ public class QueryFilter // consumers make of the result (for instance CFS.getColumnFamily() call removeDeleted() on the // result which removes column; which shouldn't be done on the original super column). assert current instanceof SuperColumn; - curCF.addColumn(((SuperColumn)current).cloneMe()); + curCF.addColumn(((SuperColumn) current).cloneMe()); } else { @@ -129,16 +134,17 @@ public class QueryFilter // time of the cf, if that is greater. long deletedAt = c.getMarkedForDeleteAt(); if (returnCF.getMarkedForDeleteAt() > deletedAt) - ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); + ((SuperColumn) c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); - c = filter.filterSuperColumn((SuperColumn)c, gcBefore); - ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be + c = filter.filterSuperColumn((SuperColumn) c, gcBefore); + ((SuperColumn) c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be } - curCF.clear(); + curCF.clear(); return c; } - }); + }; + Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer); topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore); } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Sep 22 20:49:17 2011 @@ -24,16 +24,15 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.IMergeIterator; import org.apache.cassandra.utils.MergeIterator; public class ReducingKeyIterator implements CloseableIterator<DecoratedKey> { - private final MergeIterator<DecoratedKey,DecoratedKey> mi; + private final IMergeIterator<DecoratedKey,DecoratedKey> mi; public ReducingKeyIterator(Collection<SSTableReader> sstables) { @@ -44,6 +43,11 @@ public class ReducingKeyIterator impleme { DecoratedKey<?> reduced = null; + public boolean trivialReduceIsTrivial() + { + return true; + } + public void reduce(DecoratedKey current) { reduced = current; Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Thu Sep 22 20:49:17 2011 @@ -27,9 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.ICountableColumnIterator; import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.io.util.RandomAccessReader; @@ -228,7 +226,7 @@ public class SSTableIdentityIterator imp public ColumnFamily getColumnFamilyWithColumns() throws IOException { assert inputWithTracker.getBytesRead() == headerSize(); - ColumnFamily cf = columnFamily.cloneMeShallow(); + ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false); // since we already read column count, just pass that value and continue deserialization ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote); if (validateColumns) Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Thu Sep 22 20:49:17 2011 @@ -136,7 +136,7 @@ public class OutboundTcpConnection exten } } - static void write(Message message, String id, DataOutputStream out) + static void write(Message message, String id, DataOutputStream out) throws IOException { /* Setting up the protocol header. This is 4 bytes long @@ -157,23 +157,16 @@ public class OutboundTcpConnection exten // Setting up the version bit header |= (message.getVersion() << 8); - try - { - out.writeInt(MessagingService.PROTOCOL_MAGIC); - out.writeInt(header); - // compute total Message length for compatibility w/ 0.8 and earlier - byte[] bytes = message.getMessageBody(); - int total = messageLength(message.header_, id, bytes); - out.writeInt(total); - out.writeUTF(id); - Header.serializer().serialize(message.header_, out, message.getVersion()); - out.writeInt(bytes.length); - out.write(bytes); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + out.writeInt(MessagingService.PROTOCOL_MAGIC); + out.writeInt(header); + // compute total Message length for compatibility w/ 0.8 and earlier + byte[] bytes = message.getMessageBody(); + int total = messageLength(message.header_, id, bytes); + out.writeInt(total); + out.writeUTF(id); + Header.serializer().serialize(message.header_, out, message.getVersion()); + out.writeInt(bytes.length); + out.write(bytes); } public static int messageLength(Header header, String id, byte[] bytes) Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Sep 22 20:49:17 2011 @@ -86,7 +86,8 @@ public class RangeSliceResponseResolver iters.add(new RowIterator(reply.rows.iterator(), response.getFrom())); } // for each row, compute the combination of all different versions seen, and repair incomplete versions - MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, pairComparator, new Reducer()); + // TODO do we need to call close? + CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer()); List<Row> resolvedRows = new ArrayList<Row>(n); while (iter.hasNext()) @@ -140,6 +141,11 @@ public class RangeSliceResponseResolver List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size()); DecoratedKey key; + public boolean trivialReduceIsTrivial() + { + return false; + } + public void reduce(Pair<Row,InetAddress> current) { key = current.left.key; Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/MergeIterator.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/MergeIterator.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/MergeIterator.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/MergeIterator.java Thu Sep 22 20:49:17 2011 @@ -20,50 +20,32 @@ package org.apache.cassandra.utils; import java.io.IOException; import java.io.IOError; -import java.util.ArrayDeque; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; +import java.util.*; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Ordering; /** Merges sorted input iterators which individually contain unique items. */ -public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements CloseableIterator<Out> +public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out> { - public final Comparator<In> comp; + protected final Reducer<In,Out> reducer; protected final List<? extends CloseableIterator<In>> iterators; - // a queue for return: all candidates must be open and have at least one item - protected final PriorityQueue<Candidate<In>> queue; - protected MergeIterator(List<? extends CloseableIterator<In>> iters, Comparator<In> comp) + protected MergeIterator(List<? extends CloseableIterator<In>> iters, Reducer<In, Out> reducer) { this.iterators = iters; - this.comp = comp; - this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size())); - for (CloseableIterator<In> iter : iters) - { - Candidate<In> candidate = new Candidate<In>(iter, comp); - if (!candidate.advance()) - // was empty - continue; - this.queue.add(candidate); - } - } - - public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters) - { - return get(iters, (Comparator<E>)Ordering.natural()); - } - - public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters, Comparator<E> comp) - { - return new OneToOne<E>(iters, comp); + this.reducer = reducer; } - public static <In,Out> MergeIterator<In,Out> get(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer) - { - return new ManyToOne<In,Out>(iters, comp, reducer); + public static <In, Out> IMergeIterator<In, Out> get(final List<? extends CloseableIterator<In>> sources, + Comparator<In> comparator, + final Reducer<In, Out> reducer) + { + assert !sources.isEmpty(); + if (sources.size() == 1) + return reducer.trivialReduceIsTrivial() + ? new TrivialOneToOne<In, Out>(sources, reducer) + : new OneToOne<In, Out>(sources, reducer); + return new ManyToOne<In, Out>(sources, comparator, reducer); } public Iterable<? extends CloseableIterator<In>> iterators() @@ -71,23 +53,6 @@ public abstract class MergeIterator<In,O return iterators; } - /** - * Consumes sorted items from the queue: should only remove items from the queue, - * not add them. - */ - protected abstract Out consume(); - - /** - * Returns consumed items to the queue. - */ - protected abstract void advance(); - - protected final Out computeNext() - { - advance(); - return consume(); - } - public void close() { for (CloseableIterator<In> iterator : this.iterators) @@ -103,47 +68,38 @@ public abstract class MergeIterator<In,O } } - /** A MergeIterator that returns a single value for each one consumed. */ - private static final class OneToOne<E> extends MergeIterator<E,E> - { - // the last returned candidate, so that we can lazily call 'advance()' - protected Candidate<E> candidate; - public OneToOne(List<? extends CloseableIterator<E>> iters, Comparator<E> comp) - { - super(iters, comp); - } - - protected final E consume() - { - candidate = queue.poll(); - if (candidate == null) - return endOfData(); - return candidate.item; - } - - protected final void advance() - { - if (candidate != null && candidate.advance()) - // has more items - queue.add(candidate); - } - } - /** A MergeIterator that consumes multiple input values per output value. */ private static final class ManyToOne<In,Out> extends MergeIterator<In,Out> { - protected final Reducer<In,Out> reducer; + public final Comparator<In> comp; + // a queue for return: all candidates must be open and have at least one item + protected final PriorityQueue<Candidate<In>> queue; // a stack of the last consumed candidates, so that we can lazily call 'advance()' // TODO: if we had our own PriorityQueue implementation we could stash items // at the end of its array, so we wouldn't need this storage protected final ArrayDeque<Candidate<In>> candidates; public ManyToOne(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer) { - super(iters, comp); - this.reducer = reducer; + super(iters, reducer); + this.comp = comp; + this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size())); + for (CloseableIterator<In> iter : iters) + { + Candidate<In> candidate = new Candidate<In>(iter, comp); + if (!candidate.advance()) + // was empty + continue; + this.queue.add(candidate); + } this.candidates = new ArrayDeque<Candidate<In>>(queue.size()); } + protected final Out computeNext() + { + advance(); + return consume(); + } + /** Consume values by sending them to the reducer while they are equal. */ protected final Out consume() { @@ -177,17 +133,13 @@ public abstract class MergeIterator<In,O private final CloseableIterator<In> iter; private final Comparator<In> comp; private In item; + public Candidate(CloseableIterator<In> iter, Comparator<In> comp) { this.iter = iter; this.comp = comp; } - public In item() - { - return item; - } - /** @return True if our iterator had an item, and it is now available */ protected boolean advance() { @@ -207,6 +159,11 @@ public abstract class MergeIterator<In,O public static abstract class Reducer<In,Out> { /** + * @return true if Out is the same as In for the case of a single source iterator + */ + public abstract boolean trivialReduceIsTrivial(); + + /** * combine this object with the previous ones. * intermediate state is up to your implementation. */ @@ -221,4 +178,42 @@ public abstract class MergeIterator<In,O */ protected void onKeyChange() {} } + + private static class OneToOne<In, Out> extends MergeIterator<In, Out> + { + private final CloseableIterator<In> source; + + public OneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer) + { + super(sources, reducer); + source = sources.get(0); + } + + protected Out computeNext() + { + if (!source.hasNext()) + return endOfData(); + reducer.onKeyChange(); + reducer.reduce(source.next()); + return reducer.getReduced(); + } + } + + private static class TrivialOneToOne<In, Out> extends MergeIterator<In, Out> + { + private final CloseableIterator<?> source; + + public TrivialOneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer) + { + super(sources, reducer); + source = sources.get(0); + } + + protected Out computeNext() + { + if (!source.hasNext()) + return endOfData(); + return (Out) source.next(); + } + } } Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1174382&r1=1174381&r2=1174382&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (original) +++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Sep 22 20:49:17 2011 @@ -19,9 +19,7 @@ package org.apache.cassandra.utils; import java.util.Arrays; -import java.util.Comparator; import java.util.Iterator; -import java.util.List; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; @@ -45,16 +43,6 @@ public class MergeIteratorTest d = new CLI(); } - @Test - public void testOneToOne() throws Exception - { - MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d), - Ordering.<String>natural()); - assert Iterators.elementsEqual(all, smi); - smi.close(); - assert a.closed && b.closed && c.closed && d.closed; - } - /** Test that duplicate values are concatted. */ @Test public void testManyToOne() throws Exception @@ -62,6 +50,12 @@ public class MergeIteratorTest MergeIterator.Reducer<String,String> reducer = new MergeIterator.Reducer<String,String>() { String concatted = ""; + + public boolean trivialReduceIsTrivial() + { + return false; // technically true, but let's not optimize anything away here... + } + public void reduce(String value) { concatted += value; @@ -74,7 +68,7 @@ public class MergeIteratorTest return tmp; } }; - MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d), + IMergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d), Ordering.<String>natural(), reducer); assert Iterators.elementsEqual(cat, smi);