This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit c10ea993a3c75f9e1bf931ae5d0dd5e7c779e28b
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Tue Sep 17 13:55:22 2024 +0100

    Change MaxConflicts to use a BTree under the hood
    
    patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
    CASSANDRA-19952
---
 .../src/main/java/accord/local/MaxConflicts.java   |  52 +-
 .../main/java/accord/primitives/AbstractKeys.java  |  12 +
 .../java/accord/primitives/AbstractRanges.java     |  12 +
 .../src/main/java/accord/primitives/Routables.java |  10 +
 .../accord/utils/BTreeReducingIntervalMap.java     | 783 +++++++++++++++++++++
 .../java/accord/utils/BTreeReducingRangeMap.java   | 535 ++++++++++++++
 .../main/java/accord/utils/ReducingRangeMap.java   |  36 +-
 .../src/main/java/accord/utils/TinyKVBuffer.java   | 115 +++
 .../src/main/java/accord/utils/TriConsumer.java    |  23 +
 .../accord/utils/BTreeReducingRangeMapTest.java    | 465 ++++++++++++
 10 files changed, 1985 insertions(+), 58 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/MaxConflicts.java 
b/accord-core/src/main/java/accord/local/MaxConflicts.java
index c31a1bfa..421c54f6 100644
--- a/accord-core/src/main/java/accord/local/MaxConflicts.java
+++ b/accord-core/src/main/java/accord/local/MaxConflicts.java
@@ -15,21 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package accord.local;
 
-import javax.annotation.Nonnull;
-
 import accord.api.RoutingKey;
-import accord.primitives.AbstractRanges;
 import accord.primitives.Routables;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
-import accord.utils.ReducingIntervalMap;
-import accord.utils.ReducingRangeMap;
+import accord.utils.BTreeReducingRangeMap;
 
 // TODO (expected): track read/write conflicts separately
-public class MaxConflicts extends ReducingRangeMap<Timestamp>
+class MaxConflicts extends BTreeReducingRangeMap<Timestamp>
 {
     public static final MaxConflicts EMPTY = new MaxConflicts();
 
@@ -38,59 +33,32 @@ public class MaxConflicts extends 
ReducingRangeMap<Timestamp>
         super();
     }
 
-    private MaxConflicts(boolean inclusiveEnds, RoutingKey[] starts, 
Timestamp[] values)
+    private MaxConflicts(boolean inclusiveEnds, Object[] tree)
     {
-        super(inclusiveEnds, starts, values);
+        super(inclusiveEnds, tree);
     }
 
-    public Timestamp get(Seekables<?, ?> keysOrRanges)
+    Timestamp get(Routables<?> keysOrRanges)
     {
         return foldl(keysOrRanges, Timestamp::max, Timestamp.NONE);
     }
 
-    public Timestamp get(Routables<?> keysOrRanges)
-    {
-        return foldl(keysOrRanges, Timestamp::max, Timestamp.NONE);
-    }
-
-    MaxConflicts update(Seekables<?, ?> keysOrRanges, Timestamp maxConflict)
-    {
-        return merge(this, create(keysOrRanges, maxConflict));
-    }
-
-    public static MaxConflicts create(AbstractRanges ranges, @Nonnull 
Timestamp maxConflict)
-    {
-        if (ranges.isEmpty())
-            return MaxConflicts.EMPTY;
-
-        return create(ranges, maxConflict, MaxConflicts.Builder::new);
-    }
-
-    public static MaxConflicts create(Seekables<?, ?> keysOrRanges, @Nonnull 
Timestamp maxConflict)
-    {
-        if (keysOrRanges.isEmpty())
-            return MaxConflicts.EMPTY;
-
-        return create(keysOrRanges, maxConflict, Builder::new);
-    }
-
-    public static MaxConflicts merge(MaxConflicts a, MaxConflicts b)
+    public MaxConflicts update(Seekables<?, ?> keysOrRanges, Timestamp 
maxConflict)
     {
-        return ReducingIntervalMap.merge(a, b, Timestamp::max, 
MaxConflicts.Builder::new);
+        return update(this, keysOrRanges, maxConflict, Timestamp::max, 
MaxConflicts::new, Builder::new);
     }
 
