Improve BTree

patch by benedict; reviewed by branimir for CASSANDRA-9769


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5250d7ff
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5250d7ff
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5250d7ff

Branch: refs/heads/trunk
Commit: 5250d7ffd2df9e75219ce5a544ae0209ba3446a4
Parents: ffc7745
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Wed Jul 15 11:28:11 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Wed Jul 15 11:28:11 2015 +0100

----------------------------------------------------------------------
 .../ForwardingPrimaryKeyRestrictions.java       |   2 +-
 .../restrictions/PrimaryKeyRestrictionSet.java  |   6 +-
 .../restrictions/PrimaryKeyRestrictions.java    |   3 +-
 .../restrictions/StatementRestrictions.java     |   2 +-
 .../cql3/restrictions/TokenFilter.java          |   2 +-
 .../cql3/restrictions/TokenRestriction.java     |   2 +-
 .../cql3/statements/ModificationStatement.java  |   3 +-
 .../apache/cassandra/db/CounterMutation.java    |   5 +-
 .../org/apache/cassandra/db/MultiCBuilder.java  |  31 +-
 .../apache/cassandra/db/PartitionColumns.java   |  25 +-
 .../db/SinglePartitionNamesCommand.java         |   7 +-
 .../db/filter/ClusteringIndexNamesFilter.java   |   5 +-
 .../db/index/SecondaryIndexSearcher.java        |   5 +-
 .../db/index/composites/CompositesSearcher.java |   7 +-
 .../db/partitions/AtomicBTreePartition.java     |   2 +-
 .../db/partitions/PartitionUpdate.java          |   6 +
 .../cassandra/thrift/CassandraServer.java       |   5 +-
 .../cassandra/utils/IndexedSearchIterator.java  |  33 +
 .../apache/cassandra/utils/SearchIterator.java  |  12 +-
 .../org/apache/cassandra/utils/btree/BTree.java | 624 +++++++++++---
 .../utils/btree/BTreeSearchIterator.java        | 210 +++--
 .../apache/cassandra/utils/btree/BTreeSet.java  | 642 ++++++++++++++
 .../apache/cassandra/utils/btree/Builder.java   | 119 ---
 .../apache/cassandra/utils/btree/Cursor.java    | 198 -----
 .../cassandra/utils/btree/NodeBuilder.java      |  32 +-
 .../cassandra/utils/btree/NodeCursor.java       | 198 +++++
 .../org/apache/cassandra/utils/btree/Path.java  | 354 --------
 .../cassandra/utils/btree/TreeBuilder.java      | 119 +++
 .../cassandra/utils/btree/TreeCursor.java       | 272 ++++++
 .../cassandra/utils/btree/UpdateFunction.java   |  26 +
 .../apache/cassandra/utils/LongBTreeTest.java   | 854 +++++++++++++++++++
 .../cassandra/db/BatchlogManagerTest.java       |   1 +
 .../org/apache/cassandra/utils/BTreeTest.java   |  22 +-
 33 files changed, 2898 insertions(+), 936 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
 
b/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
index 91359e6..82ac78e 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
@@ -104,7 +104,7 @@ abstract class ForwardingPrimaryKeyRestrictions implements 
PrimaryKeyRestriction
     }
 
     @Override
-    public SortedSet<Slice.Bound> boundsAsClustering(Bound bound, QueryOptions 
options) throws InvalidRequestException
+    public NavigableSet<Slice.Bound> boundsAsClustering(Bound bound, 
QueryOptions options) throws InvalidRequestException
     {
         return getDelegate().boundsAsClustering(bound, options);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java 
b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java
index 39322ff..20bf70b 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
@@ -200,7 +200,7 @@ final class PrimaryKeyRestrictionSet extends 
AbstractPrimaryKeyRestrictions
     }
 
     @Override
-    public SortedSet<Slice.Bound> boundsAsClustering(Bound bound, QueryOptions 
options) throws InvalidRequestException
+    public NavigableSet<Slice.Bound> boundsAsClustering(Bound bound, 
QueryOptions options) throws InvalidRequestException
     {
         MultiCBuilder builder = MultiCBuilder.create(comparator);
         int keyPosition = 0;
@@ -231,7 +231,7 @@ final class PrimaryKeyRestrictionSet extends 
AbstractPrimaryKeyRestrictions
             r.appendBoundTo(builder, b, options);
 
             if (builder.hasMissingElements())
-                return FBUtilities.<Slice.Bound>emptySortedSet(comparator);
+                return BTreeSet.empty(comparator);
 
             keyPosition = r.getLastColumn().position() + 1;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
index 6a14182..2f9cd7b 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.cql3.restrictions;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.SortedSet;
 
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.Bound;
@@ -43,5 +42,5 @@ interface PrimaryKeyRestrictions extends Restriction, 
Restrictions
 
     public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws 
InvalidRequestException;
 
-    public SortedSet<Slice.Bound> boundsAsClustering(Bound bound, QueryOptions 
options) throws InvalidRequestException;
+    public NavigableSet<Slice.Bound> boundsAsClustering(Bound bound, 
QueryOptions options) throws InvalidRequestException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 7660c3e..d9fd5e4 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -484,7 +484,7 @@ public final class StatementRestrictions
      * @return the bounds (start or end) of the clustering columns
      * @throws InvalidRequestException if the request is not valid
      */
-    public SortedSet<Slice.Bound> getClusteringColumnsBounds(Bound b, 
QueryOptions options) throws InvalidRequestException
+    public NavigableSet<Slice.Bound> getClusteringColumnsBounds(Bound b, 
QueryOptions options) throws InvalidRequestException
     {
         return clusteringColumnsRestrictions.boundsAsClustering(b, options);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index 2ab54ce..bf3f2f6 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -116,7 +116,7 @@ final class TokenFilter extends 
ForwardingPrimaryKeyRestrictions
     }
 
     @Override
-    public SortedSet<Slice.Bound> boundsAsClustering(Bound bound, QueryOptions 
options) throws InvalidRequestException
+    public NavigableSet<Slice.Bound> boundsAsClustering(Bound bound, 
QueryOptions options) throws InvalidRequestException
     {
         return tokenRestriction.boundsAsClustering(bound, options);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index bd0ef3a..0a7721a 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -105,7 +105,7 @@ public abstract class TokenRestriction extends 
AbstractPrimaryKeyRestrictions
     }
 
     @Override
-    public SortedSet<Slice.Bound> boundsAsClustering(Bound bound, QueryOptions 
options) throws InvalidRequestException
+    public NavigableSet<Slice.Bound> boundsAsClustering(Bound bound, 
QueryOptions options) throws InvalidRequestException
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 6a6d186..698371c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
@@ -508,7 +509,7 @@ public abstract class ModificationStatement implements 
CQLStatement
 
         PartitionColumns toRead = builder.build();
 
-        NavigableSet<Clustering> clusterings = 
FBUtilities.singleton(cbuilder.build(), cfm.comparator);
+        NavigableSet<Clustering> clusterings = BTreeSet.of(cfm.comparator, 
cbuilder.build());
         List<SinglePartitionReadCommand<?>> commands = new 
ArrayList<>(partitionKeys.size());
         int nowInSec = FBUtilities.nowInSeconds();
         for (ByteBuffer key : partitionKeys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java 
b/src/java/org/apache/cassandra/db/CounterMutation.java
index 978417f..d1830a0 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.btree.BTreeSet;
 
 public class CounterMutation implements IMutation
 {
@@ -234,7 +235,7 @@ public class CounterMutation implements IMutation
     private void 
updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks, 
ColumnFamilyStore cfs)
     {
         ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
-        NavigableSet<Clustering> names = new 
TreeSet<>(cfs.metadata.comparator);
+        BTreeSet.Builder<Clustering> names = 
BTreeSet.builder(cfs.metadata.comparator);
         for (PartitionUpdate.CounterMark mark : marks)
         {
             names.add(mark.clustering().takeAlias());
@@ -245,7 +246,7 @@ public class CounterMutation implements IMutation
         }
 
         int nowInSec = FBUtilities.nowInSeconds();
-        ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(names, false);
+        ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(names.build(), false);
         SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key(), 
builder.build(), filter);
         PeekingIterator<PartitionUpdate.CounterMark> markIter = 
Iterators.peekingIterator(marks.iterator());
         try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator 
partition = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), 
nowInSec))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java 
b/src/java/org/apache/cassandra/db/MultiCBuilder.java
index 36a03ba..ab1c94d 100644
--- a/src/java/org/apache/cassandra/db/MultiCBuilder.java
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -20,9 +20,8 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import static java.util.Collections.singletonList;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;
 
 /**
  * Builder that allow to build multiple Clustering/Slice.Bound at the same 
time.
@@ -114,12 +113,12 @@ public abstract class MultiCBuilder
 
             public NavigableSet<Clustering> build()
             {
-                return FBUtilities.singleton(builder.build(), 
builder.comparator());
+                return BTreeSet.of(builder.comparator(), builder.build());
             }
 
-            public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean 
isInclusive)
+            public NavigableSet<Slice.Bound> buildBound(boolean isStart, 
boolean isInclusive)
             {
-                return FBUtilities.singleton(builder.buildBound(isStart, 
isInclusive), builder.comparator());
+                return BTreeSet.of(builder.comparator(), 
builder.buildBound(isStart, isInclusive));
             }
         };
     }
@@ -199,7 +198,7 @@ public abstract class MultiCBuilder
      *
      * @return the clusterings
      */
