backport burn test refactor
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd4a9d18 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd4a9d18 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd4a9d18 Branch: refs/heads/cassandra-2.2 Commit: bd4a9d18e1317dcb8542bd4adc5a9f99b108d6c6 Parents: 8a56868 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Sun Jun 28 11:38:22 2015 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Sun Jun 28 11:38:22 2015 +0100 ---------------------------------------------------------------------- build.xml | 7 + .../cassandra/concurrent/LongOpOrderTest.java | 240 +++++++++ .../concurrent/LongSharedExecutorPoolTest.java | 226 +++++++++ .../apache/cassandra/utils/LongBTreeTest.java | 502 +++++++++++++++++++ .../cassandra/concurrent/LongOpOrderTest.java | 240 --------- .../concurrent/LongSharedExecutorPoolTest.java | 228 --------- .../apache/cassandra/utils/LongBTreeTest.java | 401 --------------- 7 files changed, 975 insertions(+), 869 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 73e76e5..18ad49f 100644 --- a/build.xml +++ b/build.xml @@ -93,6 +93,7 @@ <property name="test.timeout" value="60000" /> <property name="test.long.timeout" value="600000" /> + <property name="test.burn.timeout" value="600000" /> <!-- default for cql tests. 
Can be override by -Dcassandra.test.use_prepared=false --> <property name="cassandra.test.use_prepared" value="true" /> @@ -1258,6 +1259,12 @@ </testmacro> </target> + <target name="test-burn" depends="build-test" description="Execute functional tests"> + <testmacro suitename="burn" inputdir="${test.burn.src}" + timeout="${test.burn.timeout}"> + </testmacro> + </target> + <target name="long-test" depends="build-test" description="Execute functional tests"> <testmacro suitename="long" inputdir="${test.long.src}" timeout="${test.long.timeout}"> http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java ---------------------------------------------------------------------- diff --git a/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java new file mode 100644 index 0000000..d7105df --- /dev/null +++ b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java @@ -0,0 +1,240 @@ +package org.apache.cassandra.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.utils.concurrent.OpOrder; + +import static org.junit.Assert.assertTrue; + +// TODO: we don't currently test SAFE functionality at all! +// TODO: should also test markBlocking and SyncOrdered +public class LongOpOrderTest +{ + + private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class); + + static final int CONSUMERS = 4; + static final int PRODUCERS = 32; + + static final long RUNTIME = TimeUnit.MINUTES.toMillis(5); + static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1); + + static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread t, Throwable e) + { + System.err.println(t.getName() + ": " + e.getMessage()); + e.printStackTrace(); + } + }; + + final OpOrder order = new OpOrder(); + final AtomicInteger errors = new AtomicInteger(); + + class TestOrdering implements Runnable + { + + final int[] waitNanos = new int[1 << 16]; + volatile State state = new State(); + final ScheduledExecutorService sched; + + TestOrdering(ExecutorService exec, ScheduledExecutorService sched) + { + this.sched = sched; + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int i = 0 ; i < waitNanos.length ; i++) + waitNanos[i] = rnd.nextInt(5000); + for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++) + exec.execute(new Producer()); + exec.execute(this); + } + + @Override + public void run() + { + final long until = System.currentTimeMillis() + RUNTIME; + long lastReport = 
System.currentTimeMillis(); + long count = 0; + long opCount = 0; + while (true) + { + long now = System.currentTimeMillis(); + if (now > until) + break; + if (now > lastReport + REPORT_INTERVAL) + { + lastReport = now; + logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.", + Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME)))); + } + try + { + Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]); + } catch (InterruptedException e) + { + e.printStackTrace(); + } + + final State s = state; + s.barrier = order.newBarrier(); + s.replacement = new State(); + s.barrier.issue(); + s.barrier.await(); + s.check(); + opCount += s.totalCount(); + state = s.replacement; + sched.schedule(new Runnable() + { + @Override + public void run() + { + s.check(); + } + }, 1, TimeUnit.SECONDS); + count++; + } + } + + class State + { + + volatile OpOrder.Barrier barrier; + volatile State replacement; + final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>(); + int checkCount = -1; + + boolean accept(OpOrder.Group opGroup) + { + if (barrier != null && !barrier.isAfter(opGroup)) + return false; + AtomicInteger c; + if (null == (c = count.get(opGroup))) + { + count.putIfAbsent(opGroup, new AtomicInteger()); + c = count.get(opGroup); + } + c.incrementAndGet(); + return true; + } + + int totalCount() + { + int c = 0; + for (AtomicInteger v : count.values()) + c += v.intValue(); + return c; + } + + void check() + { + boolean delete; + if (checkCount >= 0) + { + if (checkCount != totalCount()) + { + errors.incrementAndGet(); + logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount()); + } + delete = true; + } + else + { + checkCount = totalCount(); + delete = false; + } + for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet()) + { + if (e.getKey().compareTo(barrier.getSyncPoint()) > 0) + { + 
errors.incrementAndGet(); + logger.error("Received an operation that was created after the barrier was issued."); + } + if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue()) + { + errors.incrementAndGet(); + logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue()); + } + if (delete) + TestOrdering.this.count.remove(e.getKey()); + } + } + + } + + final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>(); + + class Producer implements Runnable + { + public void run() + { + while (true) + { + AtomicInteger c; + try (OpOrder.Group opGroup = order.start()) + { + if (null == (c = count.get(opGroup))) + { + count.putIfAbsent(opGroup, new AtomicInteger()); + c = count.get(opGroup); + } + c.incrementAndGet(); + State s = state; + while (!s.accept(opGroup)) + s = s.replacement; + } + } + } + } + + } + + @Test + public void testOrdering() throws InterruptedException + { + errors.set(0); + Thread.setDefaultUncaughtExceptionHandler(handler); + final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker")); + final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker")); + for (int i = 0 ; i < CONSUMERS ; i++) + new TestOrdering(exec, checker); + exec.shutdown(); + exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS); + assertTrue(exec.isShutdown()); + assertTrue(errors.get() == 0); + } + + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java ---------------------------------------------------------------------- diff --git a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java new file mode 100644 index 0000000..fe464c7 --- /dev/null +++ 
b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.concurrent; + +import java.util.BitSet; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.commons.math3.distribution.WeibullDistribution; +import org.junit.Test; + +public class LongSharedExecutorPoolTest +{ + + private static final class WaitTask implements Runnable + { + final long nanos; + + private WaitTask(long nanos) + { + this.nanos = nanos; + } + + public void run() + { + LockSupport.parkNanos(nanos); + } + } + + private static final class Result implements Comparable<Result> + { + final Future<?> future; + final long forecastedCompletion; + + private Result(Future<?> future, long forecastedCompletion) + { + this.future = future; + this.forecastedCompletion = forecastedCompletion; + } + + public int compareTo(Result 
that) + { + int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion); + if (c != 0) + return c; + c = Integer.compare(this.hashCode(), that.hashCode()); + if (c != 0) + return c; + return Integer.compare(this.future.hashCode(), that.future.hashCode()); + } + } + + private static final class Batch implements Comparable<Batch> + { + final TreeSet<Result> results; + final long timeout; + final int executorIndex; + + private Batch(TreeSet<Result> results, long timeout, int executorIndex) + { + this.results = results; + this.timeout = timeout; + this.executorIndex = executorIndex; + } + + public int compareTo(Batch that) + { + int c = Long.compare(this.timeout, that.timeout); + if (c != 0) + return c; + c = Integer.compare(this.results.size(), that.results.size()); + if (c != 0) + return c; + return Integer.compare(this.hashCode(), that.hashCode()); + } + } + + @Test + public void testPromptnessOfExecution() throws InterruptedException, ExecutionException + { + testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f); + } + + private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException + { + final int executorCount = 4; + int threadCount = 8; + int maxQueued = 1024; + final WeibullDistribution workTime = new WeibullDistribution(3, 200000); + final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1); + final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1); + + final int[] threadCounts = new int[executorCount]; + final WeibullDistribution[] workCount = new WeibullDistribution[executorCount]; + final ExecutorService[] executors = new ExecutorService[executorCount]; + for (int i = 0 ; i < executors.length ; i++) + { + executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i); + threadCounts[i] = threadCount; + workCount[i] = new WeibullDistribution(2, maxQueued); + threadCount *= 2; + maxQueued *= 2; + } + + long runs = 0; + long events = 
0; + final TreeSet<Batch> pending = new TreeSet<>(); + final BitSet executorsWithWork = new BitSet(executorCount); + long until = 0; + // basic idea is to go through different levels of load on the executor service; initially is all small batches + // (mostly within max queue size) of very short operations, moving to progressively larger batches + // (beyond max queued size), and longer operations + for (float multiplier = 0f ; multiplier < 2.01f ; ) + { + if (System.nanoTime() > until) + { + System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f)); + events = 0; + until = System.nanoTime() + intervalNanos; + multiplier += loadIncrement; + System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier)); + } + + // wait a random amount of time so we submit new tasks in various stages of + long timeout; + if (pending.isEmpty()) timeout = 0; + else if (Math.random() > 0.98) timeout = Long.MAX_VALUE; + else if (pending.size() == executorCount) timeout = pending.first().timeout; + else timeout = (long) (Math.random() * pending.last().timeout); + + while (!pending.isEmpty() && timeout > System.nanoTime()) + { + Batch first = pending.first(); + boolean complete = false; + try + { + for (Result result : first.results.descendingSet()) + result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS); + complete = true; + } + catch (TimeoutException e) + { + } + if (!complete && System.nanoTime() > first.timeout) + { + for (Result result : first.results) + if (!result.future.isDone()) + throw new AssertionError(); + complete = true; + } + if (complete) + { + pending.pollFirst(); + executorsWithWork.clear(first.executorIndex); + } + } + + // if we've emptied the executors, give all our threads an opportunity to spin down + if (timeout == Long.MAX_VALUE) + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + + // submit a random 
batch to the first free executor service + int executorIndex = executorsWithWork.nextClearBit(0); + if (executorIndex >= executorCount) + continue; + executorsWithWork.set(executorIndex); + ExecutorService executor = executors[executorIndex]; + TreeSet<Result> results = new TreeSet<>(); + int count = (int) (workCount[executorIndex].sample() * multiplier); + long targetTotalElapsed = 0; + long start = System.nanoTime(); + long baseTime; + if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier); + else baseTime = 0; + for (int j = 0 ; j < count ; j++) + { + long time; + if (baseTime == 0) time = (long) (workTime.sample() * multiplier); + else time = (long) (baseTime * Math.random()); + if (time < minWorkTime) + time = minWorkTime; + if (time > maxWorkTime) + time = maxWorkTime; + targetTotalElapsed += time; + Future<?> future = executor.submit(new WaitTask(time)); + results.add(new Result(future, System.nanoTime() + time)); + } + long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex]) + + TimeUnit.MILLISECONDS.toNanos(100L); + long now = System.nanoTime(); + if (runs++ > executorCount && now > end) + throw new AssertionError(); + events += results.size(); + pending.add(new Batch(results, end, executorIndex)); +// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start))); + } + } + + public static void main(String[] args) throws InterruptedException, ExecutionException + { + // do longer test + new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/utils/LongBTreeTest.java ---------------------------------------------------------------------- diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java 
b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java new file mode 100644 index 0000000..9641930 --- /dev/null +++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cassandra.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Random; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import org.junit.Assert; +import org.junit.Test; + + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.BTreeSearchIterator; +import org.apache.cassandra.utils.btree.BTreeSet; +import org.apache.cassandra.utils.btree.UpdateFunction; + +// TODO : should probably lower fan-factor for tests to make them more intensive +public class LongBTreeTest +{ + + private static final MetricRegistry metrics = new MetricRegistry(); + private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE")); + private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE")); + private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY")); + private static 
final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE")); + private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f); + + static + { + System.setProperty("cassandra.btree.fanfactor", "4"); + } + + @Test + public void testOversizedMiddleInsert() + { + TreeSet<Integer> canon = new TreeSet<>(); + for (int i = 0 ; i < 10000000 ; i++) + canon.add(i); + Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null); + btree = BTree.update(btree, ICMP, canon, true); + canon.add(Integer.MIN_VALUE); + canon.add(Integer.MAX_VALUE); + Assert.assertTrue(BTree.isWellFormed(btree, ICMP)); + testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator()); + } + + @Test + public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException + { + testInsertions(10000000, 50, 1, 1, true); + } + + @Test + public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException + { + testInsertions(10000000, 50, 1, 5, true); + } + + @Test + public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException + { + testInsertions(10000000, 500, 10, 1, true); + } + + @Test + public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException + { + testInsertions(10000000, 500, 10, 10, true); + } + + @Test + public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException + { + testInsertions(100000000, 5000, 3, 100, true); + } + + @Test + public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException + { + testInsertions(10000, 50, 10, 10, false); + } + + @Test + public void testSearchIterator() throws InterruptedException + { + int threads = Runtime.getRuntime().availableProcessors(); + final CountDownLatch latch = new CountDownLatch(threads); + final 
AtomicLong errors = new AtomicLong(); + final AtomicLong count = new AtomicLong(); + final int perThreadTrees = 100; + final int perTreeSelections = 100; + final long totalCount = threads * perThreadTrees * perTreeSelections; + for (int t = 0 ; t < threads ; t++) + { + MODIFY.execute(new Runnable() + { + public void run() + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0 ; i < perThreadTrees ; i++) + { + Object[] tree = randomTree(10000, random); + for (int j = 0 ; j < perTreeSelections ; j++) + { + BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP); + for (Integer key : randomSelection(tree, random)) + if (key != searchIterator.next(key)) + errors.incrementAndGet(); + searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP); + for (Integer key : randomMix(tree, random)) + if (key != searchIterator.next(key)) + if (BTree.find(tree, ICMP, key) == key) + errors.incrementAndGet(); + count.incrementAndGet(); + } + } + latch.countDown(); + } + }); + } + while (latch.getCount() > 0) + { + latch.await(10L, TimeUnit.SECONDS); + System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? 
("Errors: " + errors.get()) : "")); + assert errors.get() == 0; + } + } + + private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException + { + int batchesPerTest = perTestCount / modificationBatchSize; + int maximumRunLength = 100; + int testKeyRange = perTestCount * testKeyRatio; + int tests = totalCount / perTestCount; + System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops", + tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize)); + + // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks + int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2)); + for (int chunk = 0 ; chunk < tests ; chunk += chunkSize) + { + final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>(); + for (int i = 0 ; i < chunkSize ; i++) + { + outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality)); + } + + final List<ListenableFuture<?>> inner = new ArrayList<>(); + int complete = 0; + int reportInterval = totalCount / 100; + int lastReportAt = 0; + for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer) + { + inner.addAll(f.get()); + complete += perTestCount; + if (complete - lastReportAt >= reportInterval) + { + System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount)); + lastReportAt = complete; + } + } + Futures.allAsList(inner).get(); + } + Snapshot snap = BTREE_TIMER.getSnapshot(); + System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); + snap = TREE_TIMER.getSnapshot(); + System.out.println(String.format("snaptree: %.2fns, %.2fns, 
%.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); + System.out.println("Done"); + } + + private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality) + { + ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>() + { + @Override + public List<ListenableFuture<?>> call() + { + final List<ListenableFuture<?>> r = new ArrayList<>(); + NavigableMap<Integer, Integer> canon = new TreeMap<>(); + Object[] btree = BTree.empty(); + final TreeMap<Integer, Integer> buffer = new TreeMap<>(); + final Random rnd = new Random(); + for (int i = 0 ; i < iterations ; i++) + { + buffer.clear(); + int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration); + while (mods > 0) + { + int v = rnd.nextInt(upperBound); + int rc = Math.max(0, Math.min(mods, maxRunLength) - 1); + int c = 1 + (rc <= 0 ? 
0 : rnd.nextInt(rc)); + for (int j = 0 ; j < c ; j++) + { + buffer.put(v, v); + v++; + } + mods -= c; + } + Timer.Context ctxt; + ctxt = TREE_TIMER.time(); + canon.putAll(buffer); + ctxt.stop(); + ctxt = BTREE_TIMER.time(); + Object[] next = null; + while (next == null) + next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT); + btree = next; + ctxt.stop(); + + if (!BTree.isWellFormed(btree, ICMP)) + { + System.out.println("ERROR: Not well formed"); + throw new AssertionError("Not well formed!"); + } + if (quickEquality) + testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator()); + else + r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet()))); + } + return r; + } + }); + MODIFY.execute(f); + return f; + } + + @Test + public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException + { + Object[] cur = BTree.empty(); + TreeSet<Integer> canon = new TreeSet<>(); + // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated + for (int i = 0 ; i < 128 ; i++) + { + String id = String.format("[0..%d)", canon.size()); + System.out.println("Testing " + id); + Futures.allAsList(testAllSlices(id, cur, canon)).get(); + Object[] next = null; + while (next == null) + next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT); + cur = next; + canon.add(i); + } + } + + static final Comparator<Integer> ICMP = new Comparator<Integer>() + { + @Override + public int compare(Integer o1, Integer o2) + { + return Integer.compare(o1, o2); + } + }; + + private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon) + { + List<ListenableFuture<?>> waitFor = new ArrayList<>(); + testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor); + testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor); + return waitFor; + } + + private static void testAllSlices(String id, 
NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results) + { + testOneSlice(id, btree, canon, results); + for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending)) + { + // test head/tail sets + testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results); + testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results); + testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results); + testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results); + for (Integer ub : range(canon.size(), lb, ascending)) + { + // test subsets + testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results); + testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results); + testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results); + testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results); + } + } + } + /** + * Queues one comparison of a slice under test against its canonical counterpart. + * The queued task first compares the two sizes via test(), then compares, in this + * order, iterator(), descendingIterator(), descendingSet().iterator() and + * descendingSet().descendingIterator() via testEqual(), each with its own id suffix. + * The task is appended to results and then passed to COMPARE.execute, so the caller + * holds a future for every comparison that was scheduled. + */ + private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results) + { + ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable() + { + + @Override + public void run() + { + test(id + " Count", test.size(), canon.size()); + testEqual(id, test.iterator(), canon.iterator()); + testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator()); + testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator()); + testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), 
canon.descendingSet().descendingIterator()); + } + }, null); + results.add(f); + COMPARE.execute(f); + } + /** + * Prints an "Expected .., Got .." line to stdout when the two ints differ. + * NOTE(review): unlike testEqual() this never throws, so a size mismatch on its own + * does not fail the comparison task - confirm that is intended. + */ + private static void test(String id, int test, int expect) + { + if (test != expect) + { + System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test)); + } + } + /** + * Walks both iterators in lockstep and reports every difference to stdout: + * each pair whose elements are not equals(), then any elements left over in either + * iterator once the other is exhausted. Throws AssertionError("Not equal") at the end + * if at least one difference was printed, so all differences are listed before failing. + * NOTE(review): the messages format elements with %d, which String.format accepts only + * for integral values; presumably V is always Integer here - verify against callers. + */ + private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon) + { + boolean equal = true; + while (btree.hasNext() && canon.hasNext()) + { + Object i = btree.next(); + Object j = canon.next(); + if (!i.equals(j)) + { + System.out.println(String.format("%s: Expected %d, Got %d", id, j, i)); + equal = false; + } + } + while (btree.hasNext()) + { + System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next())); + equal = false; + } + while (canon.hasNext()) + { + System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next())); + equal = false; + } + if (!equal) + throw new AssertionError("Not equal"); + } + + /** + * Should only be called on sets that range from 0->N or N->0. + * Produces consecutive ints in steps of +1 (ascending) or -1 (descending). + * Ascending: starts at from, or at -1 when from is Integer.MIN_VALUE, and the last + * value returned is size. Descending: starts at from, or at size when from is + * Integer.MIN_VALUE, and the last value returned is -1. + * NOTE(review): cur is a field of the Iterable, not of the Iterator, so every iterator + * obtained from one Iterable shares the same position and a second iterator() call + * continues where the first one stopped. + * NOTE(review): hasNext() is cur != end, so a start value already beyond end would + * never terminate. + */ + private static final Iterable<Integer> range(final int size, final int from, final boolean ascending) + { + return new Iterable<Integer>() + { + int cur; + int delta; + int end; + { + if (ascending) + { + end = size + 1; + cur = from == Integer.MIN_VALUE ? -1 : from; + delta = 1; + } + else + { + end = -2; + cur = from == Integer.MIN_VALUE ? 
size : from; + delta = -1; + } + } + @Override + public Iterator<Integer> iterator() + { + return new Iterator<Integer>() + { + @Override + public boolean hasNext() + { + return cur != end; + } + + @Override + public Integer next() + { + Integer r = cur; + cur += delta; + return r; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + /** + * Builds a btree from random ints. Draws size = random.nextInt(maxSize), adds that many + * random.nextInt() values to a TreeSet (so duplicate draws collapse and the result may + * hold fewer than size elements), then passes the set to BTree.build with ICMP and the + * no-op update function. + * NOTE(review): Random.nextInt(bound) rejects a non-positive bound, so maxSize must be + * greater than zero. + */ + private static Object[] randomTree(int maxSize, Random random) + { + TreeSet<Integer> build = new TreeSet<>(); + int size = random.nextInt(maxSize); + for (int i = 0 ; i < size ; i++) + { + build.add(random.nextInt()); + } + return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance()); + } + /** + * Returns an Iterables.filter view over a BTreeSet wrapping iter. The keep proportion + * is drawn once per call; each element is then kept when a fresh rnd.nextFloat() + * falls below that proportion. + * NOTE(review): the predicate draws new randomness on every evaluation, so presumably + * iterating the returned view twice selects different elements - confirm callers only + * iterate it once. + */ + private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd) + { + final float proportion = rnd.nextFloat(); + return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>() + { + public boolean apply(Integer integer) + { + return rnd.nextFloat() < proportion; + } + }); + } + /** + * Returns an Iterables.transform view over a BTreeSet wrapping iter. With the + * per-call proportion as probability an element v is passed through unchanged; + * otherwise it is replaced by (int)((v - last) / 2), where last is the previous + * element handed to the function (Integer.MIN_VALUE before the first one). The + * subtraction is done in long arithmetic because last is a long. + * NOTE(review): the replacement is half the gap to the previous element, not the + * midpoint last + (v - last) / 2 - confirm which was intended. + */ + private static Iterable<Integer> randomMix(Object[] iter, final Random rnd) + { + final float proportion = rnd.nextFloat(); + return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>() + { + long last = Integer.MIN_VALUE; + + public Integer apply(Integer v) + { + long last = this.last; + this.last = v; + if (rnd.nextFloat() < proportion) + return v; + return (int)((v - last) / 2); + } + }); + } + /** + * UpdateFunction that requests an early abort at random: abortEarly() returns true + * whenever a fresh rnd.nextFloat() is below chance. Both apply overloads return the + * incoming value unchanged (the two-argument form returns update) and allocated() + * does nothing. + */ + private static final class RandomAbort<V> implements UpdateFunction<V> + { + final Random rnd; + final float chance; + private RandomAbort(Random rnd, float chance) + { + this.rnd = rnd; + this.chance = chance; + } + + public V apply(V replacing, V update) + { + return update; + } + + public boolean abortEarly() + { + return rnd.nextFloat() < chance; + } + + public void allocated(long heapSize) + { + + } + + public V apply(V v) + { + return v; + } + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java deleted file mode 100644 index d7105df..0000000 --- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java +++ /dev/null @@ -1,240 +0,0 @@ -package org.apache.cassandra.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.utils.concurrent.OpOrder; - -import static org.junit.Assert.assertTrue; - -// TODO: we don't currently test SAFE functionality at all! 
-// TODO: should also test markBlocking and SyncOrdered -public class LongOpOrderTest -{ - - private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class); - - static final int CONSUMERS = 4; - static final int PRODUCERS = 32; - - static final long RUNTIME = TimeUnit.MINUTES.toMillis(5); - static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1); - - static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() - { - @Override - public void uncaughtException(Thread t, Throwable e) - { - System.err.println(t.getName() + ": " + e.getMessage()); - e.printStackTrace(); - } - }; - - final OpOrder order = new OpOrder(); - final AtomicInteger errors = new AtomicInteger(); - - class TestOrdering implements Runnable - { - - final int[] waitNanos = new int[1 << 16]; - volatile State state = new State(); - final ScheduledExecutorService sched; - - TestOrdering(ExecutorService exec, ScheduledExecutorService sched) - { - this.sched = sched; - final ThreadLocalRandom rnd = ThreadLocalRandom.current(); - for (int i = 0 ; i < waitNanos.length ; i++) - waitNanos[i] = rnd.nextInt(5000); - for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++) - exec.execute(new Producer()); - exec.execute(this); - } - - @Override - public void run() - { - final long until = System.currentTimeMillis() + RUNTIME; - long lastReport = System.currentTimeMillis(); - long count = 0; - long opCount = 0; - while (true) - { - long now = System.currentTimeMillis(); - if (now > until) - break; - if (now > lastReport + REPORT_INTERVAL) - { - lastReport = now; - logger.info(String.format("%s: Executed %d barriers with %d operations. 
%.0f%% complete.", - Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME)))); - } - try - { - Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]); - } catch (InterruptedException e) - { - e.printStackTrace(); - } - - final State s = state; - s.barrier = order.newBarrier(); - s.replacement = new State(); - s.barrier.issue(); - s.barrier.await(); - s.check(); - opCount += s.totalCount(); - state = s.replacement; - sched.schedule(new Runnable() - { - @Override - public void run() - { - s.check(); - } - }, 1, TimeUnit.SECONDS); - count++; - } - } - - class State - { - - volatile OpOrder.Barrier barrier; - volatile State replacement; - final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>(); - int checkCount = -1; - - boolean accept(OpOrder.Group opGroup) - { - if (barrier != null && !barrier.isAfter(opGroup)) - return false; - AtomicInteger c; - if (null == (c = count.get(opGroup))) - { - count.putIfAbsent(opGroup, new AtomicInteger()); - c = count.get(opGroup); - } - c.incrementAndGet(); - return true; - } - - int totalCount() - { - int c = 0; - for (AtomicInteger v : count.values()) - c += v.intValue(); - return c; - } - - void check() - { - boolean delete; - if (checkCount >= 0) - { - if (checkCount != totalCount()) - { - errors.incrementAndGet(); - logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount()); - } - delete = true; - } - else - { - checkCount = totalCount(); - delete = false; - } - for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet()) - { - if (e.getKey().compareTo(barrier.getSyncPoint()) > 0) - { - errors.incrementAndGet(); - logger.error("Received an operation that was created after the barrier was issued."); - } - if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue()) - { - errors.incrementAndGet(); - logger.error("Missing registered operations. 
{} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue()); - } - if (delete) - TestOrdering.this.count.remove(e.getKey()); - } - } - - } - - final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>(); - - class Producer implements Runnable - { - public void run() - { - while (true) - { - AtomicInteger c; - try (OpOrder.Group opGroup = order.start()) - { - if (null == (c = count.get(opGroup))) - { - count.putIfAbsent(opGroup, new AtomicInteger()); - c = count.get(opGroup); - } - c.incrementAndGet(); - State s = state; - while (!s.accept(opGroup)) - s = s.replacement; - } - } - } - } - - } - - @Test - public void testOrdering() throws InterruptedException - { - errors.set(0); - Thread.setDefaultUncaughtExceptionHandler(handler); - final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker")); - final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker")); - for (int i = 0 ; i < CONSUMERS ; i++) - new TestOrdering(exec, checker); - exec.shutdown(); - exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS); - assertTrue(exec.isShutdown()); - assertTrue(errors.get() == 0); - } - - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java deleted file mode 100644 index 0fd53bb..0000000 --- a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.concurrent; - -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.LockSupport; - -import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.commons.math3.distribution.WeibullDistribution; -import org.junit.Test; - -public class LongSharedExecutorPoolTest -{ - - private static final class WaitTask implements Runnable - { - final long nanos; - - private WaitTask(long nanos) - { - this.nanos = nanos; - } - - public void run() - { - LockSupport.parkNanos(nanos); - } - } - - private static final class Result implements Comparable<Result> - { - final Future<?> future; - final long forecastedCompletion; - - private Result(Future<?> future, long forecastedCompletion) - { - this.future = future; - this.forecastedCompletion = forecastedCompletion; - } - - public int compareTo(Result that) - { - int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion); - if (c != 0) - return c; - c = Integer.compare(this.hashCode(), 
that.hashCode()); - if (c != 0) - return c; - return Integer.compare(this.future.hashCode(), that.future.hashCode()); - } - } - - private static final class Batch implements Comparable<Batch> - { - final TreeSet<Result> results; - final long timeout; - final int executorIndex; - - private Batch(TreeSet<Result> results, long timeout, int executorIndex) - { - this.results = results; - this.timeout = timeout; - this.executorIndex = executorIndex; - } - - public int compareTo(Batch that) - { - int c = Long.compare(this.timeout, that.timeout); - if (c != 0) - return c; - c = Integer.compare(this.results.size(), that.results.size()); - if (c != 0) - return c; - return Integer.compare(this.hashCode(), that.hashCode()); - } - } - - @Test - public void testPromptnessOfExecution() throws InterruptedException, ExecutionException, TimeoutException - { - testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f); - } - - private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException, TimeoutException - { - final int executorCount = 4; - int threadCount = 8; - int maxQueued = 1024; - final WeibullDistribution workTime = new WeibullDistribution(3, 200000); - final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1); - final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1); - - final int[] threadCounts = new int[executorCount]; - final WeibullDistribution[] workCount = new WeibullDistribution[executorCount]; - final ExecutorService[] executors = new ExecutorService[executorCount]; - for (int i = 0 ; i < executors.length ; i++) - { - executors[i] = JMXEnabledSharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i); - threadCounts[i] = threadCount; - workCount[i] = new WeibullDistribution(2, maxQueued); - threadCount *= 2; - maxQueued *= 2; - } - - long runs = 0; - long events = 0; - final TreeSet<Batch> pending = new TreeSet<>(); - final BitSet executorsWithWork = new 
BitSet(executorCount); - long until = 0; - // basic idea is to go through different levels of load on the executor service; initially is all small batches - // (mostly within max queue size) of very short operations, moving to progressively larger batches - // (beyond max queued size), and longer operations - for (float multiplier = 0f ; multiplier < 2.01f ; ) - { - if (System.nanoTime() > until) - { - System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f)); - events = 0; - until = System.nanoTime() + intervalNanos; - multiplier += loadIncrement; - System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier)); - } - - // wait a random amount of time so we submit new tasks in various stages of - long timeout; - if (pending.isEmpty()) timeout = 0; - else if (Math.random() > 0.98) timeout = Long.MAX_VALUE; - else if (pending.size() == executorCount) timeout = pending.first().timeout; - else timeout = (long) (Math.random() * pending.last().timeout); - - while (!pending.isEmpty() && timeout > System.nanoTime()) - { - Batch first = pending.first(); - boolean complete = false; - try - { - for (Result result : first.results.descendingSet()) - result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS); - complete = true; - } - catch (TimeoutException e) - { - } - if (!complete && System.nanoTime() > first.timeout) - { - for (Result result : first.results) - if (!result.future.isDone()) - throw new AssertionError(); - complete = true; - } - if (complete) - { - pending.pollFirst(); - executorsWithWork.clear(first.executorIndex); - } - } - - // if we've emptied the executors, give all our threads an opportunity to spin down - if (timeout == Long.MAX_VALUE) - Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); - - // submit a random batch to the first free executor service - int executorIndex = 
executorsWithWork.nextClearBit(0); - if (executorIndex >= executorCount) - continue; - executorsWithWork.set(executorIndex); - ExecutorService executor = executors[executorIndex]; - TreeSet<Result> results = new TreeSet<>(); - int count = (int) (workCount[executorIndex].sample() * multiplier); - long targetTotalElapsed = 0; - long start = System.nanoTime(); - long baseTime; - if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier); - else baseTime = 0; - for (int j = 0 ; j < count ; j++) - { - long time; - if (baseTime == 0) time = (long) (workTime.sample() * multiplier); - else time = (long) (baseTime * Math.random()); - if (time < minWorkTime) - time = minWorkTime; - if (time > maxWorkTime) - time = maxWorkTime; - targetTotalElapsed += time; - Future<?> future = executor.submit(new WaitTask(time)); - results.add(new Result(future, System.nanoTime() + time)); - } - long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex]) - + TimeUnit.MILLISECONDS.toNanos(100L); - long now = System.nanoTime(); - if (runs++ > executorCount && now > end) - throw new AssertionError(); - events += results.size(); - pending.add(new Batch(results, end, executorIndex)); -// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start))); - } - } - - public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException - { - // do longer test - new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/utils/LongBTreeTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java deleted file mode 100644 
index 76ff2bf..0000000 --- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra.utils; - -import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import org.junit.Assert; -import org.junit.Test; - -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Timer; -import com.yammer.metrics.core.TimerContext; -import com.yammer.metrics.stats.Snapshot; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.utils.btree.BTree; -import org.apache.cassandra.utils.btree.BTreeSet; -import org.apache.cassandra.utils.btree.UpdateFunction; - -// TODO : should probably lower fan-factor for tests to make them more intensive -public class LongBTreeTest -{ - - private static final 
Timer BTREE_TIMER = Metrics.newTimer(BTree.class, "BTREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS); - private static final Timer TREE_TIMER = Metrics.newTimer(BTree.class, "TREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS); - private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY")); - private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE")); - private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f); - - static - { - System.setProperty("cassandra.btree.fanfactor", "4"); - } - - @Test - public void testOversizedMiddleInsert() - { - TreeSet<Integer> canon = new TreeSet<>(); - for (int i = 0 ; i < 10000000 ; i++) - canon.add(i); - Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null); - btree = BTree.update(btree, ICMP, canon, true); - canon.add(Integer.MIN_VALUE); - canon.add(Integer.MAX_VALUE); - Assert.assertTrue(BTree.isWellFormed(btree, ICMP)); - testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator()); - } - - @Test - public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 50, 1, 1, true); - } - - @Test - public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 50, 1, 5, true); - } - - @Test - public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 500, 10, 1, true); - } - - @Test - public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException - { - testInsertions(10000000, 500, 10, 10, true); - } - - @Test - public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException - { - 
testInsertions(100000000, 5000, 3, 100, true); - } - - @Test - public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException - { - testInsertions(10000, 50, 10, 10, false); - } - - private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException - { - int batchesPerTest = perTestCount / modificationBatchSize; - int maximumRunLength = 100; - int testKeyRange = perTestCount * testKeyRatio; - int tests = totalCount / perTestCount; - System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops", - tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize)); - - // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks - int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2)); - for (int chunk = 0 ; chunk < tests ; chunk += chunkSize) - { - final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>(); - for (int i = 0 ; i < chunkSize ; i++) - { - outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality)); - } - - final List<ListenableFuture<?>> inner = new ArrayList<>(); - int complete = 0; - int reportInterval = totalCount / 100; - int lastReportAt = 0; - for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer) - { - inner.addAll(f.get()); - complete += perTestCount; - if (complete - lastReportAt >= reportInterval) - { - System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount)); - lastReportAt = complete; - } - } - Futures.allAsList(inner).get(); - } - Snapshot snap = BTREE_TIMER.getSnapshot(); - System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), 
snap.get95thPercentile(), snap.get999thPercentile())); - snap = TREE_TIMER.getSnapshot(); - System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); - System.out.println("Done"); - } - - private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality) - { - ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>() - { - @Override - public List<ListenableFuture<?>> call() - { - final List<ListenableFuture<?>> r = new ArrayList<>(); - NavigableMap<Integer, Integer> canon = new TreeMap<>(); - Object[] btree = BTree.empty(); - final TreeMap<Integer, Integer> buffer = new TreeMap<>(); - final Random rnd = new Random(); - for (int i = 0 ; i < iterations ; i++) - { - buffer.clear(); - int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration); - while (mods > 0) - { - int v = rnd.nextInt(upperBound); - int rc = Math.max(0, Math.min(mods, maxRunLength) - 1); - int c = 1 + (rc <= 0 ? 
0 : rnd.nextInt(rc)); - for (int j = 0 ; j < c ; j++) - { - buffer.put(v, v); - v++; - } - mods -= c; - } - TimerContext ctxt; - ctxt = TREE_TIMER.time(); - canon.putAll(buffer); - ctxt.stop(); - ctxt = BTREE_TIMER.time(); - Object[] next = null; - while (next == null) - next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT); - btree = next; - ctxt.stop(); - - if (!BTree.isWellFormed(btree, ICMP)) - { - System.out.println("ERROR: Not well formed"); - throw new AssertionError("Not well formed!"); - } - if (quickEquality) - testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator()); - else - r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet()))); - } - return r; - } - }); - MODIFY.execute(f); - return f; - } - - @Test - public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException - { - Object[] cur = BTree.empty(); - TreeSet<Integer> canon = new TreeSet<>(); - // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated - for (int i = 0 ; i < 128 ; i++) - { - String id = String.format("[0..%d)", canon.size()); - System.out.println("Testing " + id); - Futures.allAsList(testAllSlices(id, cur, canon)).get(); - Object[] next = null; - while (next == null) - next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT); - cur = next; - canon.add(i); - } - } - - static final Comparator<Integer> ICMP = new Comparator<Integer>() - { - @Override - public int compare(Integer o1, Integer o2) - { - return Integer.compare(o1, o2); - } - }; - - private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon) - { - List<ListenableFuture<?>> waitFor = new ArrayList<>(); - testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor); - testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor); - return waitFor; - } - - private static void testAllSlices(String id, 
NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results) - { - testOneSlice(id, btree, canon, results); - for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending)) - { - // test head/tail sets - testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results); - testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results); - testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results); - testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results); - for (Integer ub : range(canon.size(), lb, ascending)) - { - // test subsets - testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results); - testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results); - testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results); - testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results); - } - } - } - - private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results) - { - ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable() - { - - @Override - public void run() - { - test(id + " Count", test.size(), canon.size()); - testEqual(id, test.iterator(), canon.iterator()); - testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator()); - testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator()); - testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), 
canon.descendingSet().descendingIterator()); - } - }, null); - results.add(f); - COMPARE.execute(f); - } - - private static void test(String id, int test, int expect) - { - if (test != expect) - { - System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test)); - } - } - - private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon) - { - boolean equal = true; - while (btree.hasNext() && canon.hasNext()) - { - Object i = btree.next(); - Object j = canon.next(); - if (!i.equals(j)) - { - System.out.println(String.format("%s: Expected %d, Got %d", id, j, i)); - equal = false; - } - } - while (btree.hasNext()) - { - System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next())); - equal = false; - } - while (canon.hasNext()) - { - System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next())); - equal = false; - } - if (!equal) - throw new AssertionError("Not equal"); - } - - // should only be called on sets that range from 0->N or N->0 - private static final Iterable<Integer> range(final int size, final int from, final boolean ascending) - { - return new Iterable<Integer>() - { - int cur; - int delta; - int end; - { - if (ascending) - { - end = size + 1; - cur = from == Integer.MIN_VALUE ? -1 : from; - delta = 1; - } - else - { - end = -2; - cur = from == Integer.MIN_VALUE ? 
size : from; - delta = -1; - } - } - @Override - public Iterator<Integer> iterator() - { - return new Iterator<Integer>() - { - @Override - public boolean hasNext() - { - return cur != end; - } - - @Override - public Integer next() - { - Integer r = cur; - cur += delta; - return r; - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - }; - } - - private static final class RandomAbort<V> implements UpdateFunction<V> - { - final Random rnd; - final float chance; - private RandomAbort(Random rnd, float chance) - { - this.rnd = rnd; - this.chance = chance; - } - - public V apply(V replacing, V update) - { - return update; - } - - public boolean abortEarly() - { - return rnd.nextFloat() < chance; - } - - public void allocated(long heapSize) - { - - } - - public V apply(V v) - { - return v; - } - } - -}