ignite-5658 review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9936dd85 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9936dd85 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9936dd85 Branch: refs/heads/ignite-5658 Commit: 9936dd850e1caa022064cef16e680e32f1f2007d Parents: 269ca02 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Thu Jul 20 15:30:52 2017 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Thu Jul 20 15:30:52 2017 +0300 ---------------------------------------------------------------------- .../yardstick/IgniteBenchmarkArguments.java | 11 + .../IgniteSingleCacheStreamerBenchmark.java | 209 +++++++++++++++++++ .../cache/IgniteStreamerBenchmark.java | 13 +- 3 files changed, 231 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java index 594fa1f..355b1b0 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java @@ -252,6 +252,10 @@ public class IgniteBenchmarkArguments { @Parameter(names = {"-stbs", "--streamerBufSize"}, description = "Data streamer buffer size") private int streamerBufSize = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE; + /** */ + @Parameter(names = {"-stpo", "--streamerParallelOps"}, description = "Data streamer max parallel ops") + private int streamerPerNodeParallelOps = IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS; + /** * @return {@code True} if need set {@link PersistentStoreConfiguration}. */ @@ -631,6 +635,13 @@ public class IgniteBenchmarkArguments { return streamerBufSize; } + /** + * @return Streamer per node parallel ops. + */ + public int getStreamerPerNodeParallelOps() { + return streamerPerNodeParallelOps; + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteBenchmarkArguments.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java new file mode 100644 index 0000000..1a35c5d --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.yardstick.IgniteAbstractBenchmark; +import org.apache.ignite.yardstick.cache.model.SampleValue; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkUtils; + +/** + * + */ +public class IgniteSingleCacheStreamerBenchmark extends IgniteAbstractBenchmark { + /** */ + private List<String> cacheNames; + + /** */ + private ExecutorService executor; + + /** */ + private int entries; + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + entries = args.range(); + + if (entries <= 0) + throw new IllegalArgumentException("Invalid number of entries: " + entries); + + if (cfg.threads() != 1) + throw new IllegalArgumentException("IgniteStreamerBenchmark should be run with single thread. " + + "Internally it starts multiple threads."); + + String cacheNamePrefix = args.streamerCachesPrefix(); + + if (cacheNamePrefix == null || cacheNamePrefix.isEmpty()) + throw new IllegalArgumentException("Streamer caches prefix not set."); + + List<String> caches = new ArrayList<>(); + + for (String cacheName : ignite().cacheNames()) { + if (cacheName.startsWith(cacheNamePrefix)) + caches.add(cacheName); + } + + if (caches.isEmpty()) + throw new IllegalArgumentException("Failed to find for IgniteStreamerBenchmark caches " + + "starting with '" + cacheNamePrefix + "'"); + + BenchmarkUtils.println("Found " + caches.size() + " caches for IgniteStreamerBenchmark: " + caches); + + if (args.streamerCacheIndex() >= caches.size()) { + throw new IllegalArgumentException("Invalid streamer cache index: " + args.streamerCacheIndex() + + ", there are only " + caches.size() + " caches."); + } + + if (args.streamerCacheIndex() + args.streamerConcurrentCaches() > caches.size()) { + throw new IllegalArgumentException("There are no enough caches [cacheIndex=" + args.streamerCacheIndex() + + ", concurrentCaches=" + args.streamerConcurrentCaches() + + ", totalCaches=" + caches.size() + ']'); + } + + Collections.sort(caches); + + cacheNames = new ArrayList<>(caches.subList(args.streamerCacheIndex(), + args.streamerCacheIndex() + args.streamerConcurrentCaches())); + + if (cacheNames.size() > 1) + throw new IllegalArgumentException("IgniteSingleCacheStreamerBenchmark can run only with single cache " + + "[cacheNames=" + cacheNames + ']'); + + executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + + BenchmarkUtils.println("IgniteStreamerBenchmark start [cacheIndex=" + args.streamerCacheIndex() + + ", concurrentCaches=" + args.streamerConcurrentCaches() + + ", entries=" + entries + + ", bufferSize=" + args.streamerBufferSize() + + ", cachesToUse=" + cacheNames + ']'); + + if (cfg.warmup() > 0) + throw new IllegalArgumentException("IgniteSingleCacheStreamerBenchmark can run only without warmup " + + "[warmup=" + cfg.warmup() + ']'); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> map) throws Exception { + BenchmarkUtils.println("IgniteStreamerBenchmark start test."); + + long start = System.currentTimeMillis(); + + final AtomicBoolean stop = new AtomicBoolean(); + + final String cacheName = cacheNames.get(0); + + try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) { + streamer.perNodeParallelOperations(args.getStreamerPerNodeParallelOps()); + streamer.perNodeBufferSize(args.streamerBufferSize()); + + final List<Future<Void>> futs = new ArrayList<>(); + + int availableCpus = Runtime.getRuntime().availableProcessors(); + + final AtomicInteger cnt = new AtomicInteger(); + final int delta = entries / availableCpus; + + for (int i = 0; i < availableCpus; i++) { + futs.add(executor.submit( + new Callable<Void>() { + @Override public Void call() throws Exception { + int min = cnt.getAndAdd(delta); + int max = min + delta; + + long start = System.currentTimeMillis(); + + BenchmarkUtils.println("IgniteStreamerBenchmark start load cache " + + "[name=" + cacheName + + ", min=" + min + + ", max=" + max + ']'); + + for (int i = 0; i < delta; i++) { + streamer.addData(min + i, new SampleValue(min + i)); + + if (i > 0 && i % 1000 == 0) { + if (stop.get()) + break; + + if (i % 100_000 == 0) { + BenchmarkUtils.println("IgniteStreamerBenchmark cache load progress " + + "[name=" + cacheName + + ", entries=" + i + + ", delta=" + delta + + ", timeMillis=" + (System.currentTimeMillis() - start) + ']'); + } + } + } + + long time = System.currentTimeMillis() - start; + + BenchmarkUtils.println("Thread finished loading cache [name=" + cacheName + + ", min=" + min + + ", max=" + max + + ", bufferSize=" + args.streamerBufferSize() + + ", totalTimeMillis=" + time + ']'); + + return null; + } + } + )); + } + + for (Future<Void> fut : futs) + fut.get(); + } + finally { + stop.set(true); + } + + long time = System.currentTimeMillis() - start; + + BenchmarkUtils.println("IgniteStreamerBenchmark finished [totalTimeMillis=" + time + + ", entries=" + entries + + ", bufferSize=" + args.streamerBufferSize() + ']'); + + BenchmarkUtils.println("Cache size [cacheName=" + cacheName + + ", size=" + ignite().cache(cacheName).size() + ']'); + + return false; + } + + /** {@inheritDoc} */ + @Override public void tearDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + super.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java index 9e253e1..9914eac 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java @@ -25,6 +25,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.yardstick.IgniteAbstractBenchmark; @@ -175,6 +176,11 @@ public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark { BenchmarkUtils.println("IgniteStreamerBenchmark start load cache [name=" + cacheName + ']'); try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) { + streamer.perNodeParallelOperations(Runtime.getRuntime().availableProcessors() * 4); + streamer.perNodeBufferSize(args.streamerBufferSize()); + + BenchmarkUtils.println("Data streamer: " + streamer); + for (int i = 0; i < entries; i++) { streamer.addData(i, new SampleValue(i)); @@ -226,8 +232,11 @@ public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark { /** {@inheritDoc} */ @Override public void tearDown() throws Exception { - if (executor != null) - executor.shutdown(); + if (executor != null) { + executor.shutdownNow(); + + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } super.tearDown(); }