-    public abstract SortedSet<Slice.Bound> buildBound(boolean isStart, boolean 
isInclusive);
+    public abstract NavigableSet<Slice.Bound> buildBound(boolean isStart, 
boolean isInclusive);
 
     /**
      * Checks if some elements can still be added to the clusterings.
@@ -386,45 +385,43 @@ public abstract class MultiCBuilder
             built = true;
 
             if (hasMissingElements)
-                return FBUtilities.emptySortedSet(comparator);
+                return BTreeSet.empty(comparator);
 
             CBuilder builder = CBuilder.create(comparator);
 
             if (elementsList.isEmpty())
-                return FBUtilities.singleton(builder.build(), 
builder.comparator());
-
-            // Use a TreeSet to sort and eliminate duplicates
-            NavigableSet<Clustering> set = new TreeSet<>(builder.comparator());
+                return BTreeSet.of(builder.comparator(), builder.build());
 
+            BTreeSet.Builder<Clustering> set = 
BTreeSet.builder(builder.comparator());
             for (int i = 0, m = elementsList.size(); i < m; i++)
             {
                 List<ByteBuffer> elements = elementsList.get(i);
                 set.add(builder.buildWith(elements));
             }
-            return set;
+            return set.build();
         }
 
-        public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean 
isInclusive)
+        public NavigableSet<Slice.Bound> buildBound(boolean isStart, boolean 
isInclusive)
         {
             built = true;
 
             if (hasMissingElements)
-                return FBUtilities.emptySortedSet(comparator);
+                return BTreeSet.empty(comparator);
 
             CBuilder builder = CBuilder.create(comparator);
 
             if (elementsList.isEmpty())
-                return FBUtilities.singleton(builder.buildBound(isStart, 
isInclusive), comparator);
+                return BTreeSet.of(comparator, builder.buildBound(isStart, 
isInclusive));
 
             // Use a TreeSet to sort and eliminate duplicates
-            SortedSet<Slice.Bound> set = new TreeSet<>(comparator);
+            BTreeSet.Builder<Slice.Bound> set = BTreeSet.builder(comparator);
 
             for (int i = 0, m = elementsList.size(); i < m; i++)
             {
                 List<ByteBuffer> elements = elementsList.get(i);
                 set.add(builder.buildBoundWith(elements, isStart, 
isInclusive));
             }
-            return set;
+            return set.build();
         }
 
         private void checkUpdateable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java 
b/src/java/org/apache/cassandra/db/PartitionColumns.java
index a1b1d00..ef05760 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -23,6 +23,9 @@ import java.security.MessageDigest;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+import static java.util.Comparator.naturalOrder;
 
 /**
  * Columns (or a subset of the columns) that a partition contains.
@@ -124,33 +127,27 @@ public class PartitionColumns implements 
Iterable<ColumnDefinition>
         // Note that we do want to use sorted sets because we want the column 
definitions to be compared
         // through compareTo, not equals. The former basically check it's the 
same column name, while the latter
         // check it's the same object, including the same type.
-        private SortedSet<ColumnDefinition> regularColumns;
-        private SortedSet<ColumnDefinition> staticColumns;
+        private BTreeSet.Builder<ColumnDefinition> regularColumns;
+        private BTreeSet.Builder<ColumnDefinition> staticColumns;
 
         public Builder add(ColumnDefinition c)
         {
             if (c.isStatic())
             {
                 if (staticColumns == null)
-                    staticColumns = new TreeSet<>();
+                    staticColumns = BTreeSet.builder(naturalOrder());
                 staticColumns.add(c);
             }
             else
             {
                 assert c.isRegular();
                 if (regularColumns == null)
-                    regularColumns = new TreeSet<>();
+                    regularColumns = BTreeSet.builder(naturalOrder());
                 regularColumns.add(c);
             }
             return this;
         }
 
-        public int added()
-        {
-            return (regularColumns == null ? 0 : regularColumns.size())
-                 + (staticColumns == null ? 0 : staticColumns.size());
-        }
-
         public Builder addAll(Iterable<ColumnDefinition> columns)
         {
             for (ColumnDefinition c : columns)
@@ -161,13 +158,13 @@ public class PartitionColumns implements 
Iterable<ColumnDefinition>
         public Builder addAll(PartitionColumns columns)
         {
             if (regularColumns == null && !columns.regulars.isEmpty())
-                regularColumns = new TreeSet<>();
+                regularColumns = BTreeSet.builder(naturalOrder());
 
             for (ColumnDefinition c : columns.regulars)
                 regularColumns.add(c);
 
             if (staticColumns == null && !columns.statics.isEmpty())
-                staticColumns = new TreeSet<>();
+                staticColumns = BTreeSet.builder(naturalOrder());
 
             for (ColumnDefinition c : columns.statics)
                 staticColumns.add(c);
@@ -177,8 +174,8 @@ public class PartitionColumns implements 
Iterable<ColumnDefinition>
 
         public PartitionColumns build()
         {
-            return new PartitionColumns(staticColumns == null ? Columns.NONE : 
Columns.from(staticColumns),
-                                        regularColumns == null ? Columns.NONE 
: Columns.from(regularColumns));
+            return new PartitionColumns(staticColumns == null ? Columns.NONE : 
Columns.from(staticColumns.build()),
+                                        regularColumns == null ? Columns.NONE 
: Columns.from(regularColumns.build()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index d359b2b..c0f8207 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -33,6 +33,7 @@ import 
org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 import org.apache.cassandra.thrift.ThriftResultsMerger;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 
 /**
@@ -217,13 +218,13 @@ public class SinglePartitionNamesCommand extends 
SinglePartitionReadCommand<Clus
         if (hasNoMoreStatic && hasNoMoreClusterings)
             return null;
 
-        NavigableSet<Clustering> newClusterings = clusterings;
         if (toRemove != null)
         {
-            newClusterings = new TreeSet<>(result.metadata().comparator);
+            BTreeSet.Builder<Clustering> newClusterings = 
BTreeSet.builder(result.metadata().comparator);
             newClusterings.addAll(Sets.difference(clusterings, toRemove));
+            clusterings = newClusterings.build();
         }
-        return new ClusteringIndexNamesFilter(newClusterings, 
filter.isReversed());
+        return new ClusteringIndexNamesFilter(clusterings, 
filter.isReversed());
     }
 
     private boolean canRemoveRow(Row row, Columns requestedColumns, long 
sstableTimestamp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java 
b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index 5e6c87b..f2cc46f 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
 
 /**
  * A filter selecting rows given their clustering value.
@@ -260,12 +261,12 @@ public class ClusteringIndexNamesFilter extends 
AbstractClusteringIndexFilter
         public ClusteringIndexFilter deserialize(DataInput in, int version, 
CFMetaData metadata, boolean reversed) throws IOException
         {
             ClusteringComparator comparator = metadata.comparator;
-            NavigableSet<Clustering> clusterings = new TreeSet<>(comparator);
+            BTreeSet.Builder<Clustering> clusterings = 
BTreeSet.builder(comparator);
             int size = in.readInt();
             for (int i = 0; i < size; i++)
                 clusterings.add(Clustering.serializer.deserialize(in, version, 
comparator.subtypes()).takeAlias());
 
-            return new ClusteringIndexNamesFilter(clusterings, reversed);
+            return new ClusteringIndexNamesFilter(clusterings.build(), 
reversed);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 4f63ae8..1d978a2 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -32,6 +32,7 @@ import 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.FBUtilities;
 
 public abstract class SecondaryIndexSearcher
@@ -109,10 +110,10 @@ public abstract class SecondaryIndexSearcher
             if (filter instanceof ClusteringIndexNamesFilter)
             {
                 NavigableSet<Clustering> requested = 
((ClusteringIndexNamesFilter)filter).requestedRows();
-                NavigableSet<Clustering> clusterings = new 
TreeSet<>(index.getIndexComparator());
+                BTreeSet.Builder<Clustering> clusterings = 
BTreeSet.builder(index.getIndexComparator());
                 for (Clustering c : requested)
                     clusterings.add(index.makeIndexClustering(pk, c, 
(Cell)null).takeAlias());
-                return new ClusteringIndexNamesFilter(clusterings, 
filter.isReversed());
+                return new ClusteringIndexNamesFilter(clusterings.build(), 
filter.isReversed());
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java 
b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index f838ff1..029dd3c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.*;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 
@@ -102,7 +103,7 @@ public class CompositesSearcher extends 
SecondaryIndexSearcher
                 // in memory so we should consider adding some paging 
mechanism. However, index hits should
                 // be relatively small so it's much better than the previous 
code that was materializing all
                 // *data* for a given partition.
-                NavigableSet<Clustering> clusterings = new 
TreeSet<>(baseCfs.getComparator());
+                BTreeSet.Builder<Clustering> clusterings = 
BTreeSet.builder(baseCfs.getComparator());
                 List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
                 DecoratedKey partitionKey = 
baseCfs.partitioner.decorateKey(nextEntry.indexedKey);
 
@@ -123,7 +124,7 @@ public class CompositesSearcher extends 
SecondaryIndexSearcher
                     return prepareNext();
 
                 // Query the gathered index hits. We still need to filter 
stale hits from the resulting query.
-                ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(clusterings, false);
+                ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(clusterings.build(), false);
                 SinglePartitionReadCommand dataCmd = new 
SinglePartitionNamesCommand(baseCfs.metadata,
                                                                                
      command.nowInSec(),
                                                                                
      command.columnFilter(),
@@ -197,7 +198,7 @@ public class CompositesSearcher extends 
SecondaryIndexSearcher
                 if (next != null)
                     return true;
 
-                while (next == null && super.hasNext())
+                while (super.hasNext())
                 {
                     next = super.next();
                     if (next.kind() != Unfiltered.Kind.ROW)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 777e7b4..6a888a6 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -577,7 +577,7 @@ public class AtomicBTreePartition implements Partition
                 MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW
                                           ? current.staticRow
                                           : (current.staticRow == null ? 
updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
-                Object[] tree = BTree.<Clusterable, Row, 
MemtableRowData>update(current.tree, update.metadata().comparator, update, 
update.rowCount(), updater);
+                Object[] tree = BTree.update(current.tree, 
update.metadata().comparator, update, update.rowCount(), updater);
                 RowStats newStats = current.stats.mergeWith(update.stats());
 
                 if (tree != null && refUpdater.compareAndSet(this, current, 
new Holder(tree, deletionInfo, staticRow, newStats)))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 670b1ae..09e4c03 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -470,6 +470,12 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
         sort();
     }
 
+    public int rowCount()
+    {
+        maybeSort();
+        return super.rowCount();
+    }
+
     private synchronized void sort()
     {
         if (isSorted)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 21049d4..1dc1527 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.thrift.TException;
 
 public class CassandraServer implements Cassandra.Iface
@@ -2486,13 +2487,13 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             // Gather the clustering for the expected values and query those.
-            NavigableSet<Clustering> clusterings = new 
TreeSet<>(metadata.comparator);
+            BTreeSet.Builder<Clustering> clusterings = 
BTreeSet.builder(metadata.comparator);
             for (Row row : expected)
                 clusterings.add(row.clustering().takeAlias());
             PartitionColumns columns = expected.staticRow().isEmpty()
                                      ? 
metadata.partitionColumns().withoutStatics()
                                      : metadata.partitionColumns();
-            ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(clusterings, false);
+            ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(clusterings.build(), false);
             return SinglePartitionReadCommand.create(true, metadata, nowInSec, 
ColumnFilter.selection(columns), RowFilter.NONE, DataLimits.NONE, key, filter);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/utils/IndexedSearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IndexedSearchIterator.java 
b/src/java/org/apache/cassandra/utils/IndexedSearchIterator.java
new file mode 100644
index 0000000..a156629
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/IndexedSearchIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+public interface IndexedSearchIterator<K, V> extends SearchIterator<K, V>
+{
+    /**
+     * @return the value just recently returned by next()
+     * @throws java.util.NoSuchElementException if next() returned null
+     */
+    public V current();
+
+    /**
+     * @return the index of the value returned by current(), and just returned 
by next()
+     * @throws java.util.NoSuchElementException if next() returned null
+     */
+    public int indexOfCurrent();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/utils/SearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SearchIterator.java 
b/src/java/org/apache/cassandra/utils/SearchIterator.java
index 004b02a..ca7b2fa 100644
--- a/src/java/org/apache/cassandra/utils/SearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/SearchIterator.java
@@ -19,8 +19,16 @@ package org.apache.cassandra.utils;
 
 public interface SearchIterator<K, V>
 {
-
     public boolean hasNext();
-    public V next(K key);
 
+    /**
+     * Searches "forwards" (in direction of travel) in the iterator for the 
required key;
+     * if this or any key greater has already been returned by the iterator, 
null will be returned.
+     *
+     * it is permitted to search past the end of the iterator, i.e. !hasNext() 
=> next(?) == null
+     *
+     * @param key to search for
+     * @return value associated with key, if present in direction of travel
+     */
+    public V next(K key);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java 
b/src/java/org/apache/cassandra/utils/btree/BTree.java
index bf68ffa..6e15638 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -18,26 +18,27 @@
  */
 package org.apache.cassandra.utils.btree;
 
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Queue;
+import java.util.*;
+
+import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.utils.ObjectSizes;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
 public class BTree
 {
     /**
      * Leaf Nodes are a raw array of values: Object[V1, V1, ...,].
      *
-     * Branch Nodes: Object[V1, V2, ..., child[&lt;V1.key], child[&lt;V2.key], 
..., child[&lt; Inf]], where
+     * Branch Nodes: Object[V1, V2, ..., child[&lt;V1.key], child[&lt;V2.key], 
..., child[&lt; Inf], size], where
      * each child is another node, i.e., an Object[].  Thus, the value 
elements in a branch node are the
-     * first half of the array, rounding down.  In our implementation, each 
value must include its own key;
+     * first half of the array (minus one).  In our implementation, each value 
must include its own key;
      * we access these via Comparator, rather than directly. 
      *
-     * So we can quickly distinguish between leaves and branches, we require 
that leaf nodes are always even number
-     * of elements (padded with a null, if necessary), and branches are always 
an odd number of elements.
+     * So we can quickly distinguish between leaves and branches, we require 
that leaf nodes are always an odd number
+     * of elements (padded with a null, if necessary), and branches are always 
an even number of elements.
      *
      * BTrees are immutable; updating one returns a new tree that reuses 
unmodified nodes.
      *
@@ -62,10 +63,10 @@ public class BTree
     static final int FAN_FACTOR = 1 << FAN_SHIFT;
 
     // An empty BTree Leaf - which is the same as an empty BTree
-    static final Object[] EMPTY_LEAF = new Object[0];
+    static final Object[] EMPTY_LEAF = new Object[1];
 
     // An empty BTree branch - used only for internal purposes in Modifier
-    static final Object[] EMPTY_BRANCH = new Object[1];
+    static final Object[] EMPTY_BRANCH = new Object[] { null, new int[0] };
 
     /**
      * Returns an empty BTree
@@ -77,6 +78,11 @@ public class BTree
         return EMPTY_LEAF;
     }
 
+    public static Object[] singleton(Object value)
+    {
+        return new Object[] { value };
+    }
+
     public static <C, K extends C, V extends C> Object[] build(Collection<K> 
source, UpdateFunction<K, V> updateF)
     {
         return build(source, source.size(), updateF);
@@ -85,15 +91,17 @@ public class BTree
     /**
      * Creates a BTree containing all of the objects in the provided collection
      *
-     * @param source     the items to build the tree with
-     * @return
+     * @param source  the items to build the tree with. MUST BE IN STRICTLY 
ASCENDING ORDER.
+     * @return        a btree representing the contents of the provided 
iterable
      */
     public static <C, K extends C, V extends C> Object[] build(Iterable<K> 
source, int size, UpdateFunction<K, V> updateF)
     {
         if (size < FAN_FACTOR)
         {
-            // pad to even length to match contract that all leaf nodes are 
even
-            V[] values = (V[]) new Object[size + (size & 1)];
+            if (size == 0)
+                return EMPTY_LEAF;
+            // pad to odd length to match contract that all leaf nodes are odd
+            V[] values = (V[]) new Object[size | 1];
             {
                 int i = 0;
                 for (K k : source)
@@ -103,10 +111,10 @@ public class BTree
             return values;
         }
 
-        Queue<Builder> queue = modifier.get();
-        Builder builder = queue.poll();
+        Queue<TreeBuilder> queue = modifier.get();
+        TreeBuilder builder = queue.poll();
         if (builder == null)
-            builder = new Builder();
+            builder = new TreeBuilder();
         Object[] btree = builder.build(source, updateF, size);
         queue.add(builder);
         return btree;
@@ -121,11 +129,11 @@ public class BTree
     }
 
     /**
-     * Returns a new BTree with the provided set inserting/replacing as 
necessary any equal items
+     * Returns a new BTree with the provided collection inserting/replacing as 
necessary any equal items
      *
      * @param btree              the tree to update
      * @param comparator         the comparator that defines the ordering over 
the items in the tree
-     * @param updateWith         the items to either insert / update
+     * @param updateWith         the items to either insert / update. MUST BE 
IN STRICTLY ASCENDING ORDER.
      * @param updateWithLength   then number of elements in updateWith
      * @param updateF            the update function to apply to any pairs we 
are swapping, and maybe abort early
      * @param <V>
@@ -137,18 +145,29 @@ public class BTree
                                                                 int 
updateWithLength,
                                                                 
UpdateFunction<K, V> updateF)
     {
-        if (btree.length == 0)
+        if (isEmpty(btree))
             return build(updateWith, updateWithLength, updateF);
 
-        Queue<Builder> queue = modifier.get();
-        Builder builder = queue.poll();
+        Queue<TreeBuilder> queue = modifier.get();
+        TreeBuilder builder = queue.poll();
         if (builder == null)
-            builder = new Builder();
+            builder = new TreeBuilder();
         btree = builder.update(btree, comparator, updateWith, updateF);
         queue.add(builder);
         return btree;
     }
 
+    public static <K> Object[] merge(Object[] tree1, Object[] tree2, 
Comparator<K> comparator)
+    {
+        if (size(tree1) < size(tree2))
+        {
+            Object[] tmp = tree1;
+            tree1 = tree2;
+            tree2 = tmp;
+        }
+        return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), 
UpdateFunction.<K>noOp());
+    }
+
     /**
      * Returns an Iterator over the entire tree
      *
@@ -157,96 +176,180 @@ public class BTree
      * @param <V>
      * @return
      */
-    public static <V> Cursor<V, V> slice(Object[] btree, boolean forwards)
+    public static <V> BTreeSearchIterator<V, V> slice(Object[] btree, 
Comparator<? super V> comparator, boolean forwards)
     {
-        Cursor<V, V> r = new Cursor<>();
-        r.reset(btree, forwards);
-        return r;
+        return new BTreeSearchIterator<>(btree, comparator, forwards);
     }
 
     /**
-     * Returns an Iterator over a sub-range of the tree
-     *
      * @param btree      the tree to iterate over
      * @param comparator the comparator that defines the ordering over the 
items in the tree
-     * @param start      the first item to include
-     * @param end        the last item to include
-     * @param forwards   if false, the iterator will start at end and move 
backwards
-     * @param <V>
-     * @return
+     * @param start      the beginning of the range to return, inclusive (in 
ascending order)
+     * @param end        the end of the range to return, exclusive (in 
ascending order)
+     * @param forwards   if false, the iterator will start at the last item 
and move backwards
+     * @return           an Iterator over the defined sub-range of the tree
      */
-    public static <K, V extends K> Cursor<K, V> slice(Object[] btree, 
Comparator<K> comparator, K start, K end, boolean forwards)
+    public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] 
btree, Comparator<? super K> comparator, K start, K end, boolean forwards)
     {
-        Cursor<K, V> r = new Cursor<>();
-        r.reset(btree, comparator, start, end, forwards);
-        return r;
+        return slice(btree, comparator, start, true, end, false, forwards);
     }
 
     /**
-     * Returns an Iterator over a sub-range of the tree
-     *
-     * @param btree      the tree to iterate over
-     * @param comparator the comparator that defines the ordering over the 
items in the tree
-     * @param start      the first item to include
-     * @param end        the last item to include
-     * @param forwards   if false, the iterator will start at end and move 
backwards
-     * @param <V>
-     * @return
+     * @param btree          the tree to iterate over
+     * @param comparator     the comparator that defines the ordering over the 
items in the tree
+     * @param start          low bound of the range
+     * @param startInclusive inclusivity of lower bound
+     * @param end            high bound of the range
+     * @param endInclusive   inclusivity of higher bound
+     * @param forwards       if false, the iterator will start at end and move 
backwards
+     * @return           an Iterator over the defined sub-range of the tree
      */
-    public static <K, V extends K> Cursor<K, V> slice(Object[] btree, 
Comparator<K> comparator, K start, boolean startInclusive, K end, boolean 
endInclusive, boolean forwards)
+    public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] 
btree, Comparator<? super K> comparator, K start, boolean startInclusive, K 
end, boolean endInclusive, boolean forwards)
     {
-        Cursor<K, V> r = new Cursor<>();
-        r.reset(btree, comparator, start, startInclusive, end, endInclusive, 
forwards);
-        return r;
+        int inclusiveLowerBound = max(0,
+                                      start == null ? Integer.MIN_VALUE
+                                                    : startInclusive ? 
ceilIndex(btree, comparator, start)
+                                                                     : 
higherIndex(btree, comparator, start));
+        int inclusiveUpperBound = min(size(btree) - 1,
+                                      end == null ? Integer.MAX_VALUE
+                                                  : endInclusive ? 
floorIndex(btree, comparator, end)
+                                                                 : 
lowerIndex(btree, comparator, end));
+        return new BTreeSearchIterator<>(btree, comparator, forwards, 
inclusiveLowerBound, inclusiveUpperBound);
     }
 
