This is an automated email from the ASF dual-hosted git repository. blerer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 00fb6d7 Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times. 00fb6d7 is described below commit 00fb6d76d0a97af06ba27c1180d6dcddfa337fea Author: Francisco Fernandez Castano <francisco.fernandez.cast...@gmail.com> AuthorDate: Wed Mar 25 12:15:20 2020 +0100 Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times. patch by Francisco Fernandez; reviewed by Benjamin Lerer and Robert Stupp for CASSANDRA-14773 This patch: * prevents 32-bit integer overflow * simplifies the underlying structures of StreamingTombstoneHistogramBuilder * avoids humongous allocations by maintaining separate arrays for tombstone timestamps and number of tombstone occurrences (these two were kept in the same array before) * adds more test coverage. --- CHANGES.txt | 1 + .../StreamingTombstoneHistogramBuilder.java | 437 +++++++++------------ .../utils/streamhist/TombstoneHistogram.java | 7 +- .../StreamingTombstoneHistogramBuilderTest.java | 232 ++++++++++- 4 files changed, 423 insertions(+), 254 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 86a8813..46790c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773) * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706) * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390) * Repair history tables should have TTL and TWCS (CASSANDRA-12701) diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java index 9856253..eda88bc 100755 --- a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java +++ 
b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java @@ -22,28 +22,32 @@ import java.math.RoundingMode; import java.util.Arrays; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.math.IntMath; -import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.ACCUMULATED; -import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.INSERTED; +import org.apache.cassandra.db.rows.Cell; /** * Histogram that can be constructed from streaming of data. + * + * Histogram used to retrieve the number of droppable tombstones for example via + * {@link org.apache.cassandra.io.sstable.format.SSTableReader#getDroppableTombstonesBefore(int)}. * <p> - * The original algorithm is taken from following paper: - * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010) - * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf + * When an sstable is written (or streamed), this histogram-builder receives the "local deletion timestamp" + * as an {@code int} via {@link #update(int)}. Negative values are not supported. * <p> * Algorithm: Histogram is represented as collection of {point, weight} pairs. 
When new point <i>p</i> with weight <i>m</i> is added: * <ol> - * <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li> - * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li> - * <li>If point was added and collection size became lorger than maxBinSize:</li> - * <ol type="a"> - * <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li> - * <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li> + * <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li> + * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li> + * <li>If point was added and collection size became lorger than maxBinSize:</li> * </ol> + * + * <ol type="a"> + * <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li> + * <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li> * </ol> + * * <p> * There are some optimization to make histogram builder faster: * <ol> @@ -51,19 +55,19 @@ import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramB * For example, if spoolSize=100, binSize=10 and there are only 50 different points. it will be only 40 merges regardless how many points will be added.</li> * <li>Spool is organized as open-addressing primitive hash map where odd elements are points and event elements are values. * Spool can not resize => when number of collisions became bigger than threashold or size became large that <i>array_size/2</i> Spool is drained to bin</li> - * <li>DistanceHolder - sorted collection of distances between points in Bin. It is used to find nearest points in constant time</li> - * <li>Distances and Bin organized as sorted arrays. 
It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li> - * <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin and <i></>{distance, left_point}</i> pairs is packed in one long</li> + * <li>Bin is organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li> + * <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin pairs is packed in one long</li> * </ol> + * <p> + * The original algorithm is taken from following paper: + * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010) + * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf */ public class StreamingTombstoneHistogramBuilder { // Buffer with point-value pair private final DataHolder bin; - // Buffer with distance between points, sorted from nearest to furthest - private final DistanceHolder distances; - // Keep a second, larger buffer to spool data in, before finalizing it into `bin` private final Spool spool; @@ -72,47 +76,42 @@ public class StreamingTombstoneHistogramBuilder public StreamingTombstoneHistogramBuilder(int maxBinSize, int maxSpoolSize, int roundSeconds) { + assert maxBinSize > 0 && maxSpoolSize >= 0 && roundSeconds > 0: "Invalid arguments: maxBinSize:" + maxBinSize + " maxSpoolSize:" + maxSpoolSize + " delta:" + roundSeconds; + this.roundSeconds = roundSeconds; this.bin = new DataHolder(maxBinSize + 1, roundSeconds); - distances = new DistanceHolder(maxBinSize); - - //for spool we need power-of-two cells - maxSpoolSize = maxSpoolSize == 0 ? 0 : IntMath.pow(2, IntMath.log2(maxSpoolSize, RoundingMode.CEILING)); - spool = new Spool(maxSpoolSize); + this.spool = new Spool(maxSpoolSize); } /** - * Adds new point p to this histogram. + * Adds new point to this histogram with a default value of 1. 
* - * @param p + * @param point the point to be added */ - public void update(int p) + public void update(int point) { - update(p, 1); + update(point, 1); } /** - * Adds new point p with value m to this histogram. - * - * @param p - * @param m + * Adds new point {@param point} with value {@param value} to this histogram. */ - public void update(int p, int m) + public void update(int point, int value) { - p = roundKey(p, roundSeconds); + point = ceilKey(point, roundSeconds); if (spool.capacity > 0) { - if (!spool.tryAddOrAccumulate(p, m)) + if (!spool.tryAddOrAccumulate(point, value)) { flushHistogram(); - final boolean success = spool.tryAddOrAccumulate(p, m); + final boolean success = spool.tryAddOrAccumulate(point, value); assert success : "Can not add value to spool"; // after spool flushing we should always be able to insert new value } } else { - flushValue(p, m); + flushValue(point, value); } } @@ -127,50 +126,11 @@ public class StreamingTombstoneHistogramBuilder private void flushValue(int key, int spoolValue) { - DataHolder.NeighboursAndResult addResult = bin.addValue(key, spoolValue); - if (addResult.result == INSERTED) - { - final int prevPoint = addResult.prevPoint; - final int nextPoint = addResult.nextPoint; - if (prevPoint != -1 && nextPoint != -1) - distances.remove(prevPoint, nextPoint); - if (prevPoint != -1) - distances.add(prevPoint, key); - if (nextPoint != -1) - distances.add(key, nextPoint); - } + bin.addValue(key, spoolValue); if (bin.isFull()) { - mergeBin(); - } - } - - private void mergeBin() - { - // find points point1, point2 which have smallest difference - final int[] smallestDifference = distances.getFirstAndRemove(); - - final int point1 = smallestDifference[0]; - final int point2 = smallestDifference[1]; - - // merge those two - DataHolder.MergeResult mergeResult = bin.merge(point1, point2); - - final int nextPoint = mergeResult.nextPoint; - final int prevPoint = mergeResult.prevPoint; - final int newPoint = mergeResult.newPoint; - 
- if (nextPoint != -1) - { - distances.remove(point2, nextPoint); - distances.add(newPoint, nextPoint); - } - - if (prevPoint != -1) - { - distances.remove(prevPoint, point1); - distances.add(prevPoint, newPoint); + bin.mergeNearestPoints(); } } @@ -185,72 +145,10 @@ public class StreamingTombstoneHistogramBuilder return new TombstoneHistogram(bin); } - private static class DistanceHolder - { - private static final long EMPTY = Long.MAX_VALUE; - private final long[] data; - - DistanceHolder(int maxCapacity) - { - data = new long[maxCapacity]; - Arrays.fill(data, EMPTY); - } - - void add(int prev, int next) - { - long key = getKey(prev, next); - int index = Arrays.binarySearch(data, key); - - assert (index < 0) : "Element already exists"; - assert (data[data.length - 1] == EMPTY) : "No more space in array"; - - index = -index - 1; - System.arraycopy(data, index, data, index + 1, data.length - index - 1); - data[index] = key; - } - - void remove(int prev, int next) - { - long key = getKey(prev, next); - int index = Arrays.binarySearch(data, key); - if (index >= 0) - { - if (index < data.length) - System.arraycopy(data, index + 1, data, index, data.length - index - 1); - data[data.length - 1] = EMPTY; - } - } - - int[] getFirstAndRemove() - { - if (data[0] == EMPTY) - return null; - - int[] result = unwrapKey(data[0]); - System.arraycopy(data, 1, data, 0, data.length - 1); - data[data.length - 1] = EMPTY; - return result; - } - - private int[] unwrapKey(long key) - { - final int distance = (int) (key >> 32); - final int prev = (int) (key & 0xFF_FF_FF_FFL); - return new int[]{ prev, prev + distance }; - } - - private long getKey(int prev, int next) - { - long distance = next - prev; - return (distance << 32) | prev; - } - - public String toString() - { - return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrapKey).map(Arrays::toString).collect(Collectors.joining()); - } - } - + /** + * An ordered collection of histogram buckets, each entry in the 
collection represents a pair (bucket, count). + * Once the collection is full it merges the closest buckets using a weighted approach see {@link #mergeNearestPoints()}. + */ static class DataHolder { private static final long EMPTY = Long.MAX_VALUE; @@ -270,11 +168,29 @@ public class StreamingTombstoneHistogramBuilder roundSeconds = holder.roundSeconds; } - NeighboursAndResult addValue(int point, int delta) + @VisibleForTesting + int getValue(int point) + { + long key = wrap(point, 0); + int index = Arrays.binarySearch(data, key); + if (index < 0) + index = -index - 1; + if (index >= data.length) + return -1; // not-found sentinel + if (unwrapPoint(data[index]) != point) + return -2; // not-found sentinel + return unwrapValue(data[index]); + } + + /** + * Adds value {@code delta} to the point {@code point}. + * + * @return {@code true} if inserted, {@code false} if accumulated + */ + boolean addValue(int point, int delta) { long key = wrap(point, 0); int index = Arrays.binarySearch(data, key); - AddResult addResult; if (index < 0) { index = -index - 1; @@ -287,25 +203,35 @@ public class StreamingTombstoneHistogramBuilder System.arraycopy(data, index, data, index + 1, data.length - index - 1); data[index] = wrap(point, delta); - addResult = INSERTED; + return true; } else { - data[index] += delta; - addResult = ACCUMULATED; + data[index] = wrap(point, (long) unwrapValue(data[index]) + delta); } } else { - data[index] += delta; - addResult = ACCUMULATED; + data[index] = wrap(point, (long) unwrapValue(data[index]) + delta); } - return new NeighboursAndResult(getPrevPoint(index), getNextPoint(index), addResult); + return false; } - public MergeResult merge(int point1, int point2) + /** + * Finds nearest points <i>p1</i> and <i>p2</i> in the collection + * Replaces theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2) + */ + @VisibleForTesting + void mergeNearestPoints() { + assert isFull() : "DataHolder must be full in order to merge two points"; + + 
final int[] smallestDifference = findPointPairWithSmallestDistance(); + + final int point1 = smallestDifference[0]; + final int point2 = smallestDifference[1]; + long key = wrap(point1, 0); int index = Arrays.binarySearch(data, key); if (index < 0) @@ -315,47 +241,44 @@ public class StreamingTombstoneHistogramBuilder assert (unwrapPoint(data[index]) == point1) : "Not found in array"; } - final int prevPoint = getPrevPoint(index); - final int nextPoint = getNextPoint(index + 1); - - int value1 = unwrapValue(data[index]); - int value2 = unwrapValue(data[index + 1]); + long value1 = unwrapValue(data[index]); + long value2 = unwrapValue(data[index + 1]); assert (unwrapPoint(data[index + 1]) == point2) : "point2 should follow point1"; - int sum = value1 + value2; + long sum = value1 + value2; //let's evaluate in long values to handle overflow in multiplication - int newPoint = (int) (((long) point1 * value1 + (long) point2 * value2) / (value1 + value2)); - newPoint = roundKey(newPoint, roundSeconds); - data[index] = wrap(newPoint, sum); + int newPoint = saturatingCastToInt((point1 * value1 + point2 * value2) / sum); + newPoint = ceilKey(newPoint, roundSeconds); + data[index] = wrap(newPoint, saturatingCastToInt(sum)); System.arraycopy(data, index + 2, data, index + 1, data.length - index - 2); data[data.length - 1] = EMPTY; - - return new MergeResult(prevPoint, newPoint, nextPoint); } - private int getPrevPoint(int index) + private int[] findPointPairWithSmallestDistance() { - if (index > 0) - if (data[index - 1] != EMPTY) - return (int) (data[index - 1] >> 32); - else - return -1; - else - return -1; - } + assert isFull(): "The DataHolder must be full in order to find the closest pair of points"; - private int getNextPoint(int index) - { - if (index < data.length - 1) - if (data[index + 1] != EMPTY) - return (int) (data[index + 1] >> 32); - else - return -1; - else - return -1; + int point1 = 0; + int point2 = Integer.MAX_VALUE; + + for (int i = 0; i < data.length - 1; 
i++) + { + int pointA = unwrapPoint(data[i]); + int pointB = unwrapPoint(data[i + 1]); + + assert pointB > pointA : "DataHolder not sorted, p2(" + pointB +") < p1(" + pointA + ") for " + this; + + if (point2 - point1 > pointB - pointA) + { + point1 = pointA; + point2 = pointB; + } + } + + return new int[]{point1, point2}; } private int[] unwrap(long key) @@ -375,15 +298,15 @@ public class StreamingTombstoneHistogramBuilder return (int) (key & 0xFF_FF_FF_FFL); } - private long wrap(int point, int value) + private long wrap(int point, long value) { - return (((long) point) << 32) | value; + assert point >= 0 : "Invalid argument: point:" + point; + return (((long) point) << 32) | saturatingCastToInt(value); } - public String toString() { - return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrap).map(Arrays::toString).collect(Collectors.joining()); + return Arrays.stream(data).filter(x -> x != EMPTY).mapToObj(this::unwrap).map(Arrays::toString).collect(Collectors.joining()); } public boolean isFull() @@ -434,6 +357,7 @@ public class StreamingTombstoneHistogramBuilder { final int prevPoint = unwrapPoint(data[i - 1]); final int prevValue = unwrapValue(data[i - 1]); + // calculate estimated count mb for point b double weight = (b - prevPoint) / (double) (point - prevPoint); double mb = prevValue + (value - prevValue) * weight; sum -= prevValue; @@ -450,34 +374,6 @@ public class StreamingTombstoneHistogramBuilder return sum; } - static class MergeResult - { - int prevPoint; - int newPoint; - int nextPoint; - - MergeResult(int prevPoint, int newPoint, int nextPoint) - { - this.prevPoint = prevPoint; - this.newPoint = newPoint; - this.nextPoint = nextPoint; - } - } - - static class NeighboursAndResult - { - int prevPoint; - int nextPoint; - AddResult result; - - NeighboursAndResult(int prevPoint, int nextPoint, AddResult result) - { - this.prevPoint = prevPoint; - this.nextPoint = nextPoint; - this.result = result; - } - } - @Override public int 
hashCode() { @@ -506,38 +402,42 @@ public class StreamingTombstoneHistogramBuilder } } - public enum AddResult - { - INSERTED, - ACCUMULATED - } - + /** + * This class is a specialized open addressing HashMap that uses int as keys and int as values. + * This is an optimization to avoid allocating objects. + * In order for this class to work correctly it should have a power of 2 capacity. + * This last invariant is taken care of during construction. + */ static class Spool { - // odd elements - points, even elements - values - final int[] map; + final int[] points; + final int[] values; + final int capacity; int size; - Spool(int capacity) + Spool(int requestedCapacity) { - this.capacity = capacity; - if (capacity == 0) - { - map = new int[0]; - } - else - { - assert IntMath.isPowerOfTwo(capacity) : "should be power of two"; - // x2 because we want to save points and values in consecutive cells and x2 because we want reprobing less that two when _capacity_ values will be written - map = new int[capacity * 2 * 2]; - clear(); - } + if (requestedCapacity < 0) + throw new IllegalArgumentException("Illegal capacity " + requestedCapacity); + + this.capacity = getPowerOfTwoCapacity(requestedCapacity); + + // x2 because we want no more than two reprobes on average when _capacity_ entries will be written + points = new int[capacity * 2]; + values = new int[capacity * 2]; + clear(); + } + + private int getPowerOfTwoCapacity(int requestedCapacity) + { + //for spool we need power-of-two cells + return requestedCapacity == 0 ? 0 : IntMath.pow(2, IntMath.log2(requestedCapacity, RoundingMode.CEILING)); } void clear() { - Arrays.fill(map, -1); + Arrays.fill(points, -1); size = 0; } @@ -548,12 +448,12 @@ public class StreamingTombstoneHistogramBuilder return false; } - final int cell = 2 * ((capacity - 1) & hash(point)); + final int cell = (capacity - 1) & hash(point); // We use linear scanning. I think cluster of 100 elements is large enough to give up. 
for (int attempt = 0; attempt < 100; attempt++) { - if (tryCell(cell + attempt * 2, point, delta)) + if (tryCell(cell + attempt, point, delta)) return true; } return false; @@ -567,40 +467,75 @@ public class StreamingTombstoneHistogramBuilder <E extends Exception> void forEach(HistogramDataConsumer<E> consumer) throws E { - for (int i = 0; i < map.length; i += 2) + for (int i = 0; i < points.length; i++) { - if (map[i] != -1) + if (points[i] != -1) { - consumer.consume(map[i], map[i + 1]); + consumer.consume(points[i], values[i]); } } } private boolean tryCell(int cell, int point, int delta) { - cell = cell % map.length; - if (map[cell] == -1) + assert cell >= 0 && point >= 0 && delta >= 0 : "Invalid arguments: cell:" + cell + " point:" + point + " delta:" + delta; + + cell = cell % points.length; + if (points[cell] == -1) { - map[cell] = point; - map[cell + 1] = delta; + points[cell] = point; + values[cell] = delta; size++; return true; } - if (map[cell] == point) + if (points[cell] == point) { - map[cell + 1] += delta; + values[cell] = saturatingCastToInt((long) values[cell] + (long) delta); return true; } return false; } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append('['); + for (int i = 0; i < points.length; i++) + { + if (points[i] == -1) + continue; + if (sb.length() > 1) + sb.append(", "); + sb.append('[').append(points[i]).append(',').append(values[i]).append(']'); + } + sb.append(']'); + return sb.toString(); + } } - private static int roundKey(int p, int roundSeconds) + private static int ceilKey(int point, int bucketSize) { - int d = p % roundSeconds; - if (d > 0) - return p + (roundSeconds - d); - else - return p; + int delta = point % bucketSize; + + if (delta == 0) + return point; + + return saturatingCastToMaxDeletionTime((long) point + (long) bucketSize - (long) delta); + } + + public static int saturatingCastToInt(long value) + { + return (int) (value > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : value); + } + + /** + * Cast to an int with maximum value of {@code Cell.MAX_DELETION_TIME} to avoid representing values that + * aren't a tombstone + */ + public static int saturatingCastToMaxDeletionTime(long value) + { + return (value < 0L || value > Cell.MAX_DELETION_TIME) + ? Cell.MAX_DELETION_TIME + : (int) value; } } diff --git a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java index 19bdd27..5f2787b 100755 --- a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java +++ b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java @@ -91,7 +91,12 @@ public class TombstoneHistogram DataHolder dataHolder = new DataHolder(size, 1); for (int i = 0; i < size; i++) { - dataHolder.addValue((int)in.readDouble(), (int)in.readLong()); + // Already serialized sstable metadata may contain negative deletion-time values (see CASSANDRA-14092). + // Just do a "safe cast" and it should be good. For safety, also do that for the 'value' (tombstone count). 
+ int localDeletionTime = StreamingTombstoneHistogramBuilder.saturatingCastToMaxDeletionTime((long) in.readDouble()); + int count = StreamingTombstoneHistogramBuilder.saturatingCastToInt(in.readLong()); + + dataHolder.addValue(localDeletionTime, count); } return new TombstoneHistogram(dataHolder); diff --git a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java index c4da5cb..596c4d7 100755 --- a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java +++ b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java @@ -20,15 +20,26 @@ package org.apache.cassandra.utils.streamhist; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.stream.IntStream; import org.junit.Test; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.psjava.util.AssertStatus; +import org.quicktheories.core.Gen; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.quicktheories.QuickTheory.qt; +import static org.quicktheories.generators.SourceDSL.integers; +import static org.quicktheories.generators.SourceDSL.lists; public class StreamingTombstoneHistogramBuilderTest { @@ -111,11 +122,12 @@ public class StreamingTombstoneHistogramBuilderTest builder.update(2); builder.update(2); builder.update(2); + builder.update(2, Integer.MAX_VALUE); // To check that value overflow is handled correctly TombstoneHistogram hist = builder.build(); Map<Integer, Integer> asMap = asMap(hist); 
assertEquals(1, asMap.size()); - assertEquals(3, asMap.get(2).intValue()); + assertEquals(Integer.MAX_VALUE, asMap.get(2).intValue()); //Make sure it's working with Serde DataOutputBuffer out = new DataOutputBuffer(); @@ -126,7 +138,7 @@ public class StreamingTombstoneHistogramBuilderTest asMap = asMap(deserialized); assertEquals(1, deserialized.size()); - assertEquals(3, asMap.get(2).intValue()); + assertEquals(Integer.MAX_VALUE, asMap.get(2).intValue()); } @Test @@ -167,10 +179,226 @@ public class StreamingTombstoneHistogramBuilderTest IntStream.range(Integer.MAX_VALUE - 30, Integer.MAX_VALUE).forEach(builder::update); } + @Test + public void testLargeDeletionTimesAndLargeValuesDontCauseOverflow() + { + qt().forAll(streamingTombstoneHistogramBuilderGen(1000, 300000, 60), + lists().of(integers().from(0).upTo(Cell.MAX_DELETION_TIME)).ofSize(300), + lists().of(integers().allPositive()).ofSize(300)) + .checkAssert(this::updateHistogramAndCheckAllBucketsArePositive); + } + + private void updateHistogramAndCheckAllBucketsArePositive(StreamingTombstoneHistogramBuilder histogramBuilder, List<Integer> keys, List<Integer> values) + { + for (int i = 0; i < keys.size(); i++) + { + histogramBuilder.update(keys.get(i), values.get(i)); + } + + TombstoneHistogram histogram = histogramBuilder.build(); + for (Map.Entry<Integer, Integer> buckets : asMap(histogram).entrySet()) + { + assertTrue("Invalid bucket key", buckets.getKey() >= 0); + assertTrue("Invalid bucket value", buckets.getValue() >= 0); + } + } + + @Test + public void testThatPointIsNotMissedBecauseOfRoundingToNoDeletionTime() throws Exception + { + int pointThatRoundedToNoDeletion = Cell.NO_DELETION_TIME - 2; + assert pointThatRoundedToNoDeletion + pointThatRoundedToNoDeletion % 3 == Cell.NO_DELETION_TIME : "test data should be valid"; + + StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 3); + builder.update(pointThatRoundedToNoDeletion); + + TombstoneHistogram histogram = 
builder.build(); + + Map<Integer, Integer> integerIntegerMap = asMap(histogram); + assertEquals(integerIntegerMap.size(), 1); + assertEquals(integerIntegerMap.get(Cell.MAX_DELETION_TIME).intValue(), 1); + } + + @Test + public void testInvalidArguments() + { + assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, 0)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:0"); + assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, 10, -1)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:10 delta:-1"); + assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(5, -1, 60)).hasMessage("Invalid arguments: maxBinSize:5 maxSpoolSize:-1 delta:60"); + assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(-1, 10, 60)).hasMessage("Invalid arguments: maxBinSize:-1 maxSpoolSize:10 delta:60"); + assertThatThrownBy(() -> new StreamingTombstoneHistogramBuilder(0, 10, 60)).hasMessage("Invalid arguments: maxBinSize:0 maxSpoolSize:10 delta:60"); + } + + @Test + public void testSpool() + { + StreamingTombstoneHistogramBuilder.Spool spool = new StreamingTombstoneHistogramBuilder.Spool(8); + assertTrue(spool.tryAddOrAccumulate(5, 1)); + assertSpool(spool, 5, 1); + assertTrue(spool.tryAddOrAccumulate(5, 3)); + assertSpool(spool, 5, 4); + + assertTrue(spool.tryAddOrAccumulate(10, 1)); + assertSpool(spool, 5, 4, + 10, 1); + + assertTrue(spool.tryAddOrAccumulate(12, 1)); + assertTrue(spool.tryAddOrAccumulate(14, 1)); + assertTrue(spool.tryAddOrAccumulate(16, 1)); + assertSpool(spool, 5, 4, + 10, 1, + 12, 1, + 14, 1, + 16, 1); + + assertTrue(spool.tryAddOrAccumulate(18, 1)); + assertTrue(spool.tryAddOrAccumulate(20, 1)); + assertTrue(spool.tryAddOrAccumulate(30, 1)); + assertSpool(spool, 5, 4, + 10, 1, + 12, 1, + 14, 1, + 16, 1, + 18, 1, + 20, 1, + 30, 1); + + assertTrue(spool.tryAddOrAccumulate(16, 5)); + assertTrue(spool.tryAddOrAccumulate(12, 4)); + assertTrue(spool.tryAddOrAccumulate(18, 9)); + assertSpool(spool, + 5, 4, 
+ 10, 1, + 12, 5, + 14, 1, + 16, 6, + 18, 10, + 20, 1, + 30, 1); + + assertTrue(spool.tryAddOrAccumulate(99, 5)); + } + + @Test + public void testDataHolder() + { + StreamingTombstoneHistogramBuilder.DataHolder dataHolder = new StreamingTombstoneHistogramBuilder.DataHolder(4, 1); + assertFalse(dataHolder.isFull()); + assertEquals(0, dataHolder.size()); + + assertTrue(dataHolder.addValue(4, 1)); + assertDataHolder(dataHolder, + 4, 1); + + assertFalse(dataHolder.addValue(4, 1)); + assertDataHolder(dataHolder, + 4, 2); + + assertTrue(dataHolder.addValue(7, 1)); + assertDataHolder(dataHolder, + 4, 2, + 7, 1); + + assertFalse(dataHolder.addValue(7, 1)); + assertDataHolder(dataHolder, + 4, 2, + 7, 2); + + assertTrue(dataHolder.addValue(5, 1)); + assertDataHolder(dataHolder, + 4, 2, + 5, 1, + 7, 2); + + assertFalse(dataHolder.addValue(5, 1)); + assertDataHolder(dataHolder, + 4, 2, + 5, 2, + 7, 2); + + assertTrue(dataHolder.addValue(2, 1)); + assertDataHolder(dataHolder, + 2, 1, + 4, 2, + 5, 2, + 7, 2); + assertTrue(dataHolder.isFull()); + + // expect to merge [4,2]+[5,2] + dataHolder.mergeNearestPoints(); + assertDataHolder(dataHolder, + 2, 1, + 4, 4, + 7, 2); + + assertFalse(dataHolder.addValue(2, 1)); + assertDataHolder(dataHolder, + 2, 2, + 4, 4, + 7, 2); + + dataHolder.addValue(8, 1); + assertDataHolder(dataHolder, + 2, 2, + 4, 4, + 7, 2, + 8, 1); + assertTrue(dataHolder.isFull()); + + // expect to merge [7,2]+[8,1] + dataHolder.mergeNearestPoints(); + assertDataHolder(dataHolder, + 2, 2, + 4, 4, + 7, 3); + } + + private static void assertDataHolder(StreamingTombstoneHistogramBuilder.DataHolder dataHolder, int... 
pointValue) + { + assertEquals(pointValue.length / 2, dataHolder.size()); + + for (int i = 0; i < pointValue.length; i += 2) + { + int point = pointValue[i]; + int expectedValue = pointValue[i + 1]; + assertEquals(expectedValue, dataHolder.getValue(point)); + } + } + + /** + * Compare the contents of {@code spool} with the given collection of key-value pairs in {@code pairs}. + */ + private static void assertSpool(StreamingTombstoneHistogramBuilder.Spool spool, int... pairs) + { + assertEquals(pairs.length / 2, spool.size); + Map<Integer, Integer> tests = new HashMap<>(); + for (int i = 0; i < pairs.length; i += 2) + tests.put(pairs[i], pairs[i + 1]); + + spool.forEach((k, v) -> { + Integer x = tests.remove(k); + assertNotNull("key " + k, x); + assertEquals(x.intValue(), v); + }); + AssertStatus.assertTrue(tests.isEmpty()); + } + private Map<Integer, Integer> asMap(TombstoneHistogram histogram) { Map<Integer, Integer> result = new HashMap<>(); histogram.forEach(result::put); return result; } + + private Gen<StreamingTombstoneHistogramBuilder> streamingTombstoneHistogramBuilderGen(int maxBinSize, int maxSpoolSize, int maxRoundSeconds) + { + return positiveIntegerUpTo(maxBinSize).zip(integers().between(0, maxSpoolSize), + positiveIntegerUpTo(maxRoundSeconds), + StreamingTombstoneHistogramBuilder::new); + } + + private Gen<Integer> positiveIntegerUpTo(int upperBound) + { + return integers().between(1, upperBound); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org