This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
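
For orientation before the patch itself, below is a minimal, illustrative sketch (not part of the commit) of how the new public BTreeReducingRangeMap API is exercised, mirroring the usage in the included BTreeReducingRangeMapTest; the key and timestamp values are arbitrary.

import accord.impl.IntKey;
import accord.local.Node;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.utils.BTreeReducingRangeMap;

public class BTreeRangeMapExample
{
    public static void main(String[] args)
    {
        // Start from an empty map; values are bound only to explicitly covered ranges.
        BTreeReducingRangeMap<Timestamp> map = new BTreeReducingRangeMap<>();

        // An arbitrary timestamp and an end-inclusive range over integer routing keys.
        Timestamp ts = Timestamp.fromValues(1, 42, 0, new Node.Id(1));
        Ranges ranges = Ranges.of(new Range.EndInclusive(new IntKey.Routing(10), new IntKey.Routing(20)));

        // Bind ts to the range, resolving overlaps with Timestamp::max; internally the
        // existing BTree is retained where possible rather than rebuilt.
        map = BTreeReducingRangeMap.add(map, ranges, ts, Timestamp::max);

        // Fold the bound values intersecting the given routables back out.
        Timestamp max = map.foldl(ranges, Timestamp::max, Timestamp.NONE);
        System.out.println(max);
    }
}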
commit c10ea993a3c75f9e1bf931ae5d0dd5e7c779e28b
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Tue Sep 17 13:55:22 2024 +0100

    Change MaxConflicts to use a BTree under the hood

    patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-19952
---
 .../src/main/java/accord/local/MaxConflicts.java   |  52 +-
 .../main/java/accord/primitives/AbstractKeys.java  |  12 +
 .../java/accord/primitives/AbstractRanges.java     |  12 +
 .../src/main/java/accord/primitives/Routables.java |  10 +
 .../accord/utils/BTreeReducingIntervalMap.java     | 783 +++++++++++++++++++++
 .../java/accord/utils/BTreeReducingRangeMap.java   | 535 ++++++++++++++
 .../main/java/accord/utils/ReducingRangeMap.java   |  36 +-
 .../src/main/java/accord/utils/TinyKVBuffer.java   | 115 +++
 .../src/main/java/accord/utils/TriConsumer.java    |  23 +
 .../accord/utils/BTreeReducingRangeMapTest.java    | 465 ++++++++++++
 10 files changed, 1985 insertions(+), 58 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/MaxConflicts.java b/accord-core/src/main/java/accord/local/MaxConflicts.java
index c31a1bfa..421c54f6 100644
--- a/accord-core/src/main/java/accord/local/MaxConflicts.java
+++ b/accord-core/src/main/java/accord/local/MaxConflicts.java
@@ -15,21 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package accord.local;
 
-import javax.annotation.Nonnull;
-
 import accord.api.RoutingKey;
-import accord.primitives.AbstractRanges;
 import accord.primitives.Routables;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
-import accord.utils.ReducingIntervalMap;
-import accord.utils.ReducingRangeMap;
+import accord.utils.BTreeReducingRangeMap;
 
 // TODO (expected): track read/write conflicts separately
-public class MaxConflicts extends ReducingRangeMap<Timestamp>
+class MaxConflicts extends BTreeReducingRangeMap<Timestamp>
 {
     public static final MaxConflicts EMPTY = new MaxConflicts();
 
@@ -38,59 +33,32 @@ public class MaxConflicts extends ReducingRangeMap<Timestamp>
         super();
     }
 
-    private MaxConflicts(boolean inclusiveEnds, RoutingKey[] starts, Timestamp[] values)
+    private MaxConflicts(boolean inclusiveEnds, Object[] tree)
     {
-        super(inclusiveEnds, starts, values);
+        super(inclusiveEnds, tree);
     }
 
-    public Timestamp get(Seekables<?, ?> keysOrRanges)
+    Timestamp get(Routables<?> keysOrRanges)
     {
         return foldl(keysOrRanges, Timestamp::max, Timestamp.NONE);
     }
 
-    public Timestamp get(Routables<?> keysOrRanges)
-    {
-        return foldl(keysOrRanges, Timestamp::max, Timestamp.NONE);
-    }
-
-    MaxConflicts update(Seekables<?, ?> keysOrRanges, Timestamp maxConflict)
-    {
-        return merge(this, create(keysOrRanges, maxConflict));
-    }
-
-    public static MaxConflicts create(AbstractRanges ranges, @Nonnull Timestamp maxConflict)
-    {
-        if (ranges.isEmpty())
-            return MaxConflicts.EMPTY;
-
-        return create(ranges, maxConflict, MaxConflicts.Builder::new);
-    }
-
-    public static MaxConflicts create(Seekables<?, ?> keysOrRanges, @Nonnull Timestamp maxConflict)
-    {
-        if (keysOrRanges.isEmpty())
-            return MaxConflicts.EMPTY;
-
-        return create(keysOrRanges, maxConflict, Builder::new);
-    }
-
-    public static MaxConflicts merge(MaxConflicts a, MaxConflicts b)
+    public MaxConflicts update(Seekables<?, ?> keysOrRanges, Timestamp maxConflict)
     {
-        return ReducingIntervalMap.merge(a, b, Timestamp::max, MaxConflicts.Builder::new);
+        return update(this, keysOrRanges, maxConflict, Timestamp::max, MaxConflicts::new, Builder::new);
     }
 
-    static class Builder extends AbstractBoundariesBuilder<RoutingKey, Timestamp, MaxConflicts>
+    private static class Builder extends AbstractBoundariesBuilder<RoutingKey, Timestamp, MaxConflicts>
     {
         protected Builder(boolean inclusiveEnds, int capacity)
         {
             super(inclusiveEnds, capacity);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        protected MaxConflicts buildInternal()
+        protected MaxConflicts buildInternal(Object[] tree)
         {
-            return new MaxConflicts(inclusiveEnds, starts.toArray(new RoutingKey[0]), values.toArray(new Timestamp[0]));
+            return new MaxConflicts(inclusiveEnds, tree);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 29b89467..771cefd2 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -117,12 +117,24 @@ public abstract class AbstractKeys<K extends RoutableKey> implements Iterable<K>
         return findNextIntersection(0, ranges, 0) >= 0;
     }
 
+    @Override
+    public final int find(RoutableKey key, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(keys, 0, keys.length, key, RoutableKey::compareTo, search);
+    }
+
     @Override
     public final int findNext(int thisIndex, RoutableKey key, SortedArrays.Search search)
     {
         return SortedArrays.exponentialSearch(keys, thisIndex, keys.length, key, RoutableKey::compareTo, search);
     }
 
+    @Override
+    public final int find(Range find, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(keys, 0, size(), find, Range::compareTo, search);
+    }
+
     @Override
     public final int findNext(int thisIndex, Range find, SortedArrays.Search search)
     {
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 50d90061..5c0a7cd7 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -204,12 +204,24 @@ public abstract class AbstractRanges implements Iterable<Range>, Routables<Range
         return findNextIntersection(thisIndex, (AbstractRanges) with, withIndex);
     }
 
+    @Override
+    public final int find(Range find, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(ranges, 0, size(), find, Range::compareIntersecting, search);
+    }
+
     @Override
     public final int findNext(int thisIndex, Range find, SortedArrays.Search search)
     {
         return SortedArrays.exponentialSearch(ranges, thisIndex, size(), find, Range::compareIntersecting, search);
     }
 
+    @Override
+    public final int find(RoutableKey find, SortedArrays.Search search)
+    {
+        return SortedArrays.binarySearch(ranges, 0, size(), find, (k, r) -> -r.compareTo(k), search);
+    }
+
     @Override
     public final int findNext(int thisIndex, RoutableKey find, SortedArrays.Search search)
     {
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java
index 12df7e7f..6e9df46f 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -103,11 +103,21 @@ public interface Routables<K extends Routable> extends Iterable<K>
      */
     long findNextIntersection(int thisIndex, Routables<K> with, int withIndex);
 
+    /**
+     * Perform {@link SortedArrays#binarySearch} looking for {@code find} with behaviour of {@code search}
+     */
+    int find(Range find, SortedArrays.Search search);
+
     /**
      * Perform {@link SortedArrays#exponentialSearch} from {@code thisIndex} looking for {@code find} with behaviour of {@code search}
      */
     int findNext(int thisIndex, Range find, SortedArrays.Search search);
 
+    /**
+     * Perform {@link SortedArrays#binarySearch} looking for {@code find} with behaviour of {@code search}
+     */
+    int find(RoutableKey find, SortedArrays.Search search);
+
     /**
      * Perform {@link SortedArrays#exponentialSearch} from {@code thisIndex} looking for {@code find} with behaviour of {@code search}
      */
diff --git a/accord-core/src/main/java/accord/utils/BTreeReducingIntervalMap.java b/accord-core/src/main/java/accord/utils/BTreeReducingIntervalMap.java
new file mode 100644
index 00000000..d8b7d0e1
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/BTreeReducingIntervalMap.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.utils;
+
+import accord.utils.btree.BTree;
+import com.google.common.collect.AbstractIterator;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterators;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static accord.utils.Invariants.*;
+import static com.google.common.collect.Iterators.filter;
+import static com.google.common.collect.Iterators.transform;
+
+/**
+ * Represents a map of ranges where precisely one value is bound to each point in the continuum of ranges,
+ * and a simple function is sufficient to merge values inserted to overlapping ranges.
+ * <p>
+ * Copied/adapted from Cassandra's PaxosRepairHistory class, however has a major distinction: applies only to the
+ * covered ranges, i.e. the first start bound is the lower bound, and the last start bound is the upper bound,
+ * and everything else is considered unknown. This is in contrast to the C* version where every logical range has
+ * some associated information, with the first and last entries applying to everything either side of the start/end bound.
+ * <p>
+ * A simple sorted array of bounds is sufficient to represent the state and perform efficient lookups.
+ * <p>
+ */
+public class BTreeReducingIntervalMap<K extends Comparable<?
super K>, V> +{ + protected final boolean inclusiveEnds; + protected final Object[] tree; + + public BTreeReducingIntervalMap() + { + this(false); + } + + public BTreeReducingIntervalMap(boolean inclusiveEnds) + { + this(inclusiveEnds, BTree.empty()); + } + + protected BTreeReducingIntervalMap(boolean inclusiveEnds, Object[] tree) + { + this.inclusiveEnds = inclusiveEnds; + this.tree = tree; + } + + public boolean inclusiveEnds() + { + return inclusiveEnds; + } + + public final boolean inclusiveStarts() + { + return !inclusiveEnds; + } + + public boolean isEmpty() + { + return BTree.isEmpty(tree); + } + + public int size() + { + return Math.max(BTree.size(tree) - 1, 0); + } + + public V get(K key) + { + checkArgument(null != key); + int idx = BTree.findIndex(tree, EntryComparator.instance(), key); + + if (idx < 0) idx = -2 - idx; + else if (inclusiveEnds) --idx; + + return idx < 0 || idx >= size() + ? null + : valueAt(idx); + } + + public V foldl(BiFunction<V, V, V> reduce) + { + checkState(!isEmpty()); + Iterator<V> iter = valuesIterator(false); + V result = iter.next(); + while (iter.hasNext()) + { + V next = iter.next(); + if (next != null) + result = reduce.apply(result, next); + } + return result; + } + + public <V2> V2 foldl(BiFunction<V, V2, V2> reduce, V2 accumulator, Predicate<V2> terminate) + { + Iterator<V> iter = valuesIterator(true); + while (iter.hasNext()) + { + accumulator = reduce.apply(iter.next(), accumulator); + if (terminate.test(accumulator)) + break; + } + return accumulator; + } + + public <V2> V2 foldlWithBounds(QuadFunction<V, V2, K, K, V2> fold, V2 accumulator, Predicate<V2> terminate) + { + WithBoundsIterator<K, V> iter = withBoundsIterator(true); + while (iter.advance()) + { + accumulator = fold.apply(iter.value(), accumulator, iter.start(), iter.end()); + if (terminate.test(accumulator)) + break; + } + return accumulator; + } + + @Override + public String toString() + { + return toString(v -> true); + } + + public String toString(Predicate<V> include) + { + if (isEmpty()) + return "{}"; + + StringBuilder builder = new StringBuilder("{"); + boolean isFirst = true; + + WithBoundsIterator<K, V> iter = withBoundsIterator(); + while (iter.advance()) + { + if (!include.test(iter.value())) + continue; + + if (!isFirst) + builder.append(", "); + + builder.append(inclusiveStarts() ? '[' : '(') + .append(iter.start()) + .append(',') + .append(iter.end()) + .append(inclusiveEnds() ? ']' : ')') + .append('=') + .append(iter.value()); + + isFirst = false; + } + return builder.append('}').toString(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + @SuppressWarnings("unchecked") + BTreeReducingIntervalMap<K, V> that = (BTreeReducingIntervalMap<K, V>) o; + return this.inclusiveEnds == that.inclusiveEnds && BTree.equals(this.tree, that.tree); + } + + @Override + public int hashCode() + { + return Boolean.hashCode(inclusiveEnds) + 31 * BTree.hashCode(tree); + } + + public static class EntryComparator<K extends Comparable<? super K>, V> implements AsymmetricComparator<K, Entry<K, V>> + { + private static final EntryComparator<?,?> INSTANCE = new EntryComparator<>(); + + @SuppressWarnings("unchecked") + public static <K extends Comparable<? 
super K>, V> EntryComparator<K, V> instance() + { + return (EntryComparator<K, V>) INSTANCE; + } + + @Override + public int compare(K start, Entry<K, V> entry) + { + return start.compareTo(entry.start); + } + } + + public static class Entry<K extends Comparable<? super K>, V> implements Comparable<Entry<K, V>> + { + private final K start; + private final V value; + private final boolean hasValue; + + private Entry(K start, V value, boolean hasValue) + { + this.start = start; + this.value = value; + this.hasValue = hasValue; + } + + public static <K extends Comparable<? super K>, V> Entry<K, V> make(K start) + { + return new Entry<>(start, null, false); + } + + public static <K extends Comparable<? super K>, V> Entry<K, V> make(K start, V value) + { + return new Entry<>(start, value, true); + } + + public K start() + { + return start; + } + + public V value() + { + if (hasValue) return value; + throw illegalState(); + } + + /** null is considered a valid value, distinct from not having a set value at all */ + public boolean hasValue() + { + return hasValue; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + @SuppressWarnings("unchecked") + Entry<K, V> that = (Entry<K, V>) o; + return Objects.equals(this.start, that.start) && this.hasValue == that.hasValue && Objects.equals(this.value, that.value); + } + + @Override + public int hashCode() + { + return Objects.hashCode(start) + 31 * (Boolean.hashCode(hasValue) + 31 * Objects.hashCode(value)); + } + + @Override + public int compareTo(@Nonnull Entry<K, V> that) + { + return this.start.compareTo(that.start); + } + } + + protected static <K extends Comparable<? super K>, V, M extends BTreeReducingIntervalMap<K, V>> M mergeIntervals( + M historyLeft, M historyRight, IntervalBuilderFactory<K, V, M> factory) + { + if (historyLeft == null || historyLeft.isEmpty()) + return historyRight; + if (historyRight == null || historyRight.isEmpty()) + return historyLeft; + + boolean inclusiveEnds = inclusiveEnds(historyLeft.inclusiveEnds, !historyLeft.isEmpty(), historyRight.inclusiveEnds, !historyRight.isEmpty()); + AbstractIntervalBuilder<K, V, M> builder = factory.create(inclusiveEnds, historyLeft.size() + historyRight.size()); + + WithBoundsIterator<K, V> left = historyLeft.withBoundsIterator(); + WithBoundsIterator<K, V> right = historyRight.withBoundsIterator(); + + left.advance(); + right.advance(); + + K start; + { + // first loop over any range only covered by one of the two + WithBoundsIterator<K, V> first = left.start().compareTo(right.start()) <= 0 ? left : right; + WithBoundsIterator<K, V> second = first == left ? right : left; + + while (first.hasCurrent() && first.end().compareTo(second.start()) <= 0) + { + if (first.value() != null) + builder.append(first.start(), first.end(), first.value()); + first.advance(); + } + + start = second.start(); + if (first.hasCurrent() && first.start().compareTo(start) < 0 && first.value() != null) + builder.append(first.start(), start, builder.slice(first.start(), start, first.value())); + checkState(start.compareTo(second.start()) <= 0); + } + + // loop over any range covered by both + while (left.hasCurrent() && right.hasCurrent()) + { + int cmp = left.end().compareTo(right.end()); + K end = (cmp <= 0 ? 
left : right).end(); + V value = sliceAndReduce(start, end, left.value(), right.value(), builder); + if (cmp <= 0) left.advance(); + if (cmp >= 0) right.advance(); + if (value != null) + builder.append(start, end, value); + start = end; + } + + // finally loop over any remaining range covered by only one + WithBoundsIterator<K, V> remaining = left.hasCurrent() ? left : right; + if (remaining.hasCurrent()) + { + { // only slice the first one + K end = remaining.end(); + V value = remaining.value(); + if (value != null) + { + value = builder.slice(start, end, value); + builder.append(start, end, value); + } + start = end; + remaining.advance(); + } + while (remaining.hasCurrent()) + { + K end = remaining.end(); + V value = remaining.value(); + if (value != null) + builder.append(start, end, value); + start = end; + remaining.advance(); + } + } + + return builder.build(); + } + + protected static <K extends Comparable<? super K>, V, M extends BTreeReducingIntervalMap<K, V>> M merge( + M historyLeft, M historyRight, BiFunction<V, V, V> reduce, BoundariesBuilderFactory<K, V, M> factory) + { + if (historyLeft == null || historyLeft.isEmpty()) + return historyRight; + if (historyRight == null || historyRight.isEmpty()) + return historyLeft; + + boolean inclusiveEnds = inclusiveEnds(historyLeft.inclusiveEnds, !historyLeft.isEmpty(), historyRight.inclusiveEnds, !historyRight.isEmpty()); + AbstractBoundariesBuilder<K, V, M> builder = factory.create(inclusiveEnds, historyLeft.size() + historyRight.size()); + + WithBoundsIterator<K, V> left = historyLeft.withBoundsIterator(); + WithBoundsIterator<K, V> right = historyRight.withBoundsIterator(); + + left.advance(); + right.advance(); + + K start; + { + // first loop over any range only covered by one of the two + WithBoundsIterator<K, V> first = left.start().compareTo(right.start()) <= 0 ? left : right; + WithBoundsIterator<K, V> second = first == left ? right : left; + + K end = null; + while (first.hasCurrent() && first.end().compareTo(second.start()) <= 0) + { + builder.append(first.start(), first.value(), reduce); + end = first.end(); + first.advance(); + } + + start = second.start(); + if (first.hasCurrent()) + builder.append(first.start(), first.value(), reduce); + else + builder.append(end, null, reduce); + checkState(start.compareTo(second.start()) <= 0); + } + + // loop over any range covered by both + // TODO (expected): optimise merging of very different sized maps (i.e. 
for inserts) + while (left.hasCurrent() && right.hasCurrent()) + { + int cmp = left.end().compareTo(right.end()); + V value = reduce(left.value(), right.value(), reduce); + + if (cmp == 0) + { + builder.append(start, value, reduce); + start = left.end(); + left.advance(); + right.advance(); + } + else if (cmp < 0) + { + builder.append(start, value, reduce); + start = left.end(); + left.advance(); + } + else + { + builder.append(start, value, reduce); + start = right.end(); + right.advance(); + } + } + + // finally loop over any remaining range covered by only one + while (left.hasCurrent()) + { + builder.append(start, left.value(), reduce); + start = left.end(); + left.advance(); + } + + while (right.hasCurrent()) + { + builder.append(start, right.value(), reduce); + start = right.end(); + right.advance(); + } + + builder.append(start, null, reduce); + + return builder.build(); + } + + static boolean inclusiveEnds(boolean leftIsInclusive, boolean leftIsDecisive, boolean rightIsInclusive, boolean rightIsDecisive) + { + if (leftIsInclusive == rightIsInclusive) + return leftIsInclusive; + + if (leftIsDecisive && rightIsDecisive) + throw illegalState("Mismatching bound inclusivity/exclusivity"); + + return leftIsDecisive ? leftIsInclusive : rightIsInclusive; + } + + private static <V> V reduce(V left, V right, BiFunction<V, V, V> reduce) + { + return left == null ? right : right == null ? left : reduce.apply(left, right); + } + + private static <K extends Comparable<? super K>, V> V sliceAndReduce(K start, K end, V left, V right, AbstractIntervalBuilder<K, V, ?> builder) + { + if (left != null) left = builder.slice(start, end, left); + if (right != null) right = builder.slice(start, end, right); + return left == null ? right : right == null ? left : builder.reduce(left, right); + } + + public interface BoundariesBuilderFactory<K extends Comparable<? super K>, V, M extends BTreeReducingIntervalMap<K, V>> + { + AbstractBoundariesBuilder<K, V, M> create(boolean inclusiveEnds, int capacity); + } + + public static abstract class AbstractBoundariesBuilder<K extends Comparable<? 
super K>, V, M extends BTreeReducingIntervalMap<K, V>> + { + protected final boolean inclusiveEnds; + + private final BTree.Builder<Entry<K, V>> treeBuilder; + private final TinyKVBuffer<K, V> buffer; + + protected AbstractBoundariesBuilder(boolean inclusiveEnds, int capacity) + { + this.inclusiveEnds = inclusiveEnds; + this.treeBuilder = BTree.builder(Comparator.naturalOrder(), capacity); + this.buffer = new TinyKVBuffer<>(); + } + + public boolean isEmpty() + { + return buffer.isEmpty(); + } + + /** + * null is a valid value to represent no knowledge, and is the *expected* final value, representing + * the bound of our knowledge (any higher key will find no associated information) + */ + public void append(K start, @Nullable V value, BiFunction<V, V, V> reduce) + { + if (buffer.isEmpty()) + { + buffer.append(start, value); + return; + } + + K prevStart = buffer.lastKey(); + V prevValue = buffer.lastValue(); + + checkArgument(start.compareTo(prevStart) >= 0); + + boolean samePrevStart = start.equals(prevStart); + boolean samePrevValue = Objects.equals(value, prevValue); + + if (!(samePrevStart || samePrevValue)) + { + if (buffer.isFull()) + { + treeBuilder.add(Entry.make(buffer.firstKey(), buffer.firstValue())); + buffer.dropFirst(); + } + buffer.append(start, value); + return; + } + + if (samePrevValue) + return; + + if (prevValue != null) + buffer.lastValue(reduce.apply(prevValue, value)); + else if (buffer.size() >= 2 && value.equals(buffer.penultimateValue())) + buffer.dropLast(); // just remove the last start and (null) value + else + buffer.lastValue(value); + } + + protected abstract M buildInternal(Object[] tree); + + public final M build() + { + while (buffer.size() > 1) + { + treeBuilder.add(Entry.make(buffer.firstKey(), buffer.firstValue())); + buffer.dropFirst(); + } + + if (!buffer.isEmpty()) + { + checkState(buffer.lastValue() == null); + treeBuilder.add(Entry.make(buffer.firstKey())); + buffer.dropFirst(); + } + + return buildInternal(treeBuilder.build()); + } + } + + public interface IntervalBuilderFactory<K extends Comparable<? super K>, V, M extends BTreeReducingIntervalMap<K, V>> + { + AbstractIntervalBuilder<K, V, M> create(boolean inclusiveEnds, int capacity); + } + + public static abstract class AbstractIntervalBuilder<K extends Comparable<? super K>, V, M> + { + protected final boolean inclusiveEnds; + private final BTree.Builder<Entry<K, V>> treeBuilder; + private final TinyKVBuffer<K, V> buffer; + + private K prevEnd; + + protected AbstractIntervalBuilder(boolean inclusiveEnds, int capacity) + { + this.inclusiveEnds = inclusiveEnds; + this.treeBuilder = BTree.builder(Comparator.naturalOrder(), capacity); + this.buffer = new TinyKVBuffer<>(); + } + + protected abstract V slice(K start, K end, V value); + protected abstract V reduce(V a, V b); + + protected V tryMergeEqual(@Nonnull V a, V b) + { + return a.equals(b) ? 
a : null; + } + + public void append(K start, K end, @Nonnull V value) + { + if (prevEnd != null) + { + int c = prevEnd.compareTo(start); + checkArgument(c <= 0); + if (c < 0) + { + if (buffer.isFull()) + { + treeBuilder.add(Entry.make(buffer.firstKey(), buffer.firstValue())); + buffer.dropFirst(); + } + buffer.append(prevEnd, null); + } + } + + V prevValue; + if (!buffer.isEmpty() && null != (prevValue = buffer.lastValue()) && null != (prevValue = tryMergeEqual(prevValue, value))) + { + buffer.lastValue(prevValue); + } + else + { + if (buffer.isFull()) + { + treeBuilder.add(Entry.make(buffer.firstKey(), buffer.firstValue())); + buffer.dropFirst(); + } + buffer.append(start, value); + } + + prevEnd = end; + } + + protected abstract M buildInternal(Object[] tree); + + public final M build() + { + while (!buffer.isEmpty()) + { + treeBuilder.add(Entry.make(buffer.firstKey(), buffer.firstValue())); + buffer.dropFirst(); + } + + if (prevEnd != null) + { + treeBuilder.add(Entry.make(prevEnd)); + prevEnd = null; + } + + return buildInternal(treeBuilder.build()); + } + } + + protected Entry<K, V> entryAt(int idx) + { + return BTree.findByIndex(tree, idx); + } + + protected K startAt(int idx) + { + return entryAt(idx).start(); + } + + protected V valueAt(int idx) + { + return entryAt(idx).value(); + } + + protected Iterator<Entry<K, V>> entriesIterator() + { + return BTree.iterator(tree); + } + + protected Iterator<V> valuesIterator(boolean skipNullValues) + { + return transform(filter(entriesIterator(), + skipNullValues ? e -> e.hasValue() && e.value() != null : Entry::hasValue), + Entry::value); + } + + protected void forEachWithBounds(boolean skipNullValues, TriConsumer<K, K, V> consumer) + { + if (isEmpty()) + return; + WithBoundsIterator<K, V> iter = withBoundsIterator(skipNullValues); + while (iter.advance()) + consumer.accept(iter.start(), iter.end(), iter.value()); + } + + public WithBoundsIterator<K, V> withBoundsIterator() + { + return new WithBoundsIterator<>(false, entriesIterator()); + } + + public WithBoundsIterator<K, V> withBoundsIterator(boolean skipNullValues) + { + return new WithBoundsIterator<>(skipNullValues, entriesIterator()); + } + + public <M> Iterator<M> iterator(boolean skipNullValues, TriFunction<K, K, V, M> mapper) + { + return new WithBoundsIterator<>(skipNullValues, entriesIterator()).map(mapper); + } + + public <M> Stream<M> stream(boolean skipNullValues, TriFunction<K, K, V, M> mapper) + { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(skipNullValues, mapper), 0), false); + } + + private enum IteratorState { START, ADVANCED, END } + + public static class WithBoundsIterator<K extends Comparable<? 
super K>, V> + { + private final boolean skipNullValues; + private final Iterator<Entry<K, V>> iterator; + + private Entry<K, V> curr, next; + private IteratorState state = IteratorState.START; + + private WithBoundsIterator(boolean skipNullValues, Iterator<Entry<K, V>> entriesIterator) + { + this.skipNullValues = skipNullValues; + this.iterator = entriesIterator; + } + + public boolean hasCurrent() + { + return state == IteratorState.ADVANCED; + } + + public K start() + { + return curr.start(); + } + + public K end() + { + return next.start(); + } + + public V value() + { + return curr.value(); + } + + public boolean advance() + { + if (!skipNullValues) + return advanceInternal(); + + //noinspection StatementWithEmptyBody + while (advanceInternal() && value() == null); + return hasCurrent(); + } + + private boolean advanceInternal() + { + switch (state) + { + case START: + if (iterator.hasNext()) + { + curr = iterator.next(); + checkState(iterator.hasNext()); // must contain at least two entries + next = iterator.next(); + state = IteratorState.ADVANCED; + } + else + { + state = IteratorState.END; + } + break; + case ADVANCED: + if (next.hasValue()) + { + checkState(iterator.hasNext()); // must be at least one entry after next if it has a value + curr = next; + next = iterator.next(); + state = IteratorState.ADVANCED; + } + else + { + checkState(!iterator.hasNext()); // should have no more entries if next has no value + curr = next = null; + state = IteratorState.END; + } + break; + } + + return state == IteratorState.ADVANCED; + } + + private <M> Iterator<M> map(TriFunction<K, K, V, M> mapper) + { + return new AbstractIterator<>() + { + @Override + protected M computeNext() + { + return advance() ? mapper.apply(start(), end(), value()) : endOfData(); + } + }; + } + } +} diff --git a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java new file mode 100644 index 00000000..a623dfbc --- /dev/null +++ b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package accord.utils; + +import accord.api.RoutingKey; +import accord.primitives.*; +import accord.utils.btree.BTree; +import accord.utils.btree.BTreeRemoval; +import accord.utils.btree.UpdateFunction; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Predicate; + +import static accord.utils.Invariants.checkArgument; +import static accord.utils.SortedArrays.Search.FAST; + +public class BTreeReducingRangeMap<V> extends BTreeReducingIntervalMap<RoutingKey, V> +{ + public BTreeReducingRangeMap() + { + super(); + } + + protected BTreeReducingRangeMap(boolean inclusiveEnds, Object[] tree) + { + super(inclusiveEnds, tree); + } + + public V foldl(Routables<?> routables, BiFunction<V, V, V> fold, V accumulator) + { + return foldl(routables, fold, accumulator, ignore -> false); + } + + public <V2> V2 foldl(Routables<?> routables, BiFunction<V, V2, V2> fold, V2 accumulator, Predicate<V2> terminate) + { + return foldl(routables, (a, b, f, ignore) -> f.apply(a, b), accumulator, fold, null, terminate); + } + + public <V2, P1, P2> V2 foldl(Routables<?> routables, QuadFunction<V, V2, P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate) + { + switch (routables.domain()) + { + default: throw new AssertionError("Unknown domain: " + routables.domain()); + case Key: return foldl((AbstractKeys<?>) routables, fold, accumulator, p1, p2, terminate); + case Range: return foldl((AbstractRanges) routables, fold, accumulator, p1, p2, terminate); + } + } + + public <V2, P1, P2> V2 foldl(AbstractKeys<?> keys, QuadFunction<V, V2, P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate) + { + if (isEmpty()) + return accumulator; + + int treeSize = BTree.size(tree), + keysSize = keys.size(); + + int i = keys.find(startAt(0), FAST); + if (i < 0) i = -1 - i; + else if (inclusiveEnds) ++i; + + while (i < keysSize) + { + int idx = findIndex(keys.get(i)); + if (idx < 0) idx = -2 - idx; + else if (inclusiveEnds) --idx; + + if (idx >= treeSize - 1) + return accumulator; + + int nexti = keys.findNext(i, startAt(idx + 1), FAST); + if (nexti < 0) nexti = -1 -nexti; + else if (inclusiveEnds) ++nexti; + + Entry<RoutingKey, V> entry = entryAt(idx); + if (i != nexti && entry.hasValue() && entry.value() != null) + { + accumulator = fold.apply(entry.value(), accumulator, p1, p2); + if (terminate.test(accumulator)) + return accumulator; + } + i = nexti; + } + return accumulator; + } + + public <V2, P1, P2> V2 foldl(AbstractRanges ranges, QuadFunction<V, V2, P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate) + { + if (isEmpty()) + return accumulator; + + int treeSize = BTree.size(tree), + rangesSize = ranges.size(); + + RoutingKey start = startAt(0); + int i = ranges.find(start, FAST); + if (i < 0) i = -1 - i; + else if (inclusiveEnds && ranges.get(i).end().equals(start)) ++i; + + while (i < rangesSize) + { + Range range = ranges.get(i++); + + int startIdx = findIndex(range.start()); + int startPos = startIdx < 0 ? (-1 - startIdx) : startIdx; + if (startIdx == treeSize - 1 || startPos == treeSize) + return accumulator; // is last or out of bounds -> we are done + if (startIdx < 0) startPos = Math.max(0, startPos - 1); // inclusive + + int endIdx = findIndex(range.end()); + int endPos = endIdx < 0 ? 
(-1 - endIdx) : endIdx; + if (endPos == 0) + continue; // is first or out of bounds -> continue + endPos = Math.min(endPos - 1, treeSize - 2); // inclusive + + Iterator<Entry<RoutingKey,V>> iterator = + BTree.iterator(tree, startPos, endPos, BTree.Dir.ASC); + + while (iterator.hasNext()) + { + Entry<RoutingKey, V> entry = iterator.next(); + if (entry.hasValue() && entry.value() != null) + { + accumulator = fold.apply(entry.value(), accumulator, p1, p2); + if (terminate.test(accumulator)) + return accumulator; + } + } + + if (endPos >= treeSize - 2) + return accumulator; + + RoutingKey nextStart = startAt(endPos + 1); + i = ranges.findNext(i, nextStart, FAST); + if (i < 0) i = -1 - i; + else if (inclusiveEnds && ranges.get(i).end().equals(nextStart)) ++i; + } + + return accumulator; + } + + public int findIndex(RoutableKey key) + { + return BTree.findIndex(tree, EntryComparator.instance(), key); + } + + public static <V> BTreeReducingRangeMap<V> create(AbstractRanges ranges, V value) + { + checkArgument(value != null, "value is null"); + + if (ranges.isEmpty()) + return new BTreeReducingRangeMap<>(); + + return create(ranges, value, BTreeReducingRangeMap.Builder::new); + } + + public static <V, M extends BTreeReducingRangeMap<V>> M create(Unseekables<?> keysOrRanges, V value, BoundariesBuilderFactory<RoutingKey, V, M> builder) + { + switch (keysOrRanges.domain()) + { + default: throw new AssertionError("Unhandled domain: " + keysOrRanges.domain()); + case Range: return create((AbstractRanges) keysOrRanges, value, builder); + case Key: return create((AbstractUnseekableKeys) keysOrRanges, value, builder); + } + } + + public static <V, M extends BTreeReducingRangeMap<V>> M create(Seekables<?, ?> keysOrRanges, V value, BoundariesBuilderFactory<RoutingKey, V, M> builder) + { + switch (keysOrRanges.domain()) + { + default: throw new AssertionError("Unhandled domain: " + keysOrRanges.domain()); + case Range: return create((AbstractRanges) keysOrRanges, value, builder); + case Key: return create((Keys) keysOrRanges, value, builder); + } + } + + public static <V, M extends BTreeReducingRangeMap<V>> M create(AbstractRanges ranges, V value, BoundariesBuilderFactory<RoutingKey, V, M> factory) + { + checkArgument(value != null, "value is null"); + + AbstractBoundariesBuilder<RoutingKey, V, M> builder = factory.create(ranges.get(0).endInclusive(), ranges.size() * 2); + for (Range cur : ranges) + { + builder.append(cur.start(), value, (a, b) -> { throw new IllegalStateException(); }); + builder.append(cur.end(), null, (a, b) -> { throw new IllegalStateException(); }); + } + + return builder.build(); + } + + public static <V, M extends BTreeReducingRangeMap<V>> M create(AbstractUnseekableKeys keys, V value, BoundariesBuilderFactory<RoutingKey, V, M> factory) + { + checkArgument(value != null, "value is null"); + + AbstractBoundariesBuilder<RoutingKey, V, M> builder = factory.create(keys.get(0).asRange().endInclusive(), keys.size() * 2); + for (int i = 0 ; i < keys.size() ; ++i) + { + Range range = keys.get(i).asRange(); + builder.append(range.start(), value, (a, b) -> { throw new IllegalStateException(); }); + builder.append(range.end(), null, (a, b) -> { throw new IllegalStateException(); }); + } + + return builder.build(); + } + + public static <V, M extends BTreeReducingRangeMap<V>> M create(Keys keys, V value, BoundariesBuilderFactory<RoutingKey, V, M> factory) + { + checkArgument(value != null, "value is null"); + + RoutingKey prev = keys.get(0).toUnseekable(); + AbstractBoundariesBuilder<RoutingKey, 
V, M> builder; + { + Range range = prev.asRange(); + builder = factory.create(prev.asRange().endInclusive(), keys.size() * 2); + builder.append(range.start(), value, (a, b) -> { throw new IllegalStateException(); }); + builder.append(range.end(), null, (a, b) -> { throw new IllegalStateException(); }); + } + + for (int i = 1 ; i < keys.size() ; ++i) + { + RoutingKey unseekable = keys.get(i).toUnseekable(); + if (unseekable.equals(prev)) + continue; + + Range range = unseekable.asRange(); + builder.append(range.start(), value, (a, b) -> { throw new IllegalStateException(); }); + builder.append(range.end(), null, (a, b) -> { throw new IllegalStateException(); }); + prev = unseekable; + } + + return builder.build(); + } + + public static <V> BTreeReducingRangeMap<V> add(BTreeReducingRangeMap<V> existing, Ranges ranges, V value, BiFunction<V, V, V> reduce) + { + return update(existing, ranges, value, reduce, BTreeReducingRangeMap::new); + } + + public static BTreeReducingRangeMap<Timestamp> add(BTreeReducingRangeMap<Timestamp> existing, Ranges ranges, Timestamp value) + { + return add(existing, ranges, value, Timestamp::max); + } + + public static <V> BTreeReducingRangeMap<V> merge(BTreeReducingRangeMap<V> historyLeft, BTreeReducingRangeMap<V> historyRight, BiFunction<V, V, V> reduce) + { + return BTreeReducingIntervalMap.merge(historyLeft, historyRight, reduce, BTreeReducingRangeMap.Builder::new); + } + + /** + * Update the range map retaining as much of the underlying BTree as possible + */ + public static <M extends BTreeReducingRangeMap<V>, V> M update( + M map, Seekables<?, ?> keysOrRanges, V value, BiFunction<V, V, V> valueResolver, + BiFunction<Boolean, Object[], M> factory, BoundariesBuilderFactory<RoutingKey, V, M> builderFactory) + { + if (keysOrRanges.isEmpty()) + return map; + + if (map.isEmpty()) + return create(keysOrRanges, value, builderFactory); + + if (map.inclusiveEnds() != keysOrRanges.get(0).toUnseekable().asRange().endInclusive()) + throw new IllegalStateException("Mismatching bound inclusivity/exclusivity - can't be updated"); + + return update(map, keysOrRanges, value, valueResolver, factory); + } + + private static <M extends BTreeReducingRangeMap<V>, V> M update( + M map, Seekables<?,?> keysOrRanges, V value, BiFunction<V, V, V> valueResolver, BiFunction<Boolean, Object[], M> factory) + { + Accumulator<V> acc = accumulator(); + + int treeSize = BTree.size(map.tree); + boolean updatedEndEntry = false; + + Range thisRange, nextRange = null; + for (int i = 0, rangesSize = keysOrRanges.size(); i < rangesSize; ++i) + { + thisRange = i == 0 ? keysOrRanges.get(i).toUnseekable().asRange() : nextRange; + nextRange = i < rangesSize - 1 ? keysOrRanges.get(i + 1).toUnseekable().asRange() : null; + + assert thisRange != null; + + int startIdx = map.findIndex(thisRange.start()); + int endIdx = map.findIndex(thisRange.end()); + + int startIns = startIdx >= 0 ? startIdx : -1 - startIdx; + int endIns = endIdx >= 0 ? 
endIdx : -1 - endIdx; + + boolean isRangeOpen = false; + + if (startIdx < 0) // if we start on an existing bound, we don't have to do anything *here* + { + if (startIns == 0) // insert before first map entry + { + acc.add(thisRange.start(), value); + isRangeOpen = true; + } + else if (startIns == treeSize) // inserting past last map entry + { + // when adding past current end, need to update the very last entry from having no value to having value = null + if (!updatedEndEntry) + { + acc.add(map.startAt(treeSize - 1), null); + updatedEndEntry = true; + } + acc.add(thisRange.start(), value); + isRangeOpen = true; + } + else // start within the current map bounds + { + // split the range we start in if our value is higher than the range's + if (supersedes(map.entryAt(startIns - 1), value, valueResolver)) + { + acc.add(thisRange.start(), value); + isRangeOpen = true; + } + } + } + + if (startIns < endIns) + { + // start and end are inclusive with BTree iterator + Iterator<Entry<RoutingKey, V>> iter = BTree.iterator(map.tree, startIns, endIns - 1, BTree.Dir.ASC); + while (iter.hasNext()) + { + Entry<RoutingKey, V> entry = iter.next(); + boolean supersedes = supersedes(entry, value, valueResolver); + if (supersedes) + { + if (isRangeOpen) + acc.remove(entry.start()); + else + acc.add(entry.start(), value); + } + isRangeOpen = supersedes; + } + } + + if (endIdx < 0) // if we end on an existing bound, we don't have to do anything + { + if (endIns == 0) // range ends before first entry in the map + { + acc.add(thisRange.end(), null); + } + else if (endIns == treeSize) // range ends after last entry in the map + { + if (nextRange == null) + acc.add(thisRange.end()); + else + acc.add(thisRange.end(), null); + + updatedEndEntry = true; + } + else // ends between two existing map bounds + { + // split the range we end in if our value is higher than the range's + if (isRangeOpen) acc.add(thisRange.end(), map.valueAt(endIns - 1)); + } + } + } + + Object[] updated = acc.apply(map.tree); acc.reuse(); + return map.tree == updated + ? 
map + : factory.apply(map.inclusiveEnds(), updated); + } + + private static final ThreadLocal<Accumulator<?>> accumulator = ThreadLocal.withInitial(Accumulator::new); + + @SuppressWarnings("unchecked") + private static <V> Accumulator<V> accumulator() + { + return (Accumulator<V>) accumulator.get().reuse(); + } + + private static class Accumulator<V> implements BTree.Builder.QuickResolver<Entry<RoutingKey, V>> + { + private boolean isClean = true; + + BTree.Builder<Entry<RoutingKey, V>> toAdd; + List<RoutingKey> toRemove; + + void remove(RoutingKey key) + { + isClean = false; + if (toRemove == null) + toRemove = new ArrayList<>(); + toRemove.add(key); + } + + void add(RoutingKey key) + { + isClean = false; + toAdd().add(Entry.make(key)); + } + + void add(RoutingKey key, V value) + { + isClean = false; + toAdd().add(Entry.make(key, value)); + } + + private BTree.Builder<Entry<RoutingKey, V>> toAdd() + { + if (toAdd == null) + { + toAdd = BTree.builder(Comparator.naturalOrder()); + toAdd.setQuickResolver(this); + } + return toAdd; + } + + Object[] apply(Object[] tree) + { + if (toRemove != null) + for (RoutingKey key : toRemove) + tree = BTreeRemoval.remove(tree, EntryComparator.instance(), key); + + if (toAdd != null && !toAdd.isEmpty()) + tree = BTree.update(tree, toAdd.build(), Comparator.<Entry<RoutingKey, V>>naturalOrder(), UpdateFunction.noOpReplace()); + + return tree; + } + + @Override + public Entry<RoutingKey, V> resolve(Entry<RoutingKey, V> left, Entry<RoutingKey, V> right) + { + if (left.hasValue() != right.hasValue()) + return left.hasValue() ? left : right; + + if (!left.hasValue()) + return left; + + if ((left.value() == null) != (right.value() == null)) + return left.value() == null ? right : left; + + return right; + } + + Accumulator<V> reuse() + { + if (!isClean) + { + if (toRemove != null) toRemove.clear(); + if (toAdd != null) toAdd.reuse(); + isClean = true; + } + return this; + } + } + + private static <V> boolean supersedes(Entry<RoutingKey, V> entry, V value, BiFunction<V, V, V> resolver) + { + return !entry.hasValue() + || (entry.value() == null && value != null) + || (entry.value() != null && value != null && !resolver.apply(entry.value(), value).equals(entry.value())); + } + + static class Builder<V> extends AbstractBoundariesBuilder<RoutingKey, V, BTreeReducingRangeMap<V>> + { + protected Builder(boolean inclusiveEnds, int capacity) + { + super(inclusiveEnds, capacity); + } + + @Override + protected BTreeReducingRangeMap<V> buildInternal(Object[] tree) + { + return new BTreeReducingRangeMap<>(inclusiveEnds, tree); + } + } + + /** + * A non-validating builder that expects all entries to be in correct order. For implementations' ser/de logic. 
+ */ + public static class RawBuilder<V, M> + { + protected final boolean inclusiveEnds; + protected final int capacity; + + private BTree.Builder<Entry<RoutingKey, V>> treeBuilder; + private boolean lastStartAppended; + + public RawBuilder(boolean inclusiveEnds, int capacity) + { + this.inclusiveEnds = inclusiveEnds; + this.capacity = capacity; + } + + public RawBuilder<V, M> append(RoutingKey start, V value) + { + return append(Entry.make(start, value)); + } + + public RawBuilder<V, M> append(RoutingKey start) + { + return append(Entry.make(start)); + } + + public RawBuilder<V, M> append(Entry<RoutingKey, V> entry) + { + Invariants.checkState(!lastStartAppended); + if (treeBuilder == null) + (treeBuilder = BTree.builder(Comparator.naturalOrder(), capacity + 1)).auto(false); + treeBuilder.add(entry); + lastStartAppended = !entry.hasValue(); + return this; + } + + public final M build(BiFunction<Boolean, Object[], M> constructor) + { + Invariants.checkState(lastStartAppended || treeBuilder == null); + return constructor.apply(inclusiveEnds, treeBuilder == null ? BTree.empty() : treeBuilder.build()); + } + } +} diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java index 09a44017..2383469d 100644 --- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java @@ -127,12 +127,13 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> if (values.length == 0) return accumulator; - int i = 0, j = keys.findNext(0, starts[0], FAST); + int i = 0, j = keys.find(starts[0], FAST); if (j < 0) j = -1 - j; else if (inclusiveEnds) ++j; while (j < keys.size()) { + // TODO (desired): first search should be binarySearch i = exponentialSearch(starts, i, starts.length, keys.get(j)); if (i < 0) i = -2 - i; else if (inclusiveEnds) --i; @@ -161,26 +162,27 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> if (values.length == 0) return accumulator; - // TODO (desired): first searches should be binarySearch - int j = ranges.findNext(0, starts[0], FAST); + int j = ranges.find(starts[0], FAST); if (j < 0) j = -1 - j; else if (inclusiveEnds && ranges.get(j).end().equals(starts[0])) ++j; int i = 0; while (j < ranges.size()) { - Range range = ranges.get(j); - RoutingKey start = range.start(); - int nexti = exponentialSearch(starts, i, starts.length, start); + // TODO (desired): first search should be binarySearch + int nexti = exponentialSearch(starts, i, starts.length, ranges.get(j).start()); if (nexti < 0) i = Math.max(i, -2 - nexti); - else if (nexti > i && !inclusiveStarts()) i = nexti - 1; + else if (nexti > i && inclusiveEnds) i = nexti - 1; else i = nexti; if (i >= values.length) return accumulator; int toj, nextj = ranges.findNext(j, starts[i + 1], FAST); - if (nextj < 0) toj = nextj = -1 -nextj; + if (nextj < 0) + { + toj = nextj = -1 - nextj; + } else { toj = nextj + 1; @@ -215,7 +217,7 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> if (values.length == 0 || keys.isEmpty()) return fold.apply(defaultValue, accumulator, p1, p2, 0, keys.size()); - int i = 0, j = keys.findNext(0, starts[0], FAST); + int i = 0, j = keys.find(starts[0], FAST); if (j < 0) j = -1 - j; else if (inclusiveEnds) ++j; @@ -224,6 +226,7 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> while (j < keys.size()) { + // TODO (desired): first search should be binarySearch i = 
exponentialSearch(starts, i, starts.length, keys.get(j)); if (i < 0) i = -2 - i; else if (inclusiveEnds) --i; @@ -256,8 +259,7 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> if (values.length == 0 || ranges.isEmpty()) return fold.apply(defaultValue, accumulator, p1, p2, 0, ranges.size()); - // TODO (desired): first searches should be binarySearch - int j = ranges.findNext(0, starts[0], FAST); + int j = ranges.find(starts[0], FAST); if (j < 0) j = -1 - j; else if (inclusiveEnds && ranges.get(j).end().equals(starts[0])) ++j; @@ -267,18 +269,20 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> int i = 0; while (j < ranges.size()) { - Range range = ranges.get(j); - RoutingKey start = range.start(); - int nexti = exponentialSearch(starts, i, starts.length, start); + // TODO (desired): first search should be binarySearch + int nexti = exponentialSearch(starts, i, starts.length, ranges.get(j).start()); if (nexti < 0) i = Math.max(i, -2 - nexti); - else if (nexti > i && !inclusiveStarts()) i = nexti - 1; + else if (nexti > i && inclusiveEnds) i = nexti - 1; else i = nexti; if (i >= values.length) return fold.apply(defaultValue, accumulator, p1, p2, j, ranges.size()); int toj, nextj = ranges.findNext(j, starts[i + 1], FAST); - if (nextj < 0) toj = nextj = -1 -nextj; + if (nextj < 0) + { + toj = nextj = -1 -nextj; + } else { toj = nextj + 1; diff --git a/accord-core/src/main/java/accord/utils/TinyKVBuffer.java b/accord-core/src/main/java/accord/utils/TinyKVBuffer.java new file mode 100644 index 00000000..01d1957c --- /dev/null +++ b/accord-core/src/main/java/accord/utils/TinyKVBuffer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package accord.utils; + +import static accord.utils.Invariants.checkState; +import static accord.utils.Invariants.illegalState; + +/** + * A 3-element buffer of key-value pairs + */ +class TinyKVBuffer<K, V> +{ + private K k0, k1, k2; + private V v0, v1, v2; + + void append(K key, V value) + { + if (k0 == null) { k0 = key; v0 = value; } + else if (k1 == null) { k1 = key; v1 = value; } + else if (k2 == null) { k2 = key; v2 = value; } + else illegalState(); + } + + void dropFirst() + { + checkState(k0 != null); + k0 = k1; v0 = v1; + k1 = k2; v1 = v2; + k2 = null; v2 = null; + } + + void dropLast() + { + if (k2 != null) { k2 = null; v2 = null; } + else if (k1 != null) { k1 = null; v1 = null; } + else if (k0 != null) { k0 = null; v0 = null; } + else illegalState(); + } + + int size() + { + if (k0 == null) return 0; + if (k1 == null) return 1; + if (k2 == null) return 2; + else return 3; + } + + boolean isFull() + { + return k2 != null; + } + + boolean isEmpty() + { + return k0 == null; + } + + K firstKey() + { + if (k0 != null) return k0; + throw illegalState(); + } + + V firstValue() + { + if (k0 != null) return v0; + throw illegalState(); + } + + V penultimateValue() + { + if (k2 != null) return v1; + if (k1 != null) return v0; + throw illegalState(); + } + + K lastKey() + { + if (k2 != null) return k2; + if (k1 != null) return k1; + if (k0 != null) return k0; + throw illegalState(); + } + + V lastValue() + { + if (k2 != null) return v2; + if (k1 != null) return v1; + if (k0 != null) return v0; + throw illegalState(); + } + + void lastValue(V value) + { + if (k2 != null) v2 = value; + else if (k1 != null) v1 = value; + else if (k0 != null) v0 = value; + else throw illegalState(); + } +} diff --git a/accord-core/src/main/java/accord/utils/TriConsumer.java b/accord-core/src/main/java/accord/utils/TriConsumer.java new file mode 100644 index 00000000..ba3826ee --- /dev/null +++ b/accord-core/src/main/java/accord/utils/TriConsumer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package accord.utils; + +public interface TriConsumer<P1, P2, P3> +{ + void accept(P1 p1, P2 p2, P3 p3); +} diff --git a/accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java b/accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java new file mode 100644 index 00000000..3f5ff850 --- /dev/null +++ b/accord-core/src/test/java/accord/utils/BTreeReducingRangeMapTest.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils; + +import accord.api.RoutingKey; +import accord.impl.IntKey; +import accord.local.Node; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; +import accord.primitives.Timestamp; +import accord.utils.BTreeReducingRangeMap.RawBuilder; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; + +import javax.annotation.Nonnull; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import static java.lang.Integer.MAX_VALUE; +import static java.lang.Integer.MIN_VALUE; + +// TODO (desired): test start inclusive ranges +public class BTreeReducingRangeMapTest +{ + static final BTreeReducingRangeMap<Timestamp> EMPTY = new BTreeReducingRangeMap<>(); + static final RoutingKey MINIMUM_EXCL = new IntKey.Routing(MIN_VALUE); + static final RoutingKey MAXIMUM_EXCL = new IntKey.Routing(MAX_VALUE); + static boolean END_INCLUSIVE = false; + + private static RoutingKey rk(int t) + { + return new IntKey.Routing(t); + } + private static RoutingKey rk(Random random) + { + int rk = random.nextInt(); + if (random.nextBoolean()) rk = -rk; + if (rk == MAX_VALUE) --rk; + if (rk == MIN_VALUE) ++rk; + return new IntKey.Routing(rk); + } + + private static Timestamp none() + { + return null; + } + + private static Timestamp ts(int b) + { + return Timestamp.fromValues(1, b, 0, new Node.Id(1)); + } + + private static Range r(RoutingKey l, RoutingKey r) + { + return END_INCLUSIVE ? new Range.EndInclusive(l, r) : new Range.StartInclusive(l, r); + } + + private static RoutingKey incr(RoutingKey rk) + { + return new IntKey.Routing(((IntKey.Routing)rk).key + 1); + } + + private static RoutingKey decr(RoutingKey rk) + { + return new IntKey.Routing(((IntKey.Routing)rk).key - 1); + } + + private static Range r(int l, int r) + { + return r(rk(l), rk(r)); + } + + private static Pair<RoutingKey, Timestamp> pt(int t, int b) + { + return Pair.create(rk(t), ts(b)); + } + + private static Pair<RoutingKey, Timestamp> pt(int t, Timestamp b) + { + return Pair.create(rk(t), b); + } + + private static Pair<RoutingKey, Timestamp> pt(RoutingKey t, int b) + { + return Pair.create(t, ts(b)); + } + + private static BTreeReducingRangeMap<Timestamp> h(Pair<RoutingKey, Timestamp>... 
points) + { + Invariants.checkState(points[0].right == none()); + int length = points.length; + RawBuilder<Timestamp, BTreeReducingRangeMap<Timestamp>> builder = new RawBuilder<>(true, length - 1); + for (int i = 1 ; i < length ; ++i) + builder.append(points[i - 1].left, points[i].right); + builder.append(points[length - 1].left); + return builder.build(BTreeReducingRangeMap::new); + } + + static + { + assert rk(100).equals(rk(100)); + assert ts(111).equals(ts(111)); + } + + private static class Builder + { + BTreeReducingRangeMap<Timestamp> history = EMPTY; + + Builder add(Timestamp timestamp, Range... ranges) + { + history = BTreeReducingRangeMap.add(history, Ranges.of(ranges), timestamp); + return this; + } + + Builder clear() + { + history = EMPTY; + return this; + } + } + + static Builder builder() + { + return new Builder(); + } + + @Test + public void testOne() + { + testRandomAdds(8532037884171168001L, 3, 1, 3, 0.100000f, 0.100000f); + } + + @Test + public void testRandomAdds() throws ExecutionException, InterruptedException + { + ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List<ListenableFuture<Void>> results = new ArrayList<>(); + int count = 100000; + for (int numberOfAdditions : new int[] { 1, 10, 100 }) + { + for (float maxCoveragePerRange : new float[] { 0.01f, 0.1f, 0.5f }) + { + for (float chanceOfMinRoutingKey : new float[] { 0.01f, 0.1f }) + { + results.addAll(testRandomAdds(executor, count, 3, numberOfAdditions, 3, maxCoveragePerRange, chanceOfMinRoutingKey)); + } + } + } + Futures.allAsList(results).get(); + executor.shutdown(); + } + + private List<ListenableFuture<Void>> testRandomAdds(ExecutorService executor, int tests, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey) + { + return ThreadLocalRandom.current() + .longs(tests) + .mapToObj(seed -> { + SettableFuture<Void> promise = SettableFuture.create(); + executor.execute(() -> { + try + { + testRandomAdds(seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey); + promise.set(null); + } + catch (Throwable t) + { + promise.setException(t); + } + }); + return promise; + }) + .collect(Collectors.toList()); + } + + private void testRandomAdds(long seed, int numberOfMerges, int numberOfAdditions, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey) + { + String id = String.format("%d, %d, %d, %d, %f, %f", seed, numberOfMerges, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey); + try + { + Random random = new Random(seed); + List<RandomWithCanonical> merge = new ArrayList<>(); + while (numberOfMerges-- > 0) + { + RandomWithCanonical build = new RandomWithCanonical(); + build.addRandom(random, numberOfAdditions, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey); + build.validate(random, id); + merge.add(build); + } + + RandomWithCanonical check = new RandomWithCanonical(); + for (RandomWithCanonical add : merge) + check = check.merge(random, add); + // check.serdeser(); + + check.validate(random, id); + } + catch (Throwable t) + { + if (!(t instanceof AssertionFailedError)) + throw new RuntimeException(id, t); + } + } + + static class RandomMap + { + BTreeReducingRangeMap<Timestamp> test = new BTreeReducingRangeMap<>(); + + void add(Ranges ranges, Timestamp timestamp) + { + test = BTreeReducingRangeMap.add(test, ranges, 
timestamp); + } + + void merge(RandomMap other) + { + test = BTreeReducingRangeMap.merge(test, other.test, Timestamp::max); + } + + void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance) + { + int count = maxRangeCount == 1 ? 1 : 1 + random.nextInt(maxRangeCount - 1); + Timestamp timestamp = ts(random.nextInt(MAX_VALUE)); + List<Range> ranges = new ArrayList<>(); + while (count-- > 0) + { + int length = (int) (2 * random.nextDouble() * maxCoverage * MAX_VALUE); + if (length == 0) length = 1; + Range range; + if (random.nextFloat() <= minChance) + { + if (random.nextBoolean()) range = r(MIN_VALUE + 1, MIN_VALUE + 1 + length); + else range = r(MAX_VALUE - length - 1, MAX_VALUE - 1); + } + else + { + int start = random.nextInt(MAX_VALUE - length - 1); + range = r(start, start + length); + } + ranges.add(range); + } + add(Ranges.of(ranges.toArray(new Range[0])), timestamp); + } + + void addRandom(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerAddition, float minRoutingKeyChance) + { + while (count-- > 0) + addOneRandom(random, maxNumberOfRangesPerAddition, maxCoveragePerAddition, minRoutingKeyChance); + } + + + static BTreeReducingRangeMap<Timestamp> build(Random random, int count, int maxNumberOfRangesPerAddition, float maxCoveragePerRange, float chanceOfMinRoutingKey) + { + RandomMap result = new RandomMap(); + result.addRandom(random, count, maxNumberOfRangesPerAddition, maxCoveragePerRange, chanceOfMinRoutingKey); + return result.test; + } + } + + static class RandomWithCanonical extends RandomMap + { + // confusingly, we use lower bounds here since we copied over from C* + NavigableMap<RoutingKey, Timestamp> canonical = new TreeMap<>(); + { + canonical.put(MINIMUM_EXCL, none()); + canonical.put(MAXIMUM_EXCL, none()); + } + + Timestamp get(RoutingKey rk) + { + return canonical.ceilingEntry(rk).getValue(); + } + + RandomWithCanonical merge(Random random, RandomWithCanonical other) + { + RandomWithCanonical result = new RandomWithCanonical(); + result.test = random.nextBoolean() + ? 
BTreeReducingRangeMap.merge(test, other.test, Timestamp::max) + : BTreeReducingIntervalMap.mergeIntervals(test, other.test, IntervalBuilder::new); + result.canonical = new TreeMap<>(); + result.canonical.putAll(canonical); + RoutingKey prev = null; + for (Map.Entry<RoutingKey, Timestamp> entry : other.canonical.entrySet()) + { + if (prev != null) result.addCanonical(r(prev, entry.getKey()), entry.getValue()); + prev = entry.getKey(); + } + return result; + } + + static class IntervalBuilder extends BTreeReducingIntervalMap.AbstractIntervalBuilder<RoutingKey, Timestamp, BTreeReducingRangeMap<Timestamp>> + { + protected IntervalBuilder(boolean inclusiveEnds, int capacity) + { + super(inclusiveEnds, capacity); + } + + @Override + protected Timestamp slice(RoutingKey start, RoutingKey end, Timestamp value) + { + return value; + } + + @Override + protected Timestamp reduce(Timestamp a, Timestamp b) + { + return Timestamp.max(a, b); + } + + @Override + protected Timestamp tryMergeEqual(@Nonnull Timestamp a, Timestamp b) + { + return a; + } + + @Override + protected BTreeReducingRangeMap<Timestamp> buildInternal(Object[] tree) + { + return new BTreeReducingRangeMap<>(inclusiveEnds, tree); + } + } + +// void serdeser() +// { +// ReduceRangeMap<RoutingKey, Timestamp> tmp = ReduceRangeMap.fromTupleBufferList(test.toTupleBufferList()); +// Assertions.assertEquals(test, tmp); +// test = tmp; +// } + + @Override + void add(Ranges addRanges, Timestamp timestamp) + { + super.add(addRanges, timestamp); + for (Range range : addRanges) + addCanonical(range, timestamp); + } + + @Override + void addOneRandom(Random random, int maxRangeCount, float maxCoverage, float minChance) + { + super.addOneRandom(random, maxRangeCount, maxCoverage, minChance); +// validate(new Random(), ""); + } + + void addCanonical(Range range, Timestamp timestamp) + { + canonical.put(range.start(), canonical.ceilingEntry(range.start()).getValue()); + canonical.put(range.end(), canonical.ceilingEntry(range.end()).getValue()); + + canonical.subMap(range.start(), !END_INCLUSIVE, range.end(), END_INCLUSIVE) + .entrySet().forEach(e -> e.setValue(Timestamp.nonNullOrMax(e.getValue(), timestamp))); + } + + void validate(Random random, String id) + { + for (RoutingKey rk : canonical.keySet()) + { + Assertions.assertEquals(get(decr(rk)), test.get(decr(rk)), id); + Assertions.assertEquals(get(rk), test.get(rk), id); + Assertions.assertEquals(get(incr(rk)), test.get(incr(rk)), id); + } + + // check some random + { + int remaining = 1000; + while (remaining-- > 0) + { + RoutingKey routingKey = rk(random); + Assertions.assertEquals(get(routingKey), test.get(routingKey), id); + } + } + + // validate foldl + { + int remaining = 100; + while (remaining-- > 0) + { + int count = 1 + random.nextInt(20); + RoutingKeys keys; + Ranges ranges; + { + RoutingKey[] tmp = new RoutingKey[count]; + for (int i = 0 ; i < tmp.length ; ++i) + tmp[i] = rk(random); + keys = RoutingKeys.of(tmp); + Range[] tmp2 = new Range[(keys.size() + 1) / 2]; + int i = 0, c = 0; + if (keys.size() % 2 == 1 && random.nextBoolean()) + tmp2[c++] = r(MINIMUM_EXCL, keys.get(i++)); + while (i + 1 < keys.size()) + { + tmp2[c++] = r(keys.get(i), keys.get(i+1)); + i += 2; + } + if (i < keys.size()) + tmp2[c++] = r(keys.get(i++), MAXIMUM_EXCL); + ranges = Ranges.of(tmp2); + } + + List<Timestamp> foldl = test.foldl(keys, (timestamp, timestamps) -> { + if (timestamps.isEmpty() || !timestamps.get(timestamps.size() - 1).equals(timestamp)) + timestamps.add(timestamp); + return timestamps; + }, new 
ArrayList<>(), ignore -> false); + + List<Timestamp> canonFoldl = new ArrayList<>(); + for (RoutingKey key : keys) + { + Timestamp next = get(key); + if (next == null) + continue; + if (canonFoldl.isEmpty() || !canonFoldl.get(canonFoldl.size() - 1).equals(next)) + canonFoldl.add(next); + } + Assertions.assertEquals(canonFoldl, foldl, id); + + foldl = test.foldl(ranges, (timestamp, timestamps) -> { + if (timestamps.isEmpty() || !timestamps.get(timestamps.size() - 1).equals(timestamp)) + timestamps.add(timestamp); + return timestamps; + }, new ArrayList<>(), ignore -> false); + + canonFoldl.clear(); + for (Range range : ranges) + { + RoutingKey start = END_INCLUSIVE ? canonical.higherKey(range.start()) : canonical.ceilingKey(range.start()); + RoutingKey end = END_INCLUSIVE ? canonical.ceilingKey(range.end()) : canonical.higherKey(range.end()); + for (Timestamp next : canonical.subMap(start, true, end, true).values()) + { + if (next == null) + continue; + + if (canonFoldl.isEmpty() || !canonFoldl.get(canonFoldl.size() - 1).equals(next)) + canonFoldl.add(next); + } + } + Assertions.assertEquals(canonFoldl, foldl, id); + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
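
[Editor's note, not part of the commit] For readers skimming the patch, below is a minimal usage sketch of the new 3-slot TinyKVBuffer introduced above. It is illustrative only: TinyKVBufferDemo is a hypothetical driver class, assumed to live in accord.utils so the package-private buffer is visible, and it relies on plain JVM assertions (-ea) rather than the Invariants helpers used by the patch.

    package accord.utils;

    // Illustrative only: exercises the intended semantics of TinyKVBuffer.
    // Keys and values are assumed non-null, since empty slots are tracked by null keys.
    public class TinyKVBufferDemo
    {
        public static void main(String[] args)
        {
            TinyKVBuffer<String, Integer> buffer = new TinyKVBuffer<>();

            buffer.append("a", 1);
            buffer.append("b", 2);
            buffer.append("c", 3);          // buffer is now full: size() == 3, isFull() == true

            assert buffer.firstKey().equals("a");
            assert buffer.lastKey().equals("c");
            assert buffer.penultimateValue() == 2;

            buffer.dropFirst();             // shifts left, leaving ("b", 2), ("c", 3)
            assert buffer.firstValue() == 2 && buffer.size() == 2;

            buffer.lastValue(30);           // overwrites the value in the last occupied slot
            assert buffer.lastValue() == 30;

            buffer.dropLast();              // leaves ("b", 2)
            buffer.dropLast();              // empty again
            assert buffer.isEmpty();

            // Appending a fourth entry to a full buffer, or dropping from an empty
            // one, trips Invariants.illegalState() in the committed class.
        }
    }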