-    public static <V> V find(Object[] node, Comparator<V> comparator, V find)
+    /**
+     * Honours result semantics of {@link Arrays#binarySearch}, as though it 
were performed on the tree flattened into an array
+     * @return index of item in tree, or <tt>(-(<i>insertion point</i>) - 
1)</tt> if not present
+     */
+    public static <V> int findIndex(Object[] node, Comparator<V> comparator, V 
find)
     {
+        int lb = 0;
         while (true)
         {
             int keyEnd = getKeyEnd(node);
-            int i = BTree.find(comparator, find, node, 0, keyEnd);
-            if (i >= 0)
+            int i = Arrays.binarySearch((V[]) node, 0, keyEnd, find, 
comparator);
+            boolean exact = i >= 0;
+
+            if (isLeaf(node))
+                return exact ? lb + i : i - lb;
+
+            if (!exact)
+                i = -1 - i;
+
+            int[] sizeMap = getSizeMap(node);
+            if (exact)
+                return lb + sizeMap[i];
+            else if (i > 0)
+                lb += sizeMap[i - 1] + 1;
+
+            node = (Object[]) node[keyEnd + i];
+        }
+    }
+
+    /**
+     * @return the value at the index'th position in the tree, in tree order
+     */
+    public static <V> V findByIndex(Object[] tree, int index)
+    {
+        // WARNING: if semantics change, see also InternalCursor.seekTo, which 
mirrors this implementation
+        if ((index < 0) | (index >= size(tree)))
+            throw new IndexOutOfBoundsException(index + " not in range [0.." + 
size(tree) + ")");
+
+        Object[] node = tree;
+        while (true)
+        {
+            if (isLeaf(node))
             {
-                return (V) node[i];
+                int keyEnd = getLeafKeyEnd(node);
+                assert index < keyEnd;
+                return (V) node[index];
             }
-            else if (!isLeaf(node))
+
+            int[] sizeMap = getSizeMap(node);
+            int boundary = Arrays.binarySearch(sizeMap, index);
+            if (boundary >= 0)
             {
-                i = -i - 1;
-                node = (Object[]) node[keyEnd + i];
+                // exact match, in this branch node
+                assert boundary < sizeMap.length - 1;
+                return (V) node[boundary];
             }
-            else
+
+            boundary = -1 -boundary;
+            if (boundary > 0)
             {
-                return null;
+                assert boundary < sizeMap.length;
+                index -= (1 + sizeMap[boundary - 1]);
             }
+            node = (Object[]) node[getChildStart(node) + boundary];
         }
     }
 