-    static class Builder extends AbstractBoundariesBuilder<RoutingKey, 
Timestamp, MaxConflicts>
+    private static class Builder extends AbstractBoundariesBuilder<RoutingKey, 
Timestamp, MaxConflicts>
     {
         protected Builder(boolean inclusiveEnds, int capacity)
         {
             super(inclusiveEnds, capacity);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        protected MaxConflicts buildInternal()
+        protected MaxConflicts buildInternal(Object[] tree)
         {
-            return new MaxConflicts(inclusiveEnds, starts.toArray(new 
RoutingKey[0]), values.toArray(new Timestamp[0]));
+            return new MaxConflicts(inclusiveEnds, tree);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java 
b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 29b89467..771cefd2 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -117,12 +117,24 @@ public abstract class AbstractKeys<K extends RoutableKey> 
implements Iterable<K>
         return findNextIntersection(0, ranges, 0) >= 0;
     }
 
+    @Override
+    public final int find(RoutableKey key, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(keys, 0, keys.length, key, 
RoutableKey::compareTo, search);
+    }
+
     @Override
     public final int findNext(int thisIndex, RoutableKey key, 
SortedArrays.Search search)
     {
         return SortedArrays.exponentialSearch(keys, thisIndex, keys.length, 
key, RoutableKey::compareTo, search);
     }
 
+    @Override
+    public final int find(Range find, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(keys, 0, size(), find, 
Range::compareTo, search);
+    }
+
     @Override
     public final int findNext(int thisIndex, Range find, SortedArrays.Search 
search)
     {
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java 
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 50d90061..5c0a7cd7 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -204,12 +204,24 @@ public abstract class AbstractRanges implements 
Iterable<Range>, Routables<Range
         return findNextIntersection(thisIndex, (AbstractRanges) with, 
withIndex);
     }
 
+    @Override
+    public final int find(Range find, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(ranges, 0, size(), find, 
Range::compareIntersecting, search);
+    }
+
     @Override
     public final int findNext(int thisIndex, Range find, SortedArrays.Search 
search)
     {
         return SortedArrays.exponentialSearch(ranges, thisIndex, size(), find, 
Range::compareIntersecting, search);
     }
 
+    @Override
+    public final int find(RoutableKey find, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(ranges, 0, size(), find, (k, r) -> 
-r.compareTo(k), search);
+    }
+
     @Override
     public final int findNext(int thisIndex, RoutableKey find, 
SortedArrays.Search search)
     {
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java 
b/accord-core/src/main/java/accord/primitives/Routables.java
index 12df7e7f..6e9df46f 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -103,11 +103,21 @@ public interface Routables<K extends Routable> extends 
Iterable<K>
      */
     long findNextIntersection(int thisIndex, Routables<K> with, int withIndex);
 
+    /**
+     * Perform {@link SortedArrays#binarySearch} looking for {@code find} with 
behaviour of {@code search}
+     */
+    int find(Range find, SortedArrays.Search search);
+
     /**
      * Perform {@link SortedArrays#exponentialSearch} from {@code thisIndex} 
looking for {@code find} with behaviour of {@code search}
      */
     int findNext(int thisIndex, Range find, SortedArrays.Search search);
 
+    /**
+     * Perform {@link SortedArrays#binarySearch} looking for {@code find} with 
behaviour of {@code search}
+     */
+    int find(RoutableKey find, SortedArrays.Search search);
+
     /**
      * Perform {@link SortedArrays#exponentialSearch} from {@code thisIndex} 
looking for {@code find} with behaviour of {@code search}
      */
diff --git 
a/accord-core/src/main/java/accord/utils/BTreeReducingIntervalMap.java 
b/accord-core/src/main/java/accord/utils/BTreeReducingIntervalMap.java
new file mode 100644
index 00000000..d8b7d0e1
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/BTreeReducingIntervalMap.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.utils;
+
+import accord.utils.btree.BTree;
+import com.google.common.collect.AbstractIterator;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterators;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static accord.utils.Invariants.*;
+import static com.google.common.collect.Iterators.filter;
+import static com.google.common.collect.Iterators.transform;
+
+/**
+ * Represents a map of ranges where precisely one value is bound to each point 
in the continuum of ranges,
+ * and a simple function is sufficient to merge values inserted to overlapping 
ranges.
+ * <p>
+ * Copied/adapted from Cassandra's PaxosRepairHistory class, however has a 
major distinction: applies only to the
+ * covered ranges, i.e. the first start bound is the lower bound, and the last 
start bound is the upper bound,
+ * and everything else is considered unknown. This is in contrast to the C* 
version where every logical range has
+ * some associated information, with the first and last entries applying to 
everything either side of the start/end bound.
+ * <p>
+ * A simple sorted array of bounds is sufficient to represent the state and 
perform efficient lookups.
+ * <p>
+ */
+public class BTreeReducingIntervalMap<K extends Comparable<? super K>, V>
+{
+    protected final boolean inclusiveEnds;
+    protected final Object[] tree;
+
+    public BTreeReducingIntervalMap()
+    {
+        this(false);
+    }
+
+    public BTreeReducingIntervalMap(boolean inclusiveEnds)
+    {
+        this(inclusiveEnds, BTree.empty());
+    }
+
+    protected BTreeReducingIntervalMap(boolean inclusiveEnds, Object[] tree)
+    {
+        this.inclusiveEnds = inclusiveEnds;
+        this.tree = tree;
+    }
+
+    public boolean inclusiveEnds()
+    {
+        return inclusiveEnds;
+    }
+
+    public final boolean inclusiveStarts()
+    {
+        return !inclusiveEnds;
+    }
+
+    public boolean isEmpty()
+    {
+        return BTree.isEmpty(tree);
+    }
+
+    public int size()
+    {
+        return Math.max(BTree.size(tree) - 1, 0);
+    }
+
+    public V get(K key)
+    {
+        checkArgument(null != key);
+        int idx = BTree.findIndex(tree, EntryComparator.instance(), key);
+
+        if (idx < 0) idx = -2 - idx;
+        else if (inclusiveEnds) --idx;
+
+        return idx < 0 || idx >= size()
+             ? null
+             : valueAt(idx);
+    }
+
+    public V foldl(BiFunction<V, V, V> reduce)
+    {
+        checkState(!isEmpty());
+        Iterator<V> iter = valuesIterator(false);
+        V result = iter.next();
+        while (iter.hasNext())
+        {
+            V next = iter.next();
+            if (next != null)
+                result = reduce.apply(result, next);
+        }
+        return result;
+    }
+
+    public <V2> V2 foldl(BiFunction<V, V2, V2> reduce, V2 accumulator, 
Predicate<V2> terminate)
+    {
+        Iterator<V> iter = valuesIterator(true);
+        while (iter.hasNext())
+        {
+            accumulator = reduce.apply(iter.next(), accumulator);
+            if (terminate.test(accumulator))
+                break;
+        }
+        return accumulator;
+    }
+
+    public <V2> V2 foldlWithBounds(QuadFunction<V, V2, K, K, V2> fold, V2 
accumulator, Predicate<V2> terminate)
+    {
+        WithBoundsIterator<K, V> iter = withBoundsIterator(true);
+        while (iter.advance())
+        {
+            accumulator = fold.apply(iter.value(), accumulator, iter.start(), 
iter.end());
+            if (terminate.test(accumulator))
+                break;
+        }
+        return accumulator;
+    }
+
+    @Override
+    public String toString()
+    {
+        return toString(v -> true);
+    }
+
+    public String toString(Predicate<V> include)
+    {
+        if (isEmpty())
+            return "{}";
+
+        StringBuilder builder = new StringBuilder("{");
+        boolean isFirst = true;
+
+        WithBoundsIterator<K, V> iter = withBoundsIterator();
+        while (iter.advance())
+        {
+            if (!include.test(iter.value()))
+                continue;
+
+            if (!isFirst)
+                builder.append(", ");
+
+            builder.append(inclusiveStarts() ? '[' : '(')
+                   .append(iter.start())
+                   .append(',')
+                   .append(iter.end())
+                   .append(inclusiveEnds() ? ']' : ')')
+                   .append('=')
+                   .append(iter.value());
+
+            isFirst = false;
+        }
+        return builder.append('}').toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        @SuppressWarnings("unchecked")
+        BTreeReducingIntervalMap<K, V> that = (BTreeReducingIntervalMap<K, V>) 
o;
+        return this.inclusiveEnds == that.inclusiveEnds && 
BTree.equals(this.tree, that.tree);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Boolean.hashCode(inclusiveEnds) + 31 * BTree.hashCode(tree);
+    }
+
+    public static class EntryComparator<K extends Comparable<? super K>, V> 
implements AsymmetricComparator<K, Entry<K, V>>
+    {
+        private static final EntryComparator<?,?> INSTANCE = new 
EntryComparator<>();
+
+        @SuppressWarnings("unchecked")
+        public static <K extends Comparable<? super K>, V> EntryComparator<K, 
V> instance()
+        {
+            return (EntryComparator<K, V>) INSTANCE;
+        }
+
+        @Override
+        public int compare(K start, Entry<K, V> entry)
+        {
+            return start.compareTo(entry.start);
+        }
+    }
+
+    public static class Entry<K extends Comparable<? super K>, V> implements 
Comparable<Entry<K, V>>
+    {
+        private final K start;
+        private final V value;
+        private final boolean hasValue;
+
+        private Entry(K start, V value, boolean hasValue)
+        {
+            this.start = start;
+            this.value = value;
+            this.hasValue = hasValue;
+        }
+
+        public static <K extends Comparable<? super K>, V> Entry<K, V> make(K 
start)
+        {
+            return new Entry<>(start, null, false);
+        }
+
+        public static <K extends Comparable<? super K>, V> Entry<K, V> make(K 
start, V value)
+        {
+            return new Entry<>(start, value, true);
+        }
+
+        public K start()
+        {
+            return start;
+        }
+
+        public V value()
+        {
+            if (hasValue) return value;
+            throw illegalState();
+        }
+
+        /** null is considered a valid value, distinct from not having a set 
value at all */
+        public boolean hasValue()
+        {
+            return hasValue;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            @SuppressWarnings("unchecked")
+            Entry<K, V> that = (Entry<K, V>) o;
+            return Objects.equals(this.start, that.start) && this.hasValue == 
that.hasValue && Objects.equals(this.value, that.value);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(start) + 31 * (Boolean.hashCode(hasValue) 
+ 31 * Objects.hashCode(value));
+        }
+
+        @Override
+        public int compareTo(@Nonnull Entry<K, V> that)
+        {
+            return this.start.compareTo(that.start);
+        }
+    }
+
+    protected static <K extends Comparable<? super K>, V, M extends 
BTreeReducingIntervalMap<K, V>> M mergeIntervals(
+            M historyLeft, M historyRight, IntervalBuilderFactory<K, V, M> 
factory)
+    {
+        if (historyLeft == null || historyLeft.isEmpty())
+            return historyRight;
+        if (historyRight == null || historyRight.isEmpty())
+            return historyLeft;
+
+        boolean inclusiveEnds = inclusiveEnds(historyLeft.inclusiveEnds, 
!historyLeft.isEmpty(), historyRight.inclusiveEnds, !historyRight.isEmpty());
+        AbstractIntervalBuilder<K, V, M> builder = 
factory.create(inclusiveEnds, historyLeft.size() + historyRight.size());
+
+        WithBoundsIterator<K, V> left = historyLeft.withBoundsIterator();
+        WithBoundsIterator<K, V> right = historyRight.withBoundsIterator();
+
+        left.advance();
+        right.advance();
+
+        K start;
+        {
+            // first loop over any range only covered by one of the two
+            WithBoundsIterator<K, V> first = 
left.start().compareTo(right.start()) <= 0 ? left : right;
+            WithBoundsIterator<K, V> second = first == left ? right : left;
+
+            while (first.hasCurrent() && first.end().compareTo(second.start()) 
<= 0)
+            {
+                if (first.value() != null)
+                    builder.append(first.start(), first.end(), first.value());
+                first.advance();
+            }
+
+            start = second.start();
+            if (first.hasCurrent() && first.start().compareTo(start) < 0 && 
first.value() != null)
+                builder.append(first.start(), start, 
builder.slice(first.start(), start, first.value()));
+            checkState(start.compareTo(second.start()) <= 0);
+        }
+
+        // loop over any range covered by both
+        while (left.hasCurrent() && right.hasCurrent())
+        {
+            int cmp = left.end().compareTo(right.end());
+            K end = (cmp <= 0 ? left : right).end();
+            V value = sliceAndReduce(start, end, left.value(), right.value(), 
builder);
+            if (cmp <= 0) left.advance();
+            if (cmp >= 0) right.advance();
+            if (value != null)
+                builder.append(start, end, value);
+            start = end;
+        }
+
+        // finally loop over any remaining range covered by only one
+        WithBoundsIterator<K, V> remaining = left.hasCurrent() ? left : right;
+        if (remaining.hasCurrent())
+        {
+            {   // only slice the first one
+                K end = remaining.end();
+                V value = remaining.value();
+                if (value != null)
+                {
+                    value = builder.slice(start, end, value);
+                    builder.append(start, end, value);
+                }
+                start = end;
+                remaining.advance();
+            }
+            while (remaining.hasCurrent())
+            {
+                K end = remaining.end();
+                V value = remaining.value();
+                if (value != null)
+                    builder.append(start, end, value);
+                start = end;
+                remaining.advance();
+            }
+        }
+
+        return builder.build();
+    }
+
+    protected static <K extends Comparable<? super K>, V, M extends 
BTreeReducingIntervalMap<K, V>> M merge(
+            M historyLeft, M historyRight, BiFunction<V, V, V> reduce, 
BoundariesBuilderFactory<K, V, M> factory)
+    {
+        if (historyLeft == null || historyLeft.isEmpty())
+            return historyRight;
+        if (historyRight == null || historyRight.isEmpty())
+            return historyLeft;
+
+        boolean inclusiveEnds = inclusiveEnds(historyLeft.inclusiveEnds, 
!historyLeft.isEmpty(), historyRight.inclusiveEnds, !historyRight.isEmpty());
+        AbstractBoundariesBuilder<K, V, M> builder = 
factory.create(inclusiveEnds, historyLeft.size() + historyRight.size());
+
+        WithBoundsIterator<K, V> left = historyLeft.withBoundsIterator();
+        WithBoundsIterator<K, V> right = historyRight.withBoundsIterator();
+
+        left.advance();
+        right.advance();
+
+        K start;
+        {
+            // first loop over any range only covered by one of the two
+            WithBoundsIterator<K, V> first = 
left.start().compareTo(right.start()) <= 0 ? left : right;
+            WithBoundsIterator<K, V> second = first == left ? right : left;
+
+            K end = null;
+            while (first.hasCurrent() && first.end().compareTo(second.start()) 
<= 0)
+            {
+                builder.append(first.start(), first.value(), reduce);
+                end = first.end();
+                first.advance();
+            }
+
+            start = second.start();
+            if (first.hasCurrent())
+                builder.append(first.start(), first.value(), reduce);
+            else
+                builder.append(end, null, reduce);
+            checkState(start.compareTo(second.start()) <= 0);
+        }
+
+        // loop over any range covered by both
+        // TODO (expected): optimise merging of very different sized maps 
(i.e. for inserts)
+        while (left.hasCurrent() && right.hasCurrent())
+        {
+            int cmp = left.end().compareTo(right.end());
+            V value = reduce(left.value(), right.value(), reduce);
+
+            if (cmp == 0)
+            {
+                builder.append(start, value, reduce);
+                start = left.end();
+                left.advance();
+                right.advance();
+            }
+            else if (cmp < 0)
+            {
+                builder.append(start, value, reduce);
+                start = left.end();
+                left.advance();
+            }
+            else
+            {
+                builder.append(start, value, reduce);
+                start = right.end();
+                right.advance();
+            }
+        }
+
+        // finally loop over any remaining range covered by only one
+        while (left.hasCurrent())
+        {
+            builder.append(start, left.value(), reduce);
+            start = left.end();
+            left.advance();
+        }
+
+        while (right.hasCurrent())
+        {
+            builder.append(start, right.value(), reduce);
+            start = right.end();
+            right.advance();
+        }
+
+        builder.append(start, null, reduce);
+
+        return builder.build();
+    }
+
+    static boolean inclusiveEnds(boolean leftIsInclusive, boolean 
leftIsDecisive, boolean rightIsInclusive, boolean rightIsDecisive)
+    {
+        if (leftIsInclusive == rightIsInclusive)
+            return leftIsInclusive;
+
+        if (leftIsDecisive && rightIsDecisive)
+            throw illegalState("Mismatching bound inclusivity/exclusivity");
+
+        return leftIsDecisive ? leftIsInclusive : rightIsInclusive;
+    }
+
+    private static <V> V reduce(V left, V right, BiFunction<V, V, V> reduce)
+    {
+        return left == null ? right : right == null ? left : 
reduce.apply(left, right);
+    }
+
+    private static <K extends Comparable<? super K>, V> V sliceAndReduce(K 
start, K end, V left, V right, AbstractIntervalBuilder<K, V, ?> builder)
+    {
+        if (left != null) left = builder.slice(start, end, left);
+        if (right != null) right = builder.slice(start, end, right);
+        return left == null ? right : right == null ? left : 
builder.reduce(left, right);
+    }
+
+    public interface BoundariesBuilderFactory<K extends Comparable<? super K>, 
V, M extends BTreeReducingIntervalMap<K, V>>
+    {
+        AbstractBoundariesBuilder<K, V, M> create(boolean inclusiveEnds, int 
capacity);
+    }
+
+    public static abstract class AbstractBoundariesBuilder<K extends 
Comparable<? super K>, V, M extends BTreeReducingIntervalMap<K, V>>
+    {
+        protected final boolean inclusiveEnds;
+
+        private final BTree.Builder<Entry<K, V>> treeBuilder;
+        private final TinyKVBuffer<K, V> buffer;
+
+        protected AbstractBoundariesBuilder(boolean inclusiveEnds, int 
capacity)
+        {
+            this.inclusiveEnds = inclusiveEnds;
+            this.treeBuilder = BTree.builder(Comparator.naturalOrder(), 
capacity);
+            this.buffer = new TinyKVBuffer<>();
+        }
+
+        public boolean isEmpty()
+        {
+            return buffer.isEmpty();
+        }
+
+        /**
+         * null is a valid value to represent no knowledge, and is the 
*expected* final value, representing
+         * the bound of our knowledge (any higher key will find no associated 
information)
+         */
+        public void append(K start, @Nullable V value, BiFunction<V, V, V> 
reduce)
+        {
+            if (buffer.isEmpty())
+            {
+                buffer.append(start, value);
+                return;
+            }
+
+            K prevStart = buffer.lastKey();
+            V prevValue = buffer.lastValue();
+
+            checkArgument(start.compareTo(prevStart) >= 0);
+
+            boolean samePrevStart = start.equals(prevStart);
+            boolean samePrevValue = Objects.equals(value, prevValue);
+
+            if (!(samePrevStart || samePrevValue))
+            {
+                if (buffer.isFull())
+                {
+                    treeBuilder.add(Entry.make(buffer.firstKey(), 
buffer.firstValue()));
+                    buffer.dropFirst();
+                }
+                buffer.append(start, value);
+                return;
+            }
+
+            if (samePrevValue)
+                return;
+
+            if (prevValue != null)
+                buffer.lastValue(reduce.apply(prevValue, value));
+            else if (buffer.size() >= 2 && 
value.equals(buffer.penultimateValue()))
+                buffer.dropLast(); // just remove the last start and (null) 
value
+            else
+                buffer.lastValue(value);
+        }
+
+        protected abstract M buildInternal(Object[] tree);
+
+        public final M build()
+        {
+            while (buffer.size() > 1)
+            {
+                treeBuilder.add(Entry.make(buffer.firstKey(), 
buffer.firstValue()));
+                buffer.dropFirst();
+            }
+
+            if (!buffer.isEmpty())
+            {
+                checkState(buffer.lastValue() == null);
+                treeBuilder.add(Entry.make(buffer.firstKey()));
+                buffer.dropFirst();
+            }
+
+            return buildInternal(treeBuilder.build());
+        }
+    }
+
+    public interface IntervalBuilderFactory<K extends Comparable<? super K>, 
V, M extends BTreeReducingIntervalMap<K, V>>
+    {
+        AbstractIntervalBuilder<K, V, M> create(boolean inclusiveEnds, int 
capacity);
+    }
+
+    public static abstract class AbstractIntervalBuilder<K extends 
Comparable<? super K>, V, M>
+    {
+        protected final boolean inclusiveEnds;
+        private final BTree.Builder<Entry<K, V>> treeBuilder;
+        private final TinyKVBuffer<K, V> buffer;
+
+        private K prevEnd;
+
+        protected AbstractIntervalBuilder(boolean inclusiveEnds, int capacity)
+        {
+            this.inclusiveEnds = inclusiveEnds;
+            this.treeBuilder = BTree.builder(Comparator.naturalOrder(), 
capacity);
+            this.buffer = new TinyKVBuffer<>();
+        }
+
+        protected abstract V slice(K start, K end, V value);
+        protected abstract V reduce(V a, V b);
+
+        protected V tryMergeEqual(@Nonnull V a, V b)
+        {
+            return a.equals(b) ? a : null;
+        }
+
+        public void append(K start, K end, @Nonnull V value)
+        {
+            if (prevEnd != null)
+            {
+                int c = prevEnd.compareTo(start);
+                checkArgument(c <= 0);
+                if (c < 0)
+                {
+                    if (buffer.isFull())
+                    {
+                        treeBuilder.add(Entry.make(buffer.firstKey(), 
buffer.firstValue()));
+                        buffer.dropFirst();
+                    }
+                    buffer.append(prevEnd, null);
+                }
+            }
+
+            V prevValue;
+            if (!buffer.isEmpty() && null != (prevValue = buffer.lastValue()) 
&& null != (prevValue = tryMergeEqual(prevValue, value)))
+            {
+                buffer.lastValue(prevValue);
+            }
+            else
+            {
+                if (buffer.isFull())
+                {
+                    treeBuilder.add(Entry.make(buffer.firstKey(), 
buffer.firstValue()));
+                    buffer.dropFirst();
+                }
+                buffer.append(start, value);
+            }
+
+            prevEnd = end;
+        }
+
+        protected abstract M buildInternal(Object[] tree);
+
+        public final M build()
+        {
+            while (!buffer.isEmpty())
+            {
+                treeBuilder.add(Entry.make(buffer.firstKey(), 
buffer.firstValue()));
+                buffer.dropFirst();
+            }
+
+            if (prevEnd != null)
+            {
+                treeBuilder.add(Entry.make(prevEnd));
+                prevEnd = null;
+            }
+
+            return buildInternal(treeBuilder.build());
+        }
+    }
+
+    protected Entry<K, V> entryAt(int idx)
+    {
+        return BTree.findByIndex(tree, idx);
+    }
+
+    protected K startAt(int idx)
+    {
+        return entryAt(idx).start();
+    }
+
+    protected V valueAt(int idx)
+    {
+        return entryAt(idx).value();
+    }
+
+    protected Iterator<Entry<K, V>> entriesIterator()
+    {
+        return BTree.iterator(tree);
+    }
+
+    protected Iterator<V> valuesIterator(boolean skipNullValues)
+    {
+        return transform(filter(entriesIterator(),
+                                skipNullValues ? e -> e.hasValue() && 
e.value() != null : Entry::hasValue),
+                         Entry::value);
+    }
+
+    protected void forEachWithBounds(boolean skipNullValues, TriConsumer<K, K, 
V> consumer)
+    {
+        if (isEmpty())
+            return;
+        WithBoundsIterator<K, V> iter = withBoundsIterator(skipNullValues);
+        while (iter.advance())
+            consumer.accept(iter.start(), iter.end(), iter.value());
+    }
+
+    public WithBoundsIterator<K, V> withBoundsIterator()
+    {
+        return new WithBoundsIterator<>(false, entriesIterator());
+    }
+
+    public WithBoundsIterator<K, V> withBoundsIterator(boolean skipNullValues)
+    {
+        return new WithBoundsIterator<>(skipNullValues, entriesIterator());
+    }
+
+    public <M> Iterator<M> iterator(boolean skipNullValues, TriFunction<K, K, 
V, M> mapper)
+    {
+        return new WithBoundsIterator<>(skipNullValues, 
entriesIterator()).map(mapper);
+    }
+
+    public <M> Stream<M> stream(boolean skipNullValues, TriFunction<K, K, V, 
M> mapper)
+    {
+        return 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(skipNullValues,
 mapper), 0), false);
+    }
+
+    private enum IteratorState { START, ADVANCED, END }
+
+    public static class WithBoundsIterator<K extends Comparable<? super K>, V>
+    {
+        private final boolean skipNullValues;
+        private final Iterator<Entry<K, V>> iterator;
+
+        private Entry<K, V> curr, next;
+        private IteratorState state = IteratorState.START;
+
+        private WithBoundsIterator(boolean skipNullValues, Iterator<Entry<K, 
V>> entriesIterator)
+        {
+            this.skipNullValues = skipNullValues;
+            this.iterator = entriesIterator;
+        }
+
+        public boolean hasCurrent()
+        {
+            return state == IteratorState.ADVANCED;
+        }
+
+        public K start()
+        {
+            return curr.start();
+        }
+
+        public K end()
+        {
+            return next.start();
+        }
+
+        public V value()
+        {
+            return curr.value();
+        }
+
+        public boolean advance()
+        {
+            if (!skipNullValues)
+                return advanceInternal();
+
+            //noinspection StatementWithEmptyBody
+            while (advanceInternal() && value() == null);
+            return hasCurrent();
+        }
+
+        private boolean advanceInternal()
+        {
+            switch (state)
+            {
+                case START:
+                    if (iterator.hasNext())
+                    {
+                        curr = iterator.next();
+                        checkState(iterator.hasNext()); // must contain at 
least two entries
+                        next = iterator.next();
+                        state = IteratorState.ADVANCED;
+                    }
+                    else
+                    {
+                        state = IteratorState.END;
+                    }
+                    break;
+                case ADVANCED:
+                    if (next.hasValue())
+                    {
+                        checkState(iterator.hasNext()); // must be at least 
one entry after next if it has a value
+                        curr = next;
+                        next = iterator.next();
+                        state = IteratorState.ADVANCED;
+                    }
+                    else
+                    {
+                        checkState(!iterator.hasNext()); // should have no 
more entries if next has no value
+                        curr = next = null;
+                        state = IteratorState.END;
+                    }
+                    break;
+            }
+
+            return state == IteratorState.ADVANCED;
+        }
+
+        private <M> Iterator<M> map(TriFunction<K, K, V, M> mapper)
+        {
+            return new AbstractIterator<>()
+            {
+                @Override
+                protected M computeNext()
+                {
+                    return advance() ? mapper.apply(start(), end(), value()) : 
endOfData();
+                }
+            };
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java 
b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
new file mode 100644
index 00000000..a623dfbc
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.utils;
+
+import accord.api.RoutingKey;
+import accord.primitives.*;
+import accord.utils.btree.BTree;
+import accord.utils.btree.BTreeRemoval;
+import accord.utils.btree.UpdateFunction;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import static accord.utils.Invariants.checkArgument;
+import static accord.utils.SortedArrays.Search.FAST;
+
+public class BTreeReducingRangeMap<V> extends 
BTreeReducingIntervalMap<RoutingKey, V>
+{
+    public BTreeReducingRangeMap()
+    {
+        super();
+    }
+
+    protected BTreeReducingRangeMap(boolean inclusiveEnds, Object[] tree)
+    {
+        super(inclusiveEnds, tree);
+    }
+
+    public V foldl(Routables<?> routables, BiFunction<V, V, V> fold, V 
accumulator)
+    {
+        return foldl(routables, fold, accumulator, ignore -> false);
+    }
+
+    public <V2> V2 foldl(Routables<?> routables, BiFunction<V, V2, V2> fold, 
V2 accumulator, Predicate<V2> terminate)
+    {
+        return foldl(routables, (a, b, f, ignore) -> f.apply(a, b), 
accumulator, fold, null, terminate);
+    }
+
+    public <V2, P1, P2> V2 foldl(Routables<?> routables, QuadFunction<V, V2, 
P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate)
+    {
+        switch (routables.domain())
+        {
+            default: throw new AssertionError("Unknown domain: " + 
routables.domain());
+            case Key: return foldl((AbstractKeys<?>) routables, fold, 
accumulator, p1, p2, terminate);
+            case Range: return foldl((AbstractRanges) routables, fold, 
accumulator, p1, p2, terminate);
+        }
+    }
+
+    public <V2, P1, P2> V2 foldl(AbstractKeys<?> keys, QuadFunction<V, V2, P1, 
P2, V2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate)
+    {
+        if (isEmpty())
+            return accumulator;
+
+        int treeSize = BTree.size(tree),
+            keysSize = keys.size();
+
+        int i = keys.find(startAt(0), FAST);
+        if (i < 0) i = -1 - i;
+        else if (inclusiveEnds) ++i;
+
+        while (i < keysSize)
+        {
+            int idx = findIndex(keys.get(i));
+            if (idx < 0) idx = -2 - idx;
+            else if (inclusiveEnds) --idx;
+
+            if (idx >= treeSize - 1)
+                return accumulator;
+
+            int nexti = keys.findNext(i, startAt(idx + 1), FAST);
+            if (nexti < 0) nexti = -1 -nexti;
+            else if (inclusiveEnds) ++nexti;
+
+            Entry<RoutingKey, V> entry = entryAt(idx);
+            if (i != nexti && entry.hasValue() && entry.value() != null)
+            {
+                accumulator = fold.apply(entry.value(), accumulator, p1, p2);
+                if (terminate.test(accumulator))
+                    return accumulator;
+            }
+            i = nexti;
+        }
+        return accumulator;
+    }
+
+    public <V2, P1, P2> V2 foldl(AbstractRanges ranges, QuadFunction<V, V2, 
P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate)
+    {
+        if (isEmpty())
+            return accumulator;
+
+        int treeSize = BTree.size(tree),
+            rangesSize = ranges.size();
+
+        RoutingKey start = startAt(0);
+        int i = ranges.find(start, FAST);
+        if (i < 0) i = -1 - i;
+        else if (inclusiveEnds && ranges.get(i).end().equals(start)) ++i;
+
+        while (i < rangesSize)
+        {
+            Range range = ranges.get(i++);
+
+            int startIdx = findIndex(range.start());
+            int startPos = startIdx < 0 ? (-1 - startIdx) : startIdx;
+            if (startIdx == treeSize - 1 || startPos == treeSize)
+                return accumulator;  // is last or out of bounds -> we are done
+            if (startIdx < 0) startPos = Math.max(0, startPos - 1); // 
inclusive
+
+            int endIdx = findIndex(range.end());
+            int endPos = endIdx < 0 ? (-1 - endIdx) : endIdx;
+            if (endPos == 0)
+                continue; // is first or out of bounds -> continue
+            endPos = Math.min(endPos - 1, treeSize - 2); // inclusive
+
+            Iterator<Entry<RoutingKey,V>> iterator =
+                BTree.iterator(tree, startPos, endPos, BTree.Dir.ASC);
+
+            while (iterator.hasNext())
+            {
+                Entry<RoutingKey, V> entry = iterator.next();
+                if (entry.hasValue() && entry.value() != null)
+                {
+                    accumulator = fold.apply(entry.value(), accumulator, p1, 
p2);
+                    if (terminate.test(accumulator))
+                        return accumulator;
+                }
+            }
+
+            if (endPos >= treeSize - 2)
+                return accumulator;
+
+            RoutingKey nextStart = startAt(endPos + 1);
+            i = ranges.findNext(i, nextStart, FAST);
+            if (i < 0) i = -1 - i;
+            else if (inclusiveEnds && ranges.get(i).end().equals(nextStart)) 
++i;
+        }
+
+        return accumulator;
+    }
+
+    public int findIndex(RoutableKey key)
+    {
+        return BTree.findIndex(tree, EntryComparator.instance(), key);
+    }
+
+    public static <V> BTreeReducingRangeMap<V> create(AbstractRanges ranges, V 
value)
+    {
+        checkArgument(value != null, "value is null");
+
+        if (ranges.isEmpty())
+            return new BTreeReducingRangeMap<>();
+
+        return create(ranges, value, BTreeReducingRangeMap.Builder::new);
+    }
+
+    public static <V, M extends BTreeReducingRangeMap<V>> M 
create(Unseekables<?> keysOrRanges, V value, 
BoundariesBuilderFactory<RoutingKey, V, M> builder)
+    {
+        switch (keysOrRanges.domain())
+        {
+            default: throw new AssertionError("Unhandled domain: " + 
keysOrRanges.domain());
+            case Range: return create((AbstractRanges) keysOrRanges, value, 
builder);
+            case Key: return create((AbstractUnseekableKeys) keysOrRanges, 
value, builder);
+        }
+    }
+
+    public static <V, M extends BTreeReducingRangeMap<V>> M 
create(Seekables<?, ?> keysOrRanges, V value, 
BoundariesBuilderFactory<RoutingKey, V, M> builder)
+    {
+        switch (keysOrRanges.domain())
+        {
+            default: throw new AssertionError("Unhandled domain: " + 
keysOrRanges.domain());
+            case Range: return create((AbstractRanges) keysOrRanges, value, 
builder);
+            case Key: return create((Keys) keysOrRanges, value, builder);
+        }
+    }
+
+    public static <V, M extends BTreeReducingRangeMap<V>> M 
create(AbstractRanges ranges, V value, BoundariesBuilderFactory<RoutingKey, V, 
M> factory)
+    {
+        checkArgument(value != null, "value is null");
+
+        AbstractBoundariesBuilder<RoutingKey, V, M> builder = 
factory.create(ranges.get(0).endInclusive(), ranges.size() * 2);
+        for (Range cur : ranges)
+        {
+            builder.append(cur.start(), value, (a, b) -> { throw new 
IllegalStateException(); });
+            builder.append(cur.end(), null, (a, b) -> { throw new 
IllegalStateException(); });
+        }
+
+        return builder.build();
+    }
+
+    public static <V, M extends BTreeReducingRangeMap<V>> M 
create(AbstractUnseekableKeys keys, V value, 
BoundariesBuilderFactory<RoutingKey, V, M> factory)
+    {
+        checkArgument(value != null, "value is null");
+
+        AbstractBoundariesBuilder<RoutingKey, V, M> builder = 
factory.create(keys.get(0).asRange().endInclusive(), keys.size() * 2);
+        for (int i = 0 ; i < keys.size() ; ++i)
+        {
+            Range range = keys.get(i).asRange();
+            builder.append(range.start(), value, (a, b) -> { throw new 
IllegalStateException(); });
+            builder.append(range.end(), null, (a, b) -> { throw new 
IllegalStateException(); });
+        }
+
+        return builder.build();
+    }
+
+    public static <V, M extends BTreeReducingRangeMap<V>> M create(Keys keys, 
V value, BoundariesBuilderFactory<RoutingKey, V, M> factory)
+    {
+        checkArgument(value != null, "value is null");
+
+        RoutingKey prev = keys.get(0).toUnseekable();
+        AbstractBoundariesBuilder<RoutingKey, V, M> builder;
+        {
+            Range range = prev.asRange();
+            builder = factory.create(prev.asRange().endInclusive(), 
keys.size() * 2);
+            builder.append(range.start(), value, (a, b) -> { throw new 
IllegalStateException(); });
+            builder.append(range.end(), null, (a, b) -> { throw new 
IllegalStateException(); });
+        }
+
+        for (int i = 1 ; i < keys.size() ; ++i)
+        {
+            RoutingKey unseekable = keys.get(i).toUnseekable();
+            if (unseekable.equals(prev))
+                continue;
+
+            Range range = unseekable.asRange();
+            builder.append(range.start(), value, (a, b) -> { throw new 
IllegalStateException(); });
+            builder.append(range.end(), null, (a, b) -> { throw new 
IllegalStateException(); });
+            prev = unseekable;
+        }
+
+        return builder.build();
+    }
+
+    public static <V> BTreeReducingRangeMap<V> add(BTreeReducingRangeMap<V> 
existing, Ranges ranges, V value, BiFunction<V, V, V> reduce)
+    {
+        return update(existing, ranges, value, reduce, 
BTreeReducingRangeMap::new);
+    }
+
+    public static BTreeReducingRangeMap<Timestamp> 
add(BTreeReducingRangeMap<Timestamp> existing, Ranges ranges, Timestamp value)
+    {
+        return add(existing, ranges, value, Timestamp::max);
+    }
+
+    public static <V> BTreeReducingRangeMap<V> merge(BTreeReducingRangeMap<V> 
historyLeft, BTreeReducingRangeMap<V> historyRight, BiFunction<V, V, V> reduce)
+    {
+        return BTreeReducingIntervalMap.merge(historyLeft, historyRight, 
reduce, BTreeReducingRangeMap.Builder::new);
+    }
+
+    /**
+     *  Update the range map retaining as much of the underlying BTree as 
possible
+     */
+    public static <M extends BTreeReducingRangeMap<V>, V> M update(
+        M map, Seekables<?, ?> keysOrRanges, V value, BiFunction<V, V, V> 
valueResolver,
+        BiFunction<Boolean, Object[], M> factory, 
BoundariesBuilderFactory<RoutingKey, V, M> builderFactory)
+    {
+        if (keysOrRanges.isEmpty())
+            return map;
+
+        if (map.isEmpty())
+            return create(keysOrRanges, value, builderFactory);
+
+        if (map.inclusiveEnds() != 
keysOrRanges.get(0).toUnseekable().asRange().endInclusive())
+            throw new IllegalStateException("Mismatching bound 
inclusivity/exclusivity - can't be updated");
+
+        return update(map, keysOrRanges, value, valueResolver, factory);
+    }
+
+    private static <M extends BTreeReducingRangeMap<V>, V> M update(
+            M map, Seekables<?,?> keysOrRanges, V value, BiFunction<V, V, V> 
valueResolver, BiFunction<Boolean, Object[], M> factory)
+    {
+        Accumulator<V> acc = accumulator();
+
+        int treeSize = BTree.size(map.tree);
+        boolean updatedEndEntry = false;
+
+        Range thisRange, nextRange = null;
+        for (int i = 0, rangesSize = keysOrRanges.size(); i < rangesSize; ++i)
+        {
+            thisRange = i == 0 ? keysOrRanges.get(i).toUnseekable().asRange() 
: nextRange;
+            nextRange = i < rangesSize - 1 ? keysOrRanges.get(i + 
1).toUnseekable().asRange() : null;
+
+            assert thisRange != null;
+
+            int startIdx = map.findIndex(thisRange.start());
+            int   endIdx = map.findIndex(thisRange.end());
+
+            int startIns = startIdx >= 0 ? startIdx : -1 - startIdx;
+            int   endIns =   endIdx >= 0 ?   endIdx : -1 - endIdx;
+
+            boolean isRangeOpen = false;
+
+            if (startIdx < 0) // if we start on an existing bound, we don't 
have to do anything *here*
+            {
+                if (startIns == 0) // insert before first map entry
+                {
+                    acc.add(thisRange.start(), value);
+                    isRangeOpen = true;
+                }
+                else if (startIns == treeSize) // inserting past last map entry
+                {
+                    // when adding past current end, need to update the very 
last entry from having no value to having value = null
+                    if (!updatedEndEntry)
+                    {
+                        acc.add(map.startAt(treeSize - 1), null);
+                        updatedEndEntry = true;
+                    }
+                    acc.add(thisRange.start(), value);
+                    isRangeOpen = true;
+                }
+                else // start within the current map bounds
+                {
+                    // split the range we start in if our value is higher than 
the range's
+                    if (supersedes(map.entryAt(startIns - 1), value, 
valueResolver))
+                    {
+                        acc.add(thisRange.start(), value);
+                        isRangeOpen = true;
+                    }
+                }
+            }
+
+            if (startIns < endIns)
+            {
+                // start and end are inclusive with BTree iterator
+                Iterator<Entry<RoutingKey, V>> iter = BTree.iterator(map.tree, 
startIns, endIns - 1, BTree.Dir.ASC);
+                while (iter.hasNext())
+                {
+                    Entry<RoutingKey, V> entry = iter.next();
+                    boolean supersedes = supersedes(entry, value, 
valueResolver);
+                    if (supersedes)
+                    {
+                        if (isRangeOpen)
+                            acc.remove(entry.start());
+                        else
+                            acc.add(entry.start(), value);
+                    }
+                    isRangeOpen = supersedes;
+                }
+            }
+
+            if (endIdx < 0) // if we end on an existing bound, we don't have 
to do anything
+            {
+                if (endIns == 0) // range ends before first entry in the map
+                {
+                    acc.add(thisRange.end(), null);
+                }
+                else if (endIns == treeSize) // range ends after last entry in 
the map
+                {
+                    if (nextRange == null)
+                        acc.add(thisRange.end());
+                    else
+                        acc.add(thisRange.end(), null);
+
+                    updatedEndEntry = true;
+                }
+                else // ends between two existing map bounds
+                {
+                    // split the range we end in if our value is higher than 
the range's
+                    if (isRangeOpen) acc.add(thisRange.end(), 
map.valueAt(endIns - 1));
+                }
+            }
+        }
+
+        Object[] updated = acc.apply(map.tree); acc.reuse();
+        return map.tree == updated
+             ? map
+             : factory.apply(map.inclusiveEnds(), updated);
+    }
+
+    private static final ThreadLocal<Accumulator<?>> accumulator = 
ThreadLocal.withInitial(Accumulator::new);
+
+    @SuppressWarnings("unchecked")
+    private static <V> Accumulator<V> accumulator()
+    {
+        return (Accumulator<V>) accumulator.get().reuse();
+    }
+
+    private static class Accumulator<V> implements 
BTree.Builder.QuickResolver<Entry<RoutingKey, V>>
+    {
+        private boolean isClean = true;
+
+        BTree.Builder<Entry<RoutingKey, V>> toAdd;
+        List<RoutingKey> toRemove;
+
+        void remove(RoutingKey key)
+        {
+            isClean = false;
+            if (toRemove == null)
+                toRemove = new ArrayList<>();
+            toRemove.add(key);
+        }
+
+        void add(RoutingKey key)
+        {
+            isClean = false;
+            toAdd().add(Entry.make(key));
+        }
+
+        void add(RoutingKey key, V value)
+        {
+            isClean = false;
+            toAdd().add(Entry.make(key, value));
+        }
+
+        private BTree.Builder<Entry<RoutingKey, V>> toAdd()
+        {
+            if (toAdd == null)
+            {
+                toAdd = BTree.builder(Comparator.naturalOrder());
+                toAdd.setQuickResolver(this);
+            }
+            return toAdd;
+        }
+
+        Object[] apply(Object[] tree)
+        {
+            if (toRemove != null)
+                for (RoutingKey key : toRemove)
+                    tree = BTreeRemoval.remove(tree, 
EntryComparator.instance(), key);
+
+            if (toAdd != null && !toAdd.isEmpty())
+                tree = BTree.update(tree, toAdd.build(), 
Comparator.<Entry<RoutingKey, V>>naturalOrder(), UpdateFunction.noOpReplace());
+
+            return tree;
+        }
+
+        @Override
+        public Entry<RoutingKey, V> resolve(Entry<RoutingKey, V> left, 
Entry<RoutingKey, V> right)
+        {
+            if (left.hasValue() != right.hasValue())
+                return left.hasValue() ? left : right;
+
+            if (!left.hasValue())
+                return left;
+
+            if ((left.value() == null) != (right.value() == null))
+                return left.value() == null ? right : left;
+
+            return right;
+        }
+
+        Accumulator<V> reuse()
+        {
+            if (!isClean)
+            {
+                if (toRemove != null) toRemove.clear();
+                if (toAdd != null) toAdd.reuse();
+                isClean = true;
+            }
+            return this;
+        }
+    }
+
+    private static <V> boolean supersedes(Entry<RoutingKey, V> entry, V value, 
BiFunction<V, V, V> resolver)
+    {
+        return !entry.hasValue()
+            || (entry.value() == null && value != null)
+            || (entry.value() != null && value != null && 
!resolver.apply(entry.value(), value).equals(entry.value()));
+    }
+
+    static class Builder<V> extends AbstractBoundariesBuilder<RoutingKey, V, 
BTreeReducingRangeMap<V>>
+    {
+        protected Builder(boolean inclusiveEnds, int capacity)
+        {
+            super(inclusiveEnds, capacity);
+        }
+
+        @Override
+        protected BTreeReducingRangeMap<V> buildInternal(Object[] tree)
+        {
+            return new BTreeReducingRangeMap<>(inclusiveEnds, tree);
+        }
+    }
+
+    /**
+     * A non-validating builder that expects all entries to be in correct 
order. For implementations' ser/de logic.
+     */
+    public static class RawBuilder<V, M>
+    {
+        protected final boolean inclusiveEnds;
+        protected final int capacity;
+
+        private BTree.Builder<Entry<RoutingKey, V>> treeBuilder;
+        private boolean lastStartAppended;
+
+        public RawBuilder(boolean inclusiveEnds, int capacity)
+        {
+            this.inclusiveEnds = inclusiveEnds;
+            this.capacity = capacity;
+        }
+
+        public RawBuilder<V, M> append(RoutingKey start, V value)
+        {
+            return append(Entry.make(start, value));
+        }
+
+        public RawBuilder<V, M> append(RoutingKey start)
+        {
+            return append(Entry.make(start));
+        }
+
+        public RawBuilder<V, M> append(Entry<RoutingKey, V> entry)
+        {
+            Invariants.checkState(!lastStartAppended);
+            if (treeBuilder == null)
+                (treeBuilder = BTree.builder(Comparator.naturalOrder(), 
capacity + 1)).auto(false);
+            treeBuilder.add(entry);
+            lastStartAppended = !entry.hasValue();
+            return this;
+        }
+
+        public final M build(BiFunction<Boolean, Object[], M> constructor)
+        {
+            Invariants.checkState(lastStartAppended || treeBuilder == null);
+            return constructor.apply(inclusiveEnds, treeBuilder == null ? 
BTree.empty() : treeBuilder.build());
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java 
b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 09a44017..2383469d 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -127,12 +127,13 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
         if (values.length == 0)
             return accumulator;
 
-        int i = 0, j = keys.findNext(0, starts[0], FAST);
+        int i = 0, j = keys.find(starts[0], FAST);
         if (j < 0) j = -1 - j;
         else if (inclusiveEnds) ++j;
 
         while (j < keys.size())
         {
+            // TODO (desired): first search should be binarySearch
             i = exponentialSearch(starts, i, starts.length, keys.get(j));
             if (i < 0) i = -2 - i;
             else if (inclusiveEnds) --i;
@@ -161,26 +162,27 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
         if (values.length == 0)
             return accumulator;
 
-        // TODO (desired): first searches should be binarySearch
-        int j = ranges.findNext(0, starts[0], FAST);
+        int j = ranges.find(starts[0], FAST);
         if (j < 0) j = -1 - j;
         else if (inclusiveEnds && ranges.get(j).end().equals(starts[0])) ++j;
 
         int i = 0;
         while (j < ranges.size())
         {
-            Range range = ranges.get(j);
-            RoutingKey start = range.start();
-            int nexti = exponentialSearch(starts, i, starts.length, start);
+            // TODO (desired): first search should be binarySearch
+            int nexti = exponentialSearch(starts, i, starts.length, 
ranges.get(j).start());
             if (nexti < 0) i = Math.max(i, -2 - nexti);
-            else if (nexti > i && !inclusiveStarts()) i = nexti - 1;
+            else if (nexti > i && inclusiveEnds) i = nexti - 1;
             else i = nexti;
 
             if (i >= values.length)
                 return accumulator;
 
             int toj, nextj = ranges.findNext(j, starts[i + 1], FAST);
-            if (nextj < 0) toj = nextj = -1 -nextj;
+            if (nextj < 0)
+            {
+                toj = nextj = -1 - nextj;
+            }
             else
             {
                 toj = nextj + 1;
@@ -215,7 +217,7 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
         if (values.length == 0 || keys.isEmpty())
             return fold.apply(defaultValue, accumulator, p1, p2, 0, 
keys.size());
 
-        int i = 0, j = keys.findNext(0, starts[0], FAST);
+        int i = 0, j = keys.find(starts[0], FAST);
         if (j < 0) j = -1 - j;
         else if (inclusiveEnds) ++j;
 
@@ -224,6 +226,7 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
 
         while (j < keys.size())
         {
+            // TODO (desired): first search should be binarySearch
             i = exponentialSearch(starts, i, starts.length, keys.get(j));
             if (i < 0) i = -2 - i;
             else if (inclusiveEnds) --i;
@@ -256,8 +259,7 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
         if (values.length == 0 || ranges.isEmpty())
             return fold.apply(defaultValue, accumulator, p1, p2, 0, 
ranges.size());
 
-        // TODO (desired): first searches should be binarySearch
-        int j = ranges.findNext(0, starts[0], FAST);
+        int j = ranges.find(starts[0], FAST);
         if (j < 0) j = -1 - j;
         else if (inclusiveEnds && ranges.get(j).end().equals(starts[0])) ++j;
 
@@ -267,18 +269,20 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
         int i = 0;
         while (j < ranges.size())
         {
-            Range range = ranges.get(j);
-            RoutingKey start = range.start();
-            int nexti = exponentialSearch(starts, i, starts.length, start);
+            // TODO (desired): first search should be binarySearch
+            int nexti = exponentialSearch(starts, i, starts.length, 
ranges.get(j).start());
             if (nexti < 0) i = Math.max(i, -2 - nexti);
-            else if (nexti > i && !inclusiveStarts()) i = nexti - 1;
+            else if (nexti > i && inclusiveEnds) i = nexti - 1;
             else i = nexti;
 
             if (i >= values.length)
                 return fold.apply(defaultValue, accumulator, p1, p2, j, 
ranges.size());
 
             int toj, nextj = ranges.findNext(j, starts[i + 1], FAST);
-            if (nextj < 0) toj = nextj = -1 -nextj;
+            if (nextj < 0)
+            {
+                toj = nextj = -1 -nextj;
+            }
             else
             {
                 toj = nextj + 1;
diff --git a/accord-core/src/main/java/accord/utils/TinyKVBuffer.java 
b/accord-core/src/main/java/accord/utils/TinyKVBuffer.java
new file mode 100644
index 00000000..01d1957c
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/TinyKVBuffer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.utils;
+
+import static accord.utils.Invariants.checkState;
+import static accord.utils.Invariants.illegalState;
+
+/**
+ * A 3-element buffer of key-value pairs
+ */
+class TinyKVBuffer<K, V>
+{
+    private K k0, k1, k2;
+    private V v0, v1, v2;
+
+    void append(K key, V value)
+    {
+             if (k0 == null) { k0 = key; v0 = value; }
+        else if (k1 == null) { k1 = key; v1 = value; }
+        else if (k2 == null) { k2 = key; v2 = value; }
+        else illegalState();
+    }
+
+    void dropFirst()
+    {
+        checkState(k0 != null);
+        k0 = k1;   v0 = v1;
+        k1 = k2;   v1 = v2;
+        k2 = null; v2 = null;
+    }
+
+    void dropLast()
+    {
+             if (k2 != null) { k2 = null; v2 = null; }
+        else if (k1 != null) { k1 = null; v1 = null; }
+        else if (k0 != null) { k0 = null; v0 = null; }
+        else illegalState();
+    }
+
+    int size()
+    {
+        if (k0 == null) return 0;
+        if (k1 == null) return 1;
+        if (k2 == null) return 2;
+        else            return 3;
+    }
+
+    boolean isFull()
+    {
+        return k2 != null;
+    }
+
+    boolean isEmpty()
+    {
+        return k0 == null;
+    }
+
+    K firstKey()
+    {
+        if (k0 != null) return k0;
+        throw illegalState();
+    }
+
+    V firstValue()
+    {
+        if (k0 != null) return v0;
+        throw illegalState();
+    }
+
+    V penultimateValue()
+    {
+        if (k2 != null) return v1;
+        if (k1 != null) return v0;
+        throw illegalState();
+    }
+
+    K lastKey()
+    {
+        if (k2 != null) return k2;
+        if (k1 != null) return k1;
+        if (k0 != null) return k0;
+        throw illegalState();
+    }
+
+    V lastValue()
+    {
+        if (k2 != null) return v2;
+        if (k1 != null) return v1;
+        if (k0 != null) return v0;
+        throw illegalState();
+    }
+
+    void lastValue(V value)
+    {
+             if (k2 != null) v2 = value;
+        else if (k1 != null) v1 = value;
+        else if (k0 != null) v0 = value;
+        else throw illegalState();
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/TriConsumer.java 
b/accord-core/src/main/java/accord/utils/TriConsumer.java
new file mode 100644
index 00000000..ba3826ee
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/TriConsumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.utils;
+
+public interface TriConsumer<P1, P2, P3>
+{
+    void accept(P1 p1, P2 p2, P3 p3);
+}
diff --git 
a/accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java 
b/accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java
new file mode 100644
index 00000000..3f5ff850
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import accord.api.RoutingKey;
+import accord.impl.IntKey;
+import accord.local.Node;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.RoutingKeys;
+import accord.primitives.Timestamp;
+import accord.utils.BTreeReducingRangeMap.RawBuilder;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import javax.annotation.Nonnull;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static java.lang.Integer.MAX_VALUE;
+import static java.lang.Integer.MIN_VALUE;
+
+// TODO (desired): test start inclusive ranges
+public class BTreeReducingRangeMapTest
+{
+    static final BTreeReducingRangeMap<Timestamp> EMPTY = new 
BTreeReducingRangeMap<>();
+    static final RoutingKey MINIMUM_EXCL = new IntKey.Routing(MIN_VALUE);
+    static final RoutingKey MAXIMUM_EXCL = new IntKey.Routing(MAX_VALUE);
+    static boolean END_INCLUSIVE = false;
+
+    private static RoutingKey rk(int t)
+    {
+        return new IntKey.Routing(t);
+    }
+    private static RoutingKey rk(Random random)
+    {
+        int rk = random.nextInt();
+        if (random.nextBoolean()) rk = -rk;
+        if (rk == MAX_VALUE) --rk;
+        if (rk == MIN_VALUE) ++rk;
+        return new IntKey.Routing(rk);
+    }
+
+    private static Timestamp none()
+    {
+        return null;
+    }
+
+    private static Timestamp ts(int b)
+    {
+        return Timestamp.fromValues(1, b, 0, new Node.Id(1));
+    }
+
+    private static Range r(RoutingKey l, RoutingKey r)
+    {
+        return END_INCLUSIVE ? new Range.EndInclusive(l, r) : new 
Range.StartInclusive(l, r);
+    }
+
+    private static RoutingKey incr(RoutingKey rk)
+    {
+        return new IntKey.Routing(((IntKey.Routing)rk).key + 1);
+    }
+
+    private static RoutingKey decr(RoutingKey rk)
+    {
+        return new IntKey.Routing(((IntKey.Routing)rk).key - 1);
+    }
+
+    private static Range r(int l, int r)
+    {
+        return r(rk(l), rk(r));
+    }
+
+    private static Pair<RoutingKey, Timestamp> pt(int t, int b)
+    {
+        return Pair.create(rk(t), ts(b));
+    }
+
+    private static Pair<RoutingKey, Timestamp> pt(int t, Timestamp b)
+    {
+        return Pair.create(rk(t), b);
+    }
+
+    private static Pair<RoutingKey, Timestamp> pt(RoutingKey t, int b)
+    {
+        return Pair.create(t, ts(b));
+    }
+
+    private static BTreeReducingRangeMap<Timestamp> h(Pair<RoutingKey, 
Timestamp>... points)
+    {
+        Invariants.checkState(points[0].right == none());
+        int length = points.length;
+        RawBuilder<Timestamp, BTreeReducingRangeMap<Timestamp>> builder = new 
RawBuilder<>(true, length - 1);
+        for (int i = 1 ; i < length ; ++i)
+            builder.append(points[i - 1].left, points[i].right);
+        builder.append(points[length - 1].left);
+        return builder.build(BTreeReducingRangeMap::new);
+    }
+
+    static
+    {
+        assert rk(100).equals(rk(100));
+        assert ts(111).equals(ts(111));
+    }
+
+    private static class Builder
+    {
+        BTreeReducingRangeMap<Timestamp> history = EMPTY;
+
+        Builder add(Timestamp timestamp, Range... ranges)
+        {
+            history = BTreeReducingRangeMap.add(history, Ranges.of(ranges), 
timestamp);
+            return this;
+        }
+
+        Builder clear()
+        {
+            history = EMPTY;
+            return this;
+        }
+    }
+
+    static Builder builder()
+    {
+        return new Builder();
+    }
+
+    @Test
+    public void testOne()
+    {
+        testRandomAdds(8532037884171168001L, 3, 1, 3, 0.100000f, 0.100000f);
+    }
+
+    @Test
+    public void testRandomAdds() throws ExecutionException, 
InterruptedException
+    {
+        ExecutorService executor = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<ListenableFuture<Void>> results = new ArrayList<>();
+        int count = 100000;
+        for (int numberOfAdditions : new int[] { 1, 10, 100 })
+        {
+            for (float maxCoveragePerRange : new float[] { 0.01f, 0.1f, 0.5f })
+            {
+                for (float chanceOfMinRoutingKey : new float[] { 0.01f, 0.1f })
+                {
+                    results.addAll(testRandomAdds(executor, count, 3, 
numberOfAdditions, 3, maxCoveragePerRange, chanceOfMinRoutingKey));
+                }
+            }
+        }
+        Futures.allAsList(results).get();
+        executor.shutdown();
+    }
+
+    private List<ListenableFuture<Void>> testRandomAdds(ExecutorService 
executor, int tests, int numberOfMerges, int numberOfAdditions, int 
maxNumberOfRangesPerAddition, float maxCoveragePerRange, float 
chanceOfMinRoutingKey)
+    {
+        return ThreadLocalRandom.current()
+                .longs(tests)
+                .mapToObj(seed -> {
+                    SettableFuture<Void> promise = SettableFuture.create();
+                    executor.execute(() -> {
+                        try
+                        {
+                            testRandomAdds(seed, numberOfMerges, 
numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, 
chanceOfMinRoutingKey);
+                            promise.set(null);
+                        }
+                        catch (Throwable t)
+                        {
+                            promise.setException(t);
+                        }
+                    });
+                    return promise;
+                })
+                .collect(Collectors.toList());
+    }
+
+    private void testRandomAdds(long seed, int numberOfMerges, int 
numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, 
float chanceOfMinRoutingKey)
+    {
+        String id = String.format("%d, %d, %d, %d, %f, %f", seed, 
numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, 
maxCoveragePerRange, chanceOfMinRoutingKey);
+        try
+        {
+            Random random = new Random(seed);
+            List<RandomWithCanonical> merge = new ArrayList<>();
+            while (numberOfMerges-- > 0)
+            {
+                RandomWithCanonical build = new RandomWithCanonical();
+                build.addRandom(random, numberOfAdditions, 
maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey);
+                build.validate(random, id);
+                merge.add(build);
+            }
+
+            RandomWithCanonical check = new RandomWithCanonical();
+            for (RandomWithCanonical add : merge)
+                check = check.merge(random, add);
+    //        check.serdeser();
+
+            check.validate(random, id);
+        }
+        catch (Throwable t)
+        {
+            if (!(t instanceof AssertionFailedError))
+                throw new RuntimeException(id, t);
+        }
+    }
+
+    static class RandomMap
+    {
+        BTreeReducingRangeMap<Timestamp> test = new BTreeReducingRangeMap<>();
+
+        void add(Ranges ranges, Timestamp timestamp)
+        {
+            test = BTreeReducingRangeMap.add(test, ranges, timestamp);
+        }
+
+        void merge(RandomMap other)
+        {
+            test = BTreeReducingRangeMap.merge(test, other.test, 
Timestamp::max);
+        }
+
+        void addOneRandom(Random random, int maxRangeCount, float maxCoverage, 
float minChance)
+        {
+            int count = maxRangeCount == 1 ? 1 : 1 + 
random.nextInt(maxRangeCount - 1);
+            Timestamp timestamp = ts(random.nextInt(MAX_VALUE));
+            List<Range> ranges = new ArrayList<>();
+            while (count-- > 0)
+            {
+                int length = (int) (2 * random.nextDouble() * maxCoverage * 
MAX_VALUE);
+                if (length == 0) length = 1;
+                Range range;
+                if (random.nextFloat() <= minChance)
+                {
+                    if (random.nextBoolean()) range = r(MIN_VALUE + 1, 
MIN_VALUE + 1 + length);
+                    else range = r(MAX_VALUE - length - 1, MAX_VALUE - 1);
+                }
+                else
+                {
+                    int start = random.nextInt(MAX_VALUE - length - 1);
+                    range = r(start, start + length);
+                }
+                ranges.add(range);
+            }
+            add(Ranges.of(ranges.toArray(new Range[0])), timestamp);
+        }
+
+        void addRandom(Random random, int count, int 
maxNumberOfRangesPerAddition, float maxCoveragePerAddition, float 
minRoutingKeyChance)
+        {
+            while (count-- > 0)
+                addOneRandom(random, maxNumberOfRangesPerAddition, 
maxCoveragePerAddition, minRoutingKeyChance);
+        }
+
+
+        static BTreeReducingRangeMap<Timestamp> build(Random random, int 
count, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float 
chanceOfMinRoutingKey)
+        {
+            RandomMap result = new RandomMap();
+            result.addRandom(random, count, maxNumberOfRangesPerAddition, 
maxCoveragePerRange, chanceOfMinRoutingKey);
+            return result.test;
+        }
+    }
+
+    static class RandomWithCanonical extends RandomMap
+    {
+        // confusingly, we use lower bounds here since we copied over from C*
+        NavigableMap<RoutingKey, Timestamp> canonical = new TreeMap<>();
+        {
+            canonical.put(MINIMUM_EXCL, none());
+            canonical.put(MAXIMUM_EXCL, none());
+        }
+
+        Timestamp get(RoutingKey rk)
+        {
+            return canonical.ceilingEntry(rk).getValue();
+        }
+
+        RandomWithCanonical merge(Random random, RandomWithCanonical other)
+        {
+            RandomWithCanonical result = new RandomWithCanonical();
+            result.test = random.nextBoolean()
+                          ? BTreeReducingRangeMap.merge(test, other.test, 
Timestamp::max)
+                          : BTreeReducingIntervalMap.mergeIntervals(test, 
other.test, IntervalBuilder::new);
+            result.canonical = new TreeMap<>();
+            result.canonical.putAll(canonical);
+            RoutingKey prev = null;
+            for (Map.Entry<RoutingKey, Timestamp> entry : 
other.canonical.entrySet())
+            {
+                if (prev != null) result.addCanonical(r(prev, entry.getKey()), 
entry.getValue());
+                prev = entry.getKey();
+            }
+            return result;
+        }
+
+        static class IntervalBuilder extends 
BTreeReducingIntervalMap.AbstractIntervalBuilder<RoutingKey, Timestamp, 
BTreeReducingRangeMap<Timestamp>>
+        {
+            protected IntervalBuilder(boolean inclusiveEnds, int capacity)
+            {
+                super(inclusiveEnds, capacity);
+            }
+
+            @Override
+            protected Timestamp slice(RoutingKey start, RoutingKey end, 
Timestamp value)
+            {
+                return value;
+            }
+
+            @Override
+            protected Timestamp reduce(Timestamp a, Timestamp b)
+            {
+                return Timestamp.max(a, b);
+            }
+
+            @Override
+            protected Timestamp tryMergeEqual(@Nonnull Timestamp a, Timestamp 
b)
+            {
+                return a;
+            }
+
+            @Override
+            protected BTreeReducingRangeMap<Timestamp> buildInternal(Object[] 
tree)
+            {
+                return new BTreeReducingRangeMap<>(inclusiveEnds, tree);
+            }
+        }
+
+//        void serdeser()
+//        {
+//            ReduceRangeMap<RoutingKey, Timestamp> tmp = 
ReduceRangeMap.fromTupleBufferList(test.toTupleBufferList());
+//            Assertions.assertEquals(test, tmp);
+//            test = tmp;
+//        }
+
+        @Override
+        void add(Ranges addRanges, Timestamp timestamp)
+        {
+            super.add(addRanges, timestamp);
+            for (Range range : addRanges)
+                addCanonical(range, timestamp);
+        }
+
+        @Override
+        void addOneRandom(Random random, int maxRangeCount, float maxCoverage, 
float minChance)
+        {
+            super.addOneRandom(random, maxRangeCount, maxCoverage, minChance);
+//            validate(new Random(), "");
+        }
+
+        void addCanonical(Range range, Timestamp timestamp)
+        {
+            canonical.put(range.start(), 
canonical.ceilingEntry(range.start()).getValue());
+            canonical.put(range.end(), 
canonical.ceilingEntry(range.end()).getValue());
+
+            canonical.subMap(range.start(), !END_INCLUSIVE, range.end(), 
END_INCLUSIVE)
+                    .entrySet().forEach(e -> 
e.setValue(Timestamp.nonNullOrMax(e.getValue(), timestamp)));
+        }
+
+        void validate(Random random, String id)
+        {
+            for (RoutingKey rk : canonical.keySet())
+            {
+                Assertions.assertEquals(get(decr(rk)), test.get(decr(rk)), id);
+                Assertions.assertEquals(get(rk), test.get(rk), id);
+                Assertions.assertEquals(get(incr(rk)), test.get(incr(rk)), id);
+            }
+
+            // check some random
+            {
+                int remaining = 1000;
+                while (remaining-- > 0)
+                {
+                    RoutingKey routingKey = rk(random);
+                    Assertions.assertEquals(get(routingKey), 
test.get(routingKey), id);
+                }
+            }
+
+            // validate foldl
+            {
+                int remaining = 100;
+                while (remaining-- > 0)
+                {
+                    int count = 1 + random.nextInt(20);
+                    RoutingKeys keys;
+                    Ranges ranges;
+                    {
+                        RoutingKey[] tmp = new RoutingKey[count];
+                        for (int i = 0 ; i < tmp.length ; ++i)
+                            tmp[i] = rk(random);
+                        keys = RoutingKeys.of(tmp);
+                        Range[] tmp2 = new Range[(keys.size() + 1) / 2];
+                        int i = 0, c = 0;
+                        if (keys.size() % 2 == 1 && random.nextBoolean())
+                            tmp2[c++] = r(MINIMUM_EXCL, keys.get(i++));
+                        while (i + 1 < keys.size())
+                        {
+                            tmp2[c++] = r(keys.get(i), keys.get(i+1));
+                            i += 2;
+                        }
+                        if (i < keys.size())
+                            tmp2[c++] = r(keys.get(i++), MAXIMUM_EXCL);
+                        ranges = Ranges.of(tmp2);
+                    }
+
+                    List<Timestamp> foldl = test.foldl(keys, (timestamp, 
timestamps) -> {
+                            if (timestamps.isEmpty() || 
!timestamps.get(timestamps.size() - 1).equals(timestamp))
+                                timestamps.add(timestamp);
+                            return timestamps;
+                        }, new ArrayList<>(), ignore -> false);
+
+                    List<Timestamp> canonFoldl = new ArrayList<>();
+                    for (RoutingKey key : keys)
+                    {
+                        Timestamp next = get(key);
+                        if (next == null)
+                            continue;
+                        if (canonFoldl.isEmpty() || 
!canonFoldl.get(canonFoldl.size() - 1).equals(next))
+                            canonFoldl.add(next);
+                    }
+                    Assertions.assertEquals(canonFoldl, foldl, id);
+
+                    foldl = test.foldl(ranges, (timestamp, timestamps) -> {
+                        if (timestamps.isEmpty() || 
!timestamps.get(timestamps.size() - 1).equals(timestamp))
+                            timestamps.add(timestamp);
+                        return timestamps;
+                    }, new ArrayList<>(), ignore -> false);
+
+                    canonFoldl.clear();
+                    for (Range range : ranges)
+                    {
+                        RoutingKey start = END_INCLUSIVE ? 
canonical.higherKey(range.start()) : canonical.ceilingKey(range.start());
+                        RoutingKey end = END_INCLUSIVE ? 
canonical.ceilingKey(range.end()) : canonical.higherKey(range.end());
+                        for (Timestamp next : canonical.subMap(start, true, 
end, true).values())
+                        {
+                            if (next == null)
+                                continue;
+
+                            if (canonFoldl.isEmpty() || 
!canonFoldl.get(canonFoldl.size() - 1).equals(next))
+                                canonFoldl.add(next);
+                        }
+                    }
+                    Assertions.assertEquals(canonFoldl, foldl, id);
+                }
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to