+    /* since we have access to binarySearch semantics within indexOf(), we can 
use this to implement
+     * lower/upper/floor/higher very trivially
+     *
+     * this implementation is *not* optimal; it requires two logarithmic 
traversals, although the second is much cheaper
+     * (having less height, and operating over only primitive arrays), and the 
clarity is compelling
+     */
 
-    // UTILITY METHODS
 
-    // same basic semantics as Arrays.binarySearch, but delegates to compare() 
method to avoid
-    // wrapping generic Comparator with support for Special +/- infinity 
sentinels
-    static <V> int find(Comparator<V> comparator, Object key, Object[] a, 
final int fromIndex, final int toIndex)
+    public static <V> int lowerIndex(Object[] btree, Comparator<? super V> 
comparator, V find)
     {
-        int low = fromIndex;
-        int high = toIndex - 1;
+        int i = findIndex(btree, comparator, find);
+        if (i < 0)
+            i = -1 -i;
+        return i - 1;
+    }
 
-        while (low <= high)
-        {
-            int mid = (low + high) / 2;
-            int cmp = comparator.compare((V) key, (V) a[mid]);
+    public static <V> V lower(Object[] btree, Comparator<? super V> 
comparator, V find)
+    {
+        int i = lowerIndex(btree, comparator, find);
+        return i >= 0 ? findByIndex(btree, i) : null;
+    }
 
-            if (cmp > 0)
-                low = mid + 1;
-            else if (cmp < 0)
-                high = mid - 1;
-            else
-                return mid; // key found
-        }
-        return -(low + 1);  // key not found.
+    public static <V> int floorIndex(Object[] btree, Comparator<? super V> 
comparator, V find)
+    {
+        int i = findIndex(btree, comparator, find);
+        if (i < 0)
+            i = -2 -i;
+        return i;
     }
 
+    public static <V> V floor(Object[] btree, Comparator<? super V> 
comparator, V find)
+    {
+        int i = floorIndex(btree, comparator, find);
+        return i >= 0 ? findByIndex(btree, i) : null;
+    }
+
+    public static <V> int higherIndex(Object[] btree, Comparator<? super V> 
comparator, V find)
+    {
+        int i = findIndex(btree, comparator, find);
+        if (i < 0) i = -1 -i;
+        else i++;
+        return i;
+    }
+
+    public static <V> V higher(Object[] btree, Comparator<? super V> 
comparator, V find)
+    {
+        int i = higherIndex(btree, comparator, find);
+        return i < size(btree) ? findByIndex(btree, i) : null;
+    }
+
+    public static <V> int ceilIndex(Object[] btree, Comparator<? super V> 
comparator, V find)
+    {
+        int i = findIndex(btree, comparator, find);
+        if (i < 0)
+            i = -1 -i;
+        return i;
+    }
+
+    public static <V> V ceil(Object[] btree, Comparator<? super V> comparator, 
V find)
+    {
+        int i = ceilIndex(btree, comparator, find);
+        return i < size(btree) ? findByIndex(btree, i) : null;
+    }
+
+    // UTILITY METHODS
+
     // get the upper bound we should search in for keys in the node
     static int getKeyEnd(Object[] node)
     {
@@ -260,29 +363,77 @@ public class BTree
     static int getLeafKeyEnd(Object[] node)
     {
         int len = node.length;
-        if (len == 0)
-            return 0;
-        else if (node[len - 1] == null)
-            return len - 1;
-        else
-            return len;
+        return node[len - 1] == null ? len - 1 : len;
     }
 
     // return the boundary position between keys/children for the branch node
-    static int getBranchKeyEnd(Object[] node)
+    // == number of keys, as they are indexed from zero
+    static int getBranchKeyEnd(Object[] branchNode)
+    {
+        return (branchNode.length / 2) - 1;
+    }
+
+    /**
+     * @return first index in a branch node containing child nodes
+     */
+    static int getChildStart(Object[] branchNode)
+    {
+        return getBranchKeyEnd(branchNode);
+    }
+
+    /**
+     * @return last index + 1 in a branch node containing child nodes
+     */
+    static int getChildEnd(Object[] branchNode)
+    {
+        return branchNode.length - 1;
+    }
+
+    /**
+     * @return number of children in a branch node
+     */
+    static int getChildCount(Object[] branchNode)
+    {
+        return branchNode.length / 2;
+    }
+
+    /**
+     * @return the size map for the branch node
+     */
+    static int[] getSizeMap(Object[] branchNode)
+    {
+        return (int[]) branchNode[getChildEnd(branchNode)];
+    }
+
+    /**
+     * @return the size map for the branch node
+     */
+    static int lookupSizeMap(Object[] branchNode, int index)
     {
-        return node.length / 2;
+        return getSizeMap(branchNode)[index];
+    }
+
+    // get the size from the btree's index (fails if not present)
+    public static int size(Object[] tree)
+    {
+        if (isLeaf(tree))
+            return getLeafKeyEnd(tree);
+        int length = tree.length;
+        // length - 1 == getChildEnd == getPositionOfSizeMap
+        // (length / 2) - 1 == getChildCount - 1 == position of full tree size
+        // hard code this, as will be used often;
+        return ((int[]) tree[length - 1])[(length / 2) - 1];
     }
 
     // returns true if the provided node is a leaf, false if it is a branch
     static boolean isLeaf(Object[] node)
     {
-        return (node.length & 1) == 0;
+        return (node.length & 1) == 1;
     }
 
     public static boolean isEmpty(Object[] tree)
     {
-        return tree.length == 0;
+        return tree == EMPTY_LEAF;
     }
 
     public static int depth(Object[] tree)
@@ -296,43 +447,293 @@ public class BTree
         return depth;
     }
 
-    // Special class for making certain operations easier, so we can define a 
+/- Inf
-    static interface Special extends Comparable<Object> { }
-    static final Special POSITIVE_INFINITY = new Special()
+    /**
+     * Fill the target array with the contents of the provided subtree, in 
ascending order, starting at targetOffset
+     * @param tree source
+     * @param target array
+     * @param targetOffset offset in target array
+     * @return number of items copied (size of tree)
+     */
+    public static int toArray(Object[] tree, Object[] target, int targetOffset)
+    {
+        return toArray(tree, 0, size(tree), target, targetOffset);
+    }
+    public static int toArray(Object[] tree, int treeStart, int treeEnd, 
Object[] target, int targetOffset)
     {
-        public int compareTo(Object o)
+        if (isLeaf(tree))
         {
-            return o == this ? 0 : 1;
+            int count = treeEnd - treeStart;
+            System.arraycopy(tree, treeStart, target, targetOffset, count);
+            return count;
         }
-    };
-    static final Special NEGATIVE_INFINITY = new Special()
-    {
-        public int compareTo(Object o)
+
+        int newTargetOffset = targetOffset;
+        int childCount = getChildCount(tree);
+        int childOffset = getChildStart(tree);
+        for (int i = 0 ; i < childCount ; i++)
         {
-            return o == this ? 0 : -1;
+            int childStart = treeIndexOffsetOfChild(tree, i);
+            int childEnd = treeIndexOfBranchKey(tree, i);
+            if (childStart <= treeEnd && childEnd >= treeStart)
+            {
+                newTargetOffset += toArray((Object[]) tree[childOffset + i], 
max(0, treeStart - childStart), min(childEnd, treeEnd) - childStart,
+                                           target, newTargetOffset);
+                if (treeStart <= childEnd && treeEnd > childEnd) // this check 
will always fail for the non-existent key
+                    target[newTargetOffset++] = tree[i];
+            }
         }
-    };
+        return newTargetOffset - targetOffset;
+    }
 
-    private static final ThreadLocal<Queue<Builder>> modifier = new 
ThreadLocal<Queue<Builder>>()
+    /**
+     * tree index => index of key wrt all items in the tree laid out serially
+     *
+     * This version of the method permits requesting out-of-bounds indexes, -1 
and size
+     * @param root to calculate tree index within
+     * @param keyIndex root-local index of key to calculate tree-index
+     * @return the number of items preceding the key in the whole tree of root
+     */
+    public static int treeIndexOfKey(Object[] root, int keyIndex)
+    {
+        if (isLeaf(root))
+            return keyIndex;
+        int[] sizeMap = getSizeMap(root);
+        if ((keyIndex >= 0) & (keyIndex < sizeMap.length))
+            return sizeMap[keyIndex];
+        // we support asking for -1 or size, so that we can easily use this 
for iterator bounds checking
+        if (keyIndex < 0)
+            return -1;
+        return sizeMap[keyIndex - 1] + 1;
+    }
+
+    /**
+     * @param keyIndex node-local index of the key to calculate index of
+     * @return keyIndex; this method is here only for symmetry and clarity
+     */
+    public static int treeIndexOfLeafKey(int keyIndex)
+    {
+        return keyIndex;
+    }
+
+    /**
+     * @param root to calculate tree-index within
+     * @param keyIndex root-local index of key to calculate tree-index of
+     * @return the number of items preceding the key in the whole tree of root
+     */
+    public static int treeIndexOfBranchKey(Object[] root, int keyIndex)
+    {
+        return lookupSizeMap(root, keyIndex);
+    }
+
+    /**
+     * @param root to calculate tree-index within
+     * @param childIndex root-local index of *child* to calculate tree-index of
+     * @return the number of items preceding the child in the whole tree of 
root
+     */
+    public static int treeIndexOffsetOfChild(Object[] root, int childIndex)
+    {
+        if (childIndex == 0)
+            return 0;
+        return 1 + lookupSizeMap(root, childIndex - 1);
+    }
+
+    private static final ThreadLocal<Queue<TreeBuilder>> modifier = new 
ThreadLocal<Queue<TreeBuilder>>()
     {
         @Override
-        protected Queue<Builder> initialValue()
+        protected Queue<TreeBuilder> initialValue()
         {
             return new ArrayDeque<>();
         }
     };
 
+    public static <V> Builder<V> builder(Comparator<? super V> comparator)
+    {
+        return new Builder<>(comparator);
+    }
+
+    public static class Builder<V>
+    {
+
+        final Comparator<? super V> comparator;
+        Object[] values = new Object[10];
+        int count;
+        boolean detected; // true if we have managed to cheaply ensure sorted 
+ filtered as we have added
+
+        protected Builder(Comparator<? super V> comparator)
+        {
+            this.comparator = comparator;
+        }
+
+        public Builder<V> add(V v)
+        {
+            if (count == values.length)
+                values = Arrays.copyOf(values, count * 2);
+            values[count++] = v;
+
+            if (detected && count > 1)
+            {
+                int c = comparator.compare((V) values[count - 2], (V) 
values[count - 1]);
+                if (c == 0) count--;
+                else if (c > 0) detected = false;
+            }
+
+            return this;
+        }
+
+        public Builder<V> addAll(Collection<V> add)
+        {
+            if (add instanceof SortedSet && equalComparators(comparator, 
((SortedSet) add).comparator()))
+            {
+                // if we're a SortedSet, permit quick order-preserving 
addition of items
+                return mergeAll(add, add.size());
+            }
+            detected = false;
+            if (values.length < count + add.size())
+                values = Arrays.copyOf(values, max(count + add.size(), count * 
2));
+            for (V v : add)
+                values[count++] = v;
+            return this;
+        }
+
+        private static boolean equalComparators(Comparator<?> a, Comparator<?> 
b)
+        {
+            return a == b || (isNaturalComparator(a) && 
isNaturalComparator(b));
+        }
+
+        private static boolean isNaturalComparator(Comparator<?> a)
+        {
+            return a == null || a == Comparator.naturalOrder() || a == 
Ordering.natural();
+        }
+
+        // iter must be in sorted order!
+        public Builder<V> mergeAll(Iterable<V> add, int addCount)
+        {
+            // ensure the existing contents are in order
+            sortAndFilter();
+
+            int curCount = count;
+            // we make room for curCount * 2 + addCount, so that we can copy 
the current values to the end
+            // if necessary for continuing the merge, and have the new values 
directly after the current value range
+            // i.e. []
+            if (values.length < curCount * 2 + addCount)
+                values = Arrays.copyOf(values, max(curCount * 2 + addCount, 
curCount * 3));
+
+            if (add instanceof BTreeSet)
+            {
+                // use btree set's fast toArray method, to append directly
+                ((BTreeSet) add).toArray(values, curCount);
+            }
+            else
+            {
+                // consider calling toArray() and System.arraycopy
+                int i = curCount;
+                for (V v : add)
+                    values[i++] = v;
+            }
+            return mergeAll(addCount);
+        }
+
+        // iter must be in sorted order!
+        private Builder<V> mergeAll(int addCount)
+        {
+            // start optimistically by assuming new values are superset of 
current, and just run until this fails to hold
+            Object[] a = values;
+            int addOffset = count;
+
+            int i = 0, j = addOffset;
+            int curEnd = addOffset, addEnd = addOffset + addCount;
+            while (i < curEnd && j < addEnd)
+            {
+                int c = comparator.compare((V) a[i], (V) a[j]);
+                if (c > 0)
+                    break;
+                else if (c == 0)
+                    j++;
+                i++;
+            }
+
+            if (j == addEnd)
+                return this; // already a superset of the new values
+
+            // otherwise, copy the remaining existing values to the very end, 
freeing up space for merge result
+            int newCount = i;
+            System.arraycopy(a, i, a, addEnd, count - i);
+            curEnd = addEnd + (count - i);
+            i = addEnd;
+
+            while (i < curEnd && j < addEnd)
+            {
+                // could avoid one comparison if we cared, but would make this 
ugly
+                int c = comparator.compare((V) a[i], (V) a[j]);
+                if (c == 0)
+                {
+                    a[newCount++] = a[i];
+                    i++;
+                    j++;
+                }
+                else
+                {
+                    a[newCount++] =  c < 0 ? a[i++] : a[j++];
+                }
+            }
+
+            // exhausted one of the inputs; fill in remainder of the other
+            if (i < curEnd)
+            {
+                System.arraycopy(a, i, a, newCount, curEnd - i);
+                newCount += curEnd - i;
+            }
+            else if (j < addEnd)
+            {
+                if (j != newCount)
+                    System.arraycopy(a, j, a, newCount, addEnd - j);
+                newCount += addEnd - j;
+            }
+            count = newCount;
+            return this;
+        }
+
+        public boolean isEmpty()
+        {
+            return count == 0;
+        }
+
+        private void sortAndFilter()
+        {
+            if (!detected && count > 1)
+            {
+                Arrays.sort((V[]) values, 0, count, comparator);
+                int c = 1;
+                for (int i = 1 ; i < count ; i++)
+                    if (comparator.compare((V) values[i], (V) values[i - 1]) 
!= 0)
+                        values[c++] = values[i];
+                count = c;
+            }
+            detected = true;
+        }
+
+        public Object[] build()
+        {
+            sortAndFilter();
+            return BTree.build(Arrays.asList(values).subList(0, count), 
UpdateFunction.noOp());
+        }
+    }
+
     /** simple static wrapper to calls to cmp.compare() which checks if either 
a or b are Special (i.e. represent an infinity) */
-    // TODO : cheaper to check for POSITIVE/NEGATIVE infinity in callers, 
rather than here
     static <V> int compare(Comparator<V> cmp, Object a, Object b)
     {
-        if (a instanceof Special)
-            return ((Special) a).compareTo(b);
-        if (b instanceof Special)
-            return -((Special) b).compareTo(a);
+        if (a == b)
+            return 0;
+        if (a == NEGATIVE_INFINITY | b == POSITIVE_INFINITY)
+            return -1;
+        if (b == NEGATIVE_INFINITY | a == POSITIVE_INFINITY)
+            return 1;
         return cmp.compare((V) a, (V) b);
     }
 
+    static Object POSITIVE_INFINITY = new Object();
+    static Object NEGATIVE_INFINITY = new Object();
+
     public static boolean isWellFormed(Object[] btree, Comparator<? extends 
Object> cmp)
     {
         return isWellFormed(cmp, btree, true, NEGATIVE_INFINITY, 
POSITIVE_INFINITY);
@@ -346,17 +747,16 @@ public class BTree
         if (isLeaf(node))
         {
             if (isRoot)
-                return node.length <= FAN_FACTOR;
-            return node.length >= FAN_FACTOR / 2 && node.length <= FAN_FACTOR;
+                return node.length <= FAN_FACTOR + 1;
+            return node.length >= FAN_FACTOR / 2 && node.length <= FAN_FACTOR 
+ 1;
         }
 
         int type = 0;
-        int childOffset = getBranchKeyEnd(node);
         // compare each child node with the branch element at the head of this 
node it corresponds with
-        for (int i = childOffset; i < node.length; i++)
+        for (int i = getChildStart(node); i < getChildEnd(node) ; i++)
         {
             Object[] child = (Object[]) node[i];
-            Object localmax = i < node.length - 1 ? node[i - childOffset] : 
max;
+            Object localmax = i < node.length - 2 ? node[i - 
getChildStart(node)] : max;
             if (!isWellFormed(cmp, child, false, min, localmax))
                 return false;
             type |= isLeaf(child) ? 1 : 2;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5250d7ff/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java 
b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
index 403f234..d6e6fae 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
@@ -1,100 +1,164 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
 package org.apache.cassandra.utils.btree;
 
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
-import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.IndexedSearchIterator;
 
-import static org.apache.cassandra.utils.btree.BTree.getKeyEnd;
+import static org.apache.cassandra.utils.btree.BTree.size;
 
-public class BTreeSearchIterator<CK, K extends CK, V> extends Path implements 
SearchIterator<K, V>
+public class BTreeSearchIterator<K, V> extends TreeCursor<K> implements 
IndexedSearchIterator<K, V>, Iterator<V>
 {
-    final Comparator<CK> comparator;
-    final boolean forwards;
+    private final boolean forwards;
 
-    public BTreeSearchIterator(Object[] btree, Comparator<CK> comparator, 
boolean forwards)
+    // for simplicity, we just always use the index feature of the btree to 
maintain our bounds within the tree,
+    // whether or not they are constrained
+    private int index;
+    private byte state;
+    private final int lowerBound, upperBound; // inclusive
+
+    private static final int MIDDLE = 0; // only "exists" as an absence of 
other states
+    private static final int ON_ITEM = 1; // may only co-exist with LAST (or 
MIDDLE, which is 0)
+    private static final int BEFORE_FIRST = 2; // may not coexist with any 
other state
+    private static final int LAST = 4; // may co-exist with ON_ITEM, in which 
case we are also at END
+    private static final int END = 5; // equal to LAST | ON_ITEM
+
+    public BTreeSearchIterator(Object[] btree, Comparator<? super K> 
comparator, boolean forwards)
+    {
+        this(btree, comparator, forwards, 0, size(btree)-1);
+    }
+
+    BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, 
boolean forwards, int lowerBound, int upperBound)
     {
-        init(btree);
-        if (!forwards)
-            this.indexes[0] = (byte)(getKeyEnd(path[0]) - 1);
-        this.comparator = comparator;
+        super(comparator, btree);
         this.forwards = forwards;
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+        rewind();
+    }
+
+    /**
+     * @return 0 if we are on the last item, 1 if we are past the last item, 
and -1 if we are before it
+     */
+    private int compareToLast(int idx)
+    {
+        return forwards ? idx - upperBound : lowerBound - idx;
+    }
+
+    private int compareToFirst(int idx)
+    {
+        return forwards ? idx - lowerBound : upperBound - idx;
+    }
+
+    public boolean hasNext()
+    {
+        return state != END;
+    }
+
+    public V next()
+    {
+        switch (state)
+        {
+            case ON_ITEM:
+                if (compareToLast(index = moveOne(forwards)) >= 0)
+                    state = END;
+                break;
+            case BEFORE_FIRST:
+                seekTo(index = forwards ? lowerBound : upperBound);
+                state = (byte) (upperBound == lowerBound ? LAST : MIDDLE);
+            case LAST:
+            case MIDDLE:
+                state |= ON_ITEM;
+                break;
+            default:
+                throw new NoSuchElementException();
+        }
+
+        return current();
     }
 
     public V next(K target)
     {
-        // We could probably avoid some of the repetition but leaving that for 
later.
-        if (forwards)
+        if (!hasNext())
+            return null;
+
+        int state = this.state;
+        int index = seekTo(target, forwards, (state & (ON_ITEM | 
BEFORE_FIRST)) != 0);
+        boolean found = index >= 0;
+        if (!found) index = -1 -index;
+
+        V next = null;
+        if (state == BEFORE_FIRST && compareToFirst(index) < 0)
+            return null;
+
+        int compareToLast = compareToLast(index);
+        if ((compareToLast <= 0))
         {
-            while (depth > 0)
+            state = compareToLast < 0 ? MIDDLE : LAST;
+            if (found)
             {
-                byte successorParentDepth = findSuccessorParentDepth();
-                if (successorParentDepth < 0)
-                    break; // we're in last section of tree, so can only 
search down
-                int successorParentIndex = indexes[successorParentDepth] + 1;
-                Object[] successParentNode = path[successorParentDepth];
-                Object successorParentKey = 
successParentNode[successorParentIndex];
-                int c = BTree.compare(comparator, target, successorParentKey);
-                if (c < 0)
-                    break;
-                if (c == 0)
-                {
-                    depth = successorParentDepth;
-                    indexes[successorParentDepth]++;
-                    return (V) successorParentKey;
-                }
-                depth = successorParentDepth;
-                indexes[successorParentDepth]++;
+                state |= ON_ITEM;
+                next = (V) currentValue();
             }
-            if (find(comparator, target, Op.CEIL, true))
-                return (V) currentKey();
+        }
+        else state = END;
+
+        this.state = (byte) state;
+        this.index = index;
+        return next;
+    }
+
+    /**
+     * Reset this Iterator to its starting position
+     */
+    public void rewind()
+    {
+        if (upperBound < lowerBound)
+        {
+            state = (byte) END;
         }
         else
         {
-            while (depth > 0)
-            {
-                byte predecessorParentDepth = findPredecessorParentDepth();
-                if (predecessorParentDepth < 0)
-                    break; // we're in last section of tree, so can only 
search down
-                int predecessorParentIndex = indexes[predecessorParentDepth] - 
1;
-                Object[] predecessParentNode = path[predecessorParentDepth];
-                Object predecessorParentKey = 
predecessParentNode[predecessorParentIndex];
-                int c = BTree.compare(comparator, target, 
predecessorParentKey);
-                if (c > 0)
-                    break;
-                if (c == 0)
-                {
-                    depth = predecessorParentDepth;
-                    indexes[predecessorParentDepth]--;
-                    return (V) predecessorParentKey;
-                }
-                depth = predecessorParentDepth;
-                indexes[predecessorParentDepth]--;
-            }
-            if (find(comparator, target, Op.FLOOR, false))
-                return (V) currentKey();
+            // we don't move into the tree until the first request is made, so 
we know where to go
+            reset(forwards);
+            state = (byte) BEFORE_FIRST;
         }
-        return null;
     }
 
-    public boolean hasNext()
+    private void checkOnItem()
+    {
+        if ((state & ON_ITEM) != ON_ITEM)
+            throw new NoSuchElementException();
+    }
+
+    public V current()
+    {
+        checkOnItem();
+        return (V) currentValue();
+    }
+
+    public int indexOfCurrent()
     {
-        return depth != 0 || indexes[0] != (forwards ? getKeyEnd(path[0]) : 
-1);
+        checkOnItem();
+        return compareToFirst(index);
     }
 }

Reply via email to