http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java index ea9531a..1c258a4 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java @@ -29,7 +29,7 @@ import org.yardstickframework.BenchmarkConfiguration; /** * Ignite benchmark that performs put and query operations. */ -public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark { +public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { /** {@inheritDoc} */ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { super.setUp(cfg); @@ -81,4 +81,4 @@ public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark { @Override protected IgniteCache<Integer, Object> cache() { return ignite().cache("query"); } -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java new file mode 100644 index 0000000..c0567ef --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.yardstickframework.BenchmarkConfiguration; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client + * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the + * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared + * to the values in the cache. 
+ */ +public class IgniteAtomicInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Set> { + /** Per-key counter of values added so far by this client; validation compares it with the size of the cached set. */ + private final ConcurrentMap<String, AtomicLong> nextValMap = new ConcurrentHashMap<>(); + + /** Fair lock: {@code test()} takes the read lock, the validator thread takes the write lock. */ + private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); + + /** Failure captured by the validator thread; rethrown from {@code test()}. */ + private volatile Exception ex; + + /** {@inheritDoc} */ + @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + Thread thread = new Thread(new Runnable() { + @Override public void run() { + try { + final int timeout = args.cacheOperationTimeoutMillis(); + final int range = args.range(); + + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000); + + rwl.writeLock().lock(); + + try { + println("Start cache validation."); + + long startTime = U.currentTimeMillis(); + + Map<String, Set> badCacheEntries = new HashMap<>(); + + for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet()) { + String key = e.getKey(); + + asyncCache.get(key); + Set set = asyncCache.<Set>future().get(timeout); + + if (set == null || e.getValue() == null || !Objects.equals(e.getValue().get(), (long)set.size())) + badCacheEntries.put(key, set); + } + + if (!badCacheEntries.isEmpty()) { + // Print all useful information and finish. 
+ for (Map.Entry<String, Set> e : badCacheEntries.entrySet()) { + String key = e.getKey(); + + println("Got unexpected set size [key='" + key + "', expSize=" + nextValMap.get(key) + + ", cacheVal=" + e.getValue() + "]"); + } + + println("Next values map contant:"); + for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet()) + println("Map Entry [key=" + e.getKey() + ", val=" + e.getValue() + "]"); + + println("Cache content:"); + + for (int k2 = 0; k2 < range; k2++) { + String key2 = "key-" + k2; + + asyncCache.get(key2); + Object val = asyncCache.future().get(timeout); + + if (val != null) + println("Cache Entry [key=" + key2 + ", val=" + val + "]"); + + } + + throw new IllegalStateException("Cache and local map are in inconsistent state " + + "[badKeys=" + badCacheEntries.keySet() + ']'); + } + + println("Clearing all data."); + + asyncCache.removeAll(); + asyncCache.future().get(timeout); + + nextValMap.clear(); + + println("Cache validation successfully finished in " + + (U.currentTimeMillis() - startTime) / 1000 + " sec."); + } + finally { + rwl.writeLock().unlock(); + } + } + } + catch (Throwable e) { + ex = new Exception(e); + + println("Got exception: " + e); + + e.printStackTrace(); + + if (e instanceof Error) + throw (Error)e; + } + } + }, "cache-" + cacheName() + "-validator"); + + thread.setDaemon(true); + + thread.start(); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + final int k = nextRandom(args.range()); + + String key = "key-" + k; + + rwl.readLock().lock(); + + try { + if (ex != null) + throw ex; + + AtomicLong nextAtomicVal = nextValMap.putIfAbsent(key, new AtomicLong(1)); + + Long nextVal = 1L; + + if (nextAtomicVal != null) + nextVal = nextAtomicVal.incrementAndGet(); + + asyncCache.invoke(key, new AddInSetEntryProcessor(), nextVal); + asyncCache.future().get(args.cacheOperationTimeoutMillis()); + } + finally { + rwl.readLock().unlock(); + } + + if (ex != null) + throw ex; + + 
return true; + } + + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "atomic-invoke-retry"; + } + + /** + * Adds the first invocation argument to the set stored in the entry, creating the set when the entry does not exist. */ + private static class AddInSetEntryProcessor implements CacheEntryProcessor<String, Set, Object> { + /** */ + private static final long serialVersionUID = 0; + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<String, Set> entry, + Object... arguments) throws EntryProcessorException { + assert !F.isEmpty(arguments); + + Object val = arguments[0]; + + Set set; + + if (!entry.exists()) + set = new HashSet<>(); + else + set = entry.getValue(); + + set.add(val); + + entry.setValue(set); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java new file mode 100644 index 0000000..c8b0b1d --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +/** + * Variant of {@link IgniteAtomicInvokeRetryBenchmark} that runs against the {@code atomic-offheap-invoke-retry} cache. <p> Each client maintains a local map that it updates together with cache. Client + * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the + * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared + * to the values in the cache. + */ +public class IgniteAtomicOffHeapInvokeRetryBenchmark extends IgniteAtomicInvokeRetryBenchmark { + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "atomic-offheap-invoke-retry"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java new file mode 100644 index 0000000..ebb9eac --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +/** + * Atomic retries failover benchmark that runs against the {@code atomic-offheap-reties} cache. + * <p> + * Client generates continuous load to the cluster (random get, put, invoke, remove + * operations). + */ +public class IgniteAtomicOffHeapRetriesBenchmark extends IgniteAtomicRetriesBenchmark { + /** {@inheritDoc} NOTE(review): the name is spelled "reties"; presumably it has to match the cache configuration - confirm before correcting. */ + @Override protected String cacheName() { + return "atomic-offheap-reties"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java new file mode 100644 index 0000000..4e60698 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.Map; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.cache.CacheEntryProcessor; + +/** + * Atomic retries failover benchmark. + * <p> + * Client generates continuous load to the cluster (random get, put, invoke, remove + * operations). + */ +public class IgniteAtomicRetriesBenchmark extends IgniteFailoverAbstractBenchmark<Integer, String> { + /** {@inheritDoc} Runs one randomly chosen operation (get, put, invoke or remove) on a random key and waits for its future with the configured timeout. */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + final int key = nextRandom(args.range()); + + int opNum = nextRandom(4); + + final int timeout = args.cacheOperationTimeoutMillis(); + + switch (opNum) { + case 0: + asyncCache.get(key); + asyncCache.future().get(timeout); + + break; + + case 1: + asyncCache.put(key, String.valueOf(key)); + asyncCache.future().get(timeout); + + break; + + case 2: + asyncCache.invoke(key, new TestCacheEntryProcessor()); + asyncCache.future().get(timeout); + + break; + + case 3: + asyncCache.remove(key); + asyncCache.future().get(timeout); + + break; + + default: + throw new IllegalStateException("Got invalid operation number: " + opNum); + } + + return true; + } + + /** {@inheritDoc} NOTE(review): the name is spelled "reties"; presumably it has to match the cache configuration - confirm before correcting. */ + @Override protected String cacheName() { + return "atomic-reties"; + } + + /** + * Entry processor that leaves the entry untouched and always returns the string "key". */ + private static class TestCacheEntryProcessor 
implements CacheEntryProcessor<Integer, String, String> { + /** Serial version uid. */ + private static final long serialVersionUID = 0; + + /** {@inheritDoc} */ + @Override public String process(MutableEntry<Integer, String> entry, + Object... arguments) throws EntryProcessorException { + return "key"; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java new file mode 100644 index 0000000..83fc58f --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.mxbean.IgniteMXBean; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.yardstick.cache.IgniteCacheAbstractBenchmark; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkUtils; +import org.yardstickframework.BenchmarkUtils.ProcessExecutionResult; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Ignite benchmark that performs long running failover tasks; after warmup the driver with member id 0 starts a daemon thread that periodically kills and restarts server nodes. + */ +public abstract class IgniteFailoverAbstractBenchmark<K, V> extends IgniteCacheAbstractBenchmark<K, V> { + /** Guards against starting more than one servers-restarter thread. */ + private static final AtomicBoolean restarterStarted = new AtomicBoolean(); + + /** Async Cache. 
*/ + protected IgniteCache<K, V> asyncCache; + + /** Set once the first exception has been handled by {@code onException()}. */ + private final AtomicBoolean firstExProcessed = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + asyncCache = cache.withAsync(); + } + + /** {@inheritDoc} */ + @Override public void onWarmupFinished() { + if (cfg.memberId() == 0 && restarterStarted.compareAndSet(false, true)) { + Thread restarterThread = new Thread(new Runnable() { + @Override public void run() { + try { + println("Servers restarter started on driver: " + + IgniteFailoverAbstractBenchmark.this.getClass().getSimpleName()); + + Ignite ignite = ignite(); + + // Read servers configs from cache to local map. + IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite. + getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>(). + setName("serversConfigs")); + + final Map<Integer, BenchmarkConfiguration> srvsCfgs = new HashMap<>(); + + for (Cache.Entry<Integer, BenchmarkConfiguration> e : srvsCfgsCache) { + println("Read entry from 'serversConfigs' cache : " + e); + + srvsCfgs.put(e.getKey(), e.getValue()); + } + + assert ignite.cluster().forServers().nodes().size() == srvsCfgs.size(); + + final int backupsCnt = args.backups(); + + assert backupsCnt >= 1 : "Backups: " + backupsCnt; + + final boolean isDebug = ignite.log().isDebugEnabled(); + + // Main logic. 
+ while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(args.restartDelay() * 1000); + + int numNodesToRestart = nextRandom(1, backupsCnt + 1); + + List<Integer> ids = new ArrayList<>(); + + ids.addAll(srvsCfgs.keySet()); + + Collections.shuffle(ids); + + println("Waiting for partitioned map exchage of all nodes"); + + IgniteCompute asyncCompute = ignite.compute().withAsync(); + + asyncCompute.broadcast(new AwaitPartitionMapExchangeTask()); + + asyncCompute.future().get(args.cacheOperationTimeoutMillis()); + + println("Start servers restarting [numNodesToRestart=" + numNodesToRestart + + ", shuffledIds=" + ids + "]"); + + for (int i = 0; i < numNodesToRestart; i++) { + Integer id = ids.get(i); + + BenchmarkConfiguration bc = srvsCfgs.get(id); + + ProcessExecutionResult res = BenchmarkUtils.kill9Server(bc, isDebug); + + println("Server with id " + id + " has been killed." + + (isDebug ? " Process execution result:\n" + res : "")); + } + + Thread.sleep(args.restartSleep() * 1000); + + for (int i = 0; i < numNodesToRestart; i++) { + Integer id = ids.get(i); + + BenchmarkConfiguration bc = srvsCfgs.get(id); + + ProcessExecutionResult res = BenchmarkUtils.startServer(bc, isDebug); + + println("Server with id " + id + " has been started." + + (isDebug ? " Process execution result:\n" + res : "")); + } + } + } + catch (Throwable e) { + println("Got exception: " + e); + e.printStackTrace(); + + U.dumpThreads(null); + + if (e instanceof Error) + throw (Error)e; + } + } + }, "servers-restarter"); + + restarterThread.setDaemon(true); + restarterThread.start(); + } + } + + /** + * Awaits partition map exchange: polls every 100 ms until all partitions of all DHT caches are in OWNING state. + * + * @param ignite Ignite node; it is cast to {@code IgniteKernal}. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("BusyWait") + protected static void awaitPartitionMapExchange(Ignite ignite) throws Exception { + IgniteLogger log = ignite.log(); + + log.info("Waiting for finishing of a partition exchange on node: " + ignite); + + IgniteKernal kernal = (IgniteKernal)ignite; + + while (true) { + boolean partitionsExchangeFinished = true; + + for (IgniteInternalCache<?, ?> cache : kernal.cachesx(null)) { + log.info("Checking cache: " + cache.name()); + + GridCacheAdapter<?, ?> c = kernal.internalCache(cache.name()); + + if (!(c instanceof GridDhtCacheAdapter)) + continue; /* Non-DHT cache: nothing to check here, but the remaining caches still have to be examined. */ + + GridDhtCacheAdapter<?, ?> dht = (GridDhtCacheAdapter<?, ?>)c; + + GridDhtPartitionFullMap partMap = dht.topology().partitionMap(true); + + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + log.info("Checking node: " + e.getKey()); + + for (Map.Entry<Integer, GridDhtPartitionState> e1 : e.getValue().entrySet()) { + if (e1.getValue() != GridDhtPartitionState.OWNING) { + log.info("Undesired state [id=" + e1.getKey() + ", state=" + e1.getValue() + ']'); + + partitionsExchangeFinished = false; + + break; + } + } + + if (!partitionsExchangeFinished) + break; + } + + if (!partitionsExchangeFinished) + break; + } + + if (partitionsExchangeFinished) + return; + + Thread.sleep(100); + } + } + + /** {@inheritDoc} */ + @Override public void onException(Throwable e) { + // Process only the first exception to avoid printing a full thread dump multiple times. + if (firstExProcessed.compareAndSet(false, true)) { + // Debug info on current client. + println("Full thread dump of the current node below."); + + U.dumpThreads(null); + + println(""); + + ((IgniteMXBean)ignite()).dumpDebugInfo(); + + // Debug info on servers. 
+ Ignite ignite = ignite(); + + ClusterGroup srvs = ignite.cluster().forServers(); + + IgniteCompute asyncCompute = ignite.compute(srvs).withAsync(); + + asyncCompute.broadcast(new ThreadDumpPrinterTask(ignite.cluster().localNode().id(), e)); + asyncCompute.future().get(10_000); + } + } + + /** + * @return Cache name. + */ + protected abstract String cacheName(); + + /** {@inheritDoc} */ + @Override protected IgniteCache<K, V> cache() { + return ignite().cache(cacheName()); + } + + /** + * Task that prints the driver's exception, a full thread dump and debug info on the node where it runs. */ + private static class ThreadDumpPrinterTask implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final UUID id; + + /** */ + private final Throwable e; + + /** + * @param id Benchmark node id. + * @param e Exception. + */ + ThreadDumpPrinterTask(UUID id, Throwable e) { + this.id = id; + this.e = e; + } + + /** {@inheritDoc} */ + @Override public void run() { + println("Driver finished with exception [driverNodeId=" + id + ", e=" + e + "]"); + println("Full thread dump of the current server node below."); + + U.dumpThreads(null); + + println(""); + + ((IgniteMXBean)ignite).dumpDebugInfo(); + } + } + + /** + * Task that runs {@code awaitPartitionMapExchange()} on the node where it is executed. */ + private static class AwaitPartitionMapExchangeTask implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void run() { + try { + awaitPartitionMapExchange(ignite); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java new file mode 100644 index 0000000..29405de --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.yardstick.IgniteNode; +import org.yardstickframework.BenchmarkConfiguration; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Ignite failover node: on start it records its JVM options, classpath and selected environment variables in the benchmark configuration and publishes that configuration in the {@code serversConfigs} cache under its member id. + */ +public class IgniteFailoverNode extends IgniteNode { + /** {@inheritDoc} */ + @Override public void start(BenchmarkConfiguration cfg) throws Exception { + super.start(cfg); + + // Record launch parameters in the configuration, then put the server configuration into a special cache. 
+ RuntimeMXBean mxBean = ManagementFactory.getRuntimeMXBean(); + + List<String> jvmOpts = mxBean.getInputArguments(); + + StringBuilder jvmOptsStr = new StringBuilder(); + + for (String opt : jvmOpts) + jvmOptsStr.append(opt).append(' '); + + cfg.customProperties().put("JVM_OPTS", jvmOptsStr.toString()); + cfg.customProperties().put("PROPS_ENV", System.getenv("PROPS_ENV")); + cfg.customProperties().put("CLASSPATH", mxBean.getClassPath()); + cfg.customProperties().put("JAVA", System.getenv("JAVA")); /* NOTE(review): System.getenv() returns null for an unset variable - confirm customProperties() tolerates null values. */ + + IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite(). + getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>().setName("serversConfigs")); + + srvsCfgsCache.put(cfg.memberId(), cfg); + + println("Put at cache [" + cfg.memberId() + "=" + cfg + "]"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java new file mode 100644 index 0000000..f8a1689 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.yardstickframework.BenchmarkConfiguration; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client + * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the + * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared + * to the values in the cache. 
+ */
public class IgniteTransactionalInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
    /** Expected cache value for every key this driver has invoked the increment closure on. */
    private final ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();

    /** Fair lock: {@link #test(Map)} works under the read lock, validation stops all updates with the write lock. */
    private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);

    /** Failure recorded by the validator thread; once set, {@link #test(Map)} rethrows it. */
    private volatile Exception ex;

    /** {@inheritDoc} */
    @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
        super.setUp(cfg);

        Thread thread = new Thread(new Runnable() {
            @Override public void run() {
                try {
                    runValidationLoop(cfg);
                }
                catch (InterruptedException ignored) {
                    // The loop uses the interrupt flag as its stop signal, so an interrupt that arrives
                    // while sleeping is a normal shutdown and must not be reported as a failure.
                    Thread.currentThread().interrupt();
                }
                catch (Throwable e) {
                    ex = new Exception(e);

                    println("Got exception: " + e);

                    e.printStackTrace();

                    if (e instanceof Error)
                        throw (Error)e;
                }
            }
        }, "cache-" + cacheName() + "-validator");

        thread.setDaemon(true);

        thread.start();
    }

    /**
     * Periodically validates the cache against the local map until the current thread is interrupted
     * or a validation fails.
     *
     * @param cfg Benchmark configuration.
     * @throws InterruptedException If the thread is interrupted while waiting for the next validation.
     */
    private void runValidationLoop(BenchmarkConfiguration cfg) throws InterruptedException {
        final int timeout = args.cacheOperationTimeoutMillis();
        final int keysCnt = args.keysCount();

        while (!Thread.currentThread().isInterrupted()) {
            // Multiply as long: an int product silently overflows for large periods.
            Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000L);

            // Block all writers in test() for the duration of the check.
            rwl.writeLock().lock();

            try {
                validateCache(cfg, timeout, keysCnt);
            }
            finally {
                rwl.writeLock().unlock();
            }
        }
    }

    /**
     * Compares every key of this driver with the local map. Must be called under the write lock.
     *
     * @param cfg Benchmark configuration.
     * @param timeout Cache operation timeout.
     * @param keysCnt Number of keys per random value.
     * @throws IllegalStateException If at least one cache value differs from the local map.
     */
    private void validateCache(BenchmarkConfiguration cfg, int timeout, int keysCnt) {
        println("Start cache validation.");

        long startTime = U.currentTimeMillis();

        Map<String, Long> notEqualsCacheVals = new HashMap<>();
        Map<String, Long> notEqualsLocMapVals = new HashMap<>();

        for (int k = 0; k < args.range(); k++) {
            if (k % 10_000 == 0)
                println("Start validation for keys like 'key-" + k + "-*'");

            for (int i = 0; i < keysCnt; i++) {
                String key = key(cfg, k, i);

                Long cacheVal = cacheValue(key, timeout);

                AtomicLong aVal = map.get(key);
                Long mapVal = aVal != null ? aVal.get() : null;

                if (!Objects.equals(cacheVal, mapVal)) {
                    notEqualsCacheVals.put(key, cacheVal);
                    notEqualsLocMapVals.put(key, mapVal);
                }
            }
        }

        assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " +
            "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]";

        if (!notEqualsCacheVals.isEmpty()) {
            reportMismatches(cfg, timeout, keysCnt, notEqualsCacheVals, notEqualsLocMapVals);

            throw new IllegalStateException("Cache and local map are in inconsistent state.");
        }

        println("Cache validation successfully finished in "
            + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
    }

    /**
     * Prints everything useful for diagnosing a failed validation: the differing values, the local map
     * and all non-null cache entries of this driver.
     *
     * @param cfg Benchmark configuration.
     * @param timeout Cache operation timeout.
     * @param keysCnt Number of keys per random value.
     * @param notEqualsCacheVals Cache values of the keys that did not match.
     * @param notEqualsLocMapVals Local map values of the keys that did not match.
     */
    private void reportMismatches(BenchmarkConfiguration cfg, int timeout, int keysCnt,
        Map<String, Long> notEqualsCacheVals, Map<String, Long> notEqualsLocMapVals) {
        for (Map.Entry<String, Long> eLocMap : notEqualsLocMapVals.entrySet()) {
            String key = eLocMap.getKey();
            Long mapVal = eLocMap.getValue();
            Long cacheVal = notEqualsCacheVals.get(key);

            println(cfg, "Got different values [key='" + key
                + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]");
        }

        println(cfg, "Local driver map contant:\n " + map);

        println(cfg, "Cache content:");

        for (int k = 0; k < args.range(); k++) {
            for (int i = 0; i < keysCnt; i++) {
                String key = key(cfg, k, i);

                Long val = cacheValue(key, timeout);

                if (val != null)
                    println(cfg, "Entry [key=" + key + ", val=" + val + "]");
            }
        }
    }

    /**
     * @param key Cache key.
     * @param timeout Cache operation timeout.
     * @return Value read from the cache through the async API, {@code null} if there is none.
     */
    private Long cacheValue(String key, int timeout) {
        asyncCache.get(key);

        return asyncCache.<Long>future().get(timeout);
    }

    /**
     * @param cfg Benchmark configuration providing the member id of this driver.
     * @param k Random part of the key.
     * @param i Index of the key.
     * @return Key in the form {@code key-<k>-<memberId>-<i>}.
     */
    private static String key(BenchmarkConfiguration cfg, int k, int i) {
        return "key-" + k + "-" + cfg.memberId() + "-" + i;
    }

    /** {@inheritDoc} */
    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
        final int k = nextRandom(args.range());

        final String[] keys = new String[args.keysCount()];

        assert keys.length > 0 : "Count of keys: " + keys.length;

        for (int i = 0; i < keys.length; i++)
            keys[i] = key(cfg, k, i);

        for (String key : keys) {
            // The read lock lets drivers work concurrently while keeping the validator out.
            rwl.readLock().lock();

            try {
                if (ex != null)
                    throw ex;

                asyncCache.invoke(key, new IncrementCacheEntryProcessor());
                asyncCache.future().get(args.cacheOperationTimeoutMillis());

                // Mirrors the closure: the first update stores 0, every following one adds 1.
                AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0));

                if (prevVal != null)
                    prevVal.incrementAndGet();
            }
            finally {
                rwl.readLock().unlock();
            }
        }

        if (ex != null)
            throw ex;

        return true;
    }

    /** {@inheritDoc} */
    @Override protected String cacheName() {
        return "tx-invoke-retry";
    }

    /**
     * Entry processor that stores 0 for a missing value and increments an existing value by 1.
     */
    private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Long> {
        /** */
        private static final long serialVersionUID = 0;

        /** {@inheritDoc} */
        @Override public Long process(MutableEntry<String, Long> entry,
            Object... arguments) throws EntryProcessorException {
            Long curVal = entry.getValue();

            long newVal = curVal == null ? 0 : curVal + 1;

            entry.setValue(newVal);

            return newVal;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java new file mode 100644 index 0000000..4cbcf67 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; +
/**
 * Invoke retry failover benchmark that runs against the {@code "tx-offheap-invoke-retry"} cache.
 * <p>
 * The scenario is inherited unchanged from {@link IgniteTransactionalInvokeRetryBenchmark}: each client keeps a
 * local map in step with the cache by invoking an increment closure for all generated keys and atomically
 * incrementing the matching local values. To validate the cache, all writes from the client are stopped and the
 * local map is compared with the values in the cache. This subclass only overrides the cache name.
 */
public class IgniteTransactionalOffHeapInvokeRetryBenchmark extends IgniteTransactionalInvokeRetryBenchmark {
    /** Name of the cache this benchmark works with. */
    private static final String CACHE_NAME = "tx-offheap-invoke-retry";

    /** {@inheritDoc} */
    @Override protected String cacheName() {
        return CACHE_NAME;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java new file mode 100644 index 0000000..7fa2d1a --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; +
/**
 * Transactional write invoke failover benchmark that runs against the {@code "tx-offheap-write-invoke"} cache.
 * <p>
 * The scenario is inherited unchanged from {@link IgniteTransactionalWriteInvokeBenchmark}: each client
 * generates a random integer K in a limited range and works with the master key {@code 'key-' + K + '-master'}
 * and the child keys {@code 'key-' + K + '-0'}, {@code 'key-' + K + '-1'}, ... Inside a pessimistic repeatable
 * read transaction it randomly chooses between two scenarios:
 * <ul>
 * <li>Reads the values of the master key and the child keys. Values must be equal.</li>
 * <li>Reads the value of the master key, increments it by 1 and puts it back, then invokes an increment
 * closure on the child keys. No validation is performed.</li>
 * </ul>
 * This subclass only overrides the cache name.
 */
public class IgniteTransactionalOffHeapWriteInvokeBenchmark extends IgniteTransactionalWriteInvokeBenchmark {
    /** Name of the cache this benchmark works with. */
    private static final String CACHE_NAME = "tx-offheap-write-invoke";

    /** {@inheritDoc} */
    @Override protected String cacheName() {
        return CACHE_NAME;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java new file mode 100644 index 0000000..bdecca7 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +/** + * Transactional write read failover benchmark. 
+ * <p> + * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1', + * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with + * each key. Values must be equal. Client increments value by 1, commits the transaction. + */
public class IgniteTransactionalOffHeapWriteReadBenchmark extends IgniteTransactionalWriteReadBenchmark {
    /** Name of the cache this benchmark works with; the scenario itself comes from the parent class. */
    private static final String CACHE_NAME = "tx-offheap-write-read";

    /** {@inheritDoc} */
    @Override protected String cacheName() {
        return CACHE_NAME;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java new file mode 100644 index 0000000..1a8ee14 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.cache.CacheException; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.yardstickframework.BenchmarkUtils.println; +
/**
 * Transactional write invoke failover benchmark.
 * <p>
 * Each iteration picks a random integer K in a limited range and works with the master key
 * {@code 'key-' + K + '-master'} and the child keys {@code 'key-' + K + '-0'}, {@code 'key-' + K + '-1'}, ...
 * Inside a pessimistic repeatable read transaction the client randomly runs one of two scenarios:
 * <ul>
 * <li>Read: reads the values of the master key and of every child key. Values must be equal, otherwise the
 * cache content is printed and an {@link IllegalStateException} is thrown.</li>
 * <li>Write: reads the value of the master key, puts it back incremented by 1 (0 if it was absent), then
 * invokes an increment closure on every child key. No validation is performed.</li>
 * </ul>
 */
public class IgniteTransactionalWriteInvokeBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
    /** {@inheritDoc} */
    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
        final int k = nextRandom(args.range());

        assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();

        final String[] keys = new String[args.keysCount()];

        final String masterKey = "key-" + k + "-master";

        for (int idx = 0; idx < keys.length; idx++)
            keys[idx] = "key-" + k + "-" + idx;

        final int scenario = nextRandom(2);

        return doInTransaction(ignite(), new Callable<Boolean>() {
            @Override public Boolean call() throws Exception {
                final int timeout = args.cacheOperationTimeoutMillis();

                if (scenario == 0)
                    checkValuesAreEqual(masterKey, keys, timeout);
                else if (scenario == 1)
                    incrementValues(masterKey, keys, timeout);

                return true;
            }
        });
    }

    /**
     * Read scenario: reads the master key and all child keys and verifies that their values are equal.
     *
     * @param masterKey Master key.
     * @param keys Child keys.
     * @param timeout Cache operation timeout.
     * @throws IllegalStateException If the values differ.
     */
    private void checkValuesAreEqual(String masterKey, String[] keys, int timeout) {
        Map<String, Long> vals = new HashMap<>();

        vals.put(masterKey, readValue(masterKey, timeout));

        for (String key : keys)
            vals.put(key, readValue(key, timeout));

        Set<Long> distinctVals = new HashSet<>(vals.values());

        if (distinctVals.size() == 1)
            return;

        // Values diverged: print everything that may help the investigation, then fail.
        println(cfg, "Got different values for keys [map=" + vals + "]");

        println(cfg, "Cache content:");

        for (int rangeIdx = 0; rangeIdx < args.range(); rangeIdx++) {
            for (int keyIdx = 0; keyIdx < args.keysCount(); keyIdx++) {
                String key = "key-" + rangeIdx + "-" + keyIdx;

                Long val = readValue(key, timeout);

                if (val != null)
                    println(cfg, "Entry [key=" + key + ", val=" + val + "]");
            }
        }

        throw new IllegalStateException("Found different values for keys (see above information).");
    }

    /**
     * Invoke scenario: increments the master value with a get/put pair and the child values with
     * an entry processor.
     *
     * @param masterKey Master key.
     * @param keys Child keys.
     * @param timeout Cache operation timeout.
     */
    private void incrementValues(String masterKey, String[] keys, int timeout) {
        Long masterVal = readValue(masterKey, timeout);

        asyncCache.put(masterKey, masterVal == null ? 0 : masterVal + 1);
        asyncCache.future().get(timeout);

        for (String key : keys) {
            asyncCache.invoke(key, new IncrementCacheEntryProcessor());
            asyncCache.future().get(timeout);
        }
    }

    /**
     * @param key Cache key.
     * @param timeout Cache operation timeout.
     * @return Value read from the cache through the async API.
     */
    private Long readValue(String key, int timeout) {
        asyncCache.get(key);

        return asyncCache.<Long>future().get(timeout);
    }

    /**
     * Runs the closure in a pessimistic repeatable read transaction and commits it, starting over in a new
     * transaction whenever a retriable failure is caught: after a {@link ClusterTopologyException} (thrown
     * directly or as the cause of a {@link CacheException}) it first waits for the retry-ready future, after a
     * {@link TransactionRollbackException} it retries right away.
     *
     * @param ignite Ignite instance.
     * @param clo Closure.
     * @param <T> Type of the closure result.
     * @return Result of closure execution.
     * @throws Exception If the closure or the commit fails with an exception that is not retried.
     */
    public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
        for (;;) {
            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                T res = clo.call();

                tx.commit();

                return res;
            }
            catch (CacheException e) {
                Throwable cause = e.getCause();

                if (!(cause instanceof ClusterTopologyException))
                    throw e;

                ((ClusterTopologyException)cause).retryReadyFuture().get();
            }
            catch (ClusterTopologyException e) {
                e.retryReadyFuture().get();
            }
            catch (TransactionRollbackException ignore) {
                // Safe to retry right away.
            }
        }
    }

    /** {@inheritDoc} */
    @Override protected String cacheName() {
        return "tx-write-invoke";
    }

    /**
     * Entry processor that stores 0 for a missing value and increments an existing value by 1.
     */
    private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Void> {
        /** */
        private static final long serialVersionUID = 0;

        /** {@inheritDoc} */
        @Override public Void process(MutableEntry<String, Long> entry,
            Object... arguments) throws EntryProcessorException {
            Long curVal = entry.getValue();

            entry.setValue(curVal == null ? 0 : curVal + 1);

            return null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java new file mode 100644 index 0000000..c962749 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.yardstickframework.BenchmarkUtils.println; +
/**
 * Transactional write read failover benchmark.
 * <p>
 * Each iteration picks a random integer K in a limited range and works with the keys
 * {@code 'key-' + K + '-0'}, {@code 'key-' + K + '-1'}, ... Inside a pessimistic repeatable read transaction
 * the client reads the value of each key. Values must be equal, otherwise the cache content is printed and an
 * {@link IllegalStateException} is thrown. The client then puts the value incremented by 1 (0 if it was
 * absent) for every key and commits the transaction.
 */
public class IgniteTransactionalWriteReadBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
    /** {@inheritDoc} */
    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
        final int k = nextRandom(args.range());

        assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();

        final String[] keys = new String[args.keysCount()];

        for (int idx = 0; idx < keys.length; idx++)
            keys[idx] = "key-" + k + "-" + idx;

        return doInTransaction(ignite(), new Callable<Boolean>() {
            @Override public Boolean call() throws Exception {
                final int timeout = args.cacheOperationTimeoutMillis();

                Map<String, Long> vals = readValues(keys, timeout);

                Set<Long> distinctVals = new HashSet<>(vals.values());

                if (distinctVals.size() != 1) {
                    // Values diverged: print everything that may help the investigation, then fail.
                    println(cfg, "Got different values for keys [map=" + vals + "]");

                    printCacheContent(timeout);

                    throw new IllegalStateException("Found different values for keys (see above information).");
                }

                final Long oldVal = vals.get(keys[0]);

                final Long newVal = oldVal == null ? 0 : oldVal + 1;

                for (String key : keys) {
                    asyncCache.put(key, newVal);
                    asyncCache.future().get(timeout);
                }

                return true;
            }
        });
    }

    /**
     * @param keys Keys to read.
     * @param timeout Cache operation timeout.
     * @return Values read from the cache through the async API, by key.
     */
    private Map<String, Long> readValues(String[] keys, int timeout) {
        Map<String, Long> vals = new HashMap<>();

        for (String key : keys)
            vals.put(key, readValue(key, timeout));

        return vals;
    }

    /**
     * Prints every non-null entry for all keys of the configured range.
     *
     * @param timeout Cache operation timeout.
     */
    private void printCacheContent(int timeout) {
        println(cfg, "Cache content:");

        for (int rangeIdx = 0; rangeIdx < args.range(); rangeIdx++) {
            for (int keyIdx = 0; keyIdx < args.keysCount(); keyIdx++) {
                String key = "key-" + rangeIdx + "-" + keyIdx;

                Long val = readValue(key, timeout);

                if (val != null)
                    println(cfg, "Entry [key=" + key + ", val=" + val + "]");
            }
        }
    }

    /**
     * @param key Cache key.
     * @param timeout Cache operation timeout.
     * @return Value read from the cache through the async API.
     */
    private Long readValue(String key, int timeout) {
        asyncCache.get(key);

        return asyncCache.<Long>future().get(timeout);
    }

    /**
     * Runs the closure in a pessimistic repeatable read transaction and commits it, starting over in a new
     * transaction whenever a retriable failure is caught: after a {@link ClusterTopologyException} (thrown
     * directly or as the cause of a {@link CacheException}) it first waits for the retry-ready future, after a
     * {@link TransactionRollbackException} it retries right away.
     *
     * @param ignite Ignite instance.
     * @param clo Closure.
     * @param <T> Type of the closure result.
     * @return Result of closure execution.
     * @throws Exception If the closure or the commit fails with an exception that is not retried.
     */
    public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
        for (;;) {
            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                T res = clo.call();

                tx.commit();

                return res;
            }
            catch (CacheException e) {
                Throwable cause = e.getCause();

                if (!(cause instanceof ClusterTopologyException))
                    throw e;

                ((ClusterTopologyException)cause).retryReadyFuture().get();
            }
            catch (ClusterTopologyException e) {
                e.retryReadyFuture().get();
            }
            catch (TransactionRollbackException ignore) {
                // Safe to retry right away.
            }
        }
    }

    /** {@inheritDoc} */
    @Override protected String cacheName() {
        return "tx-write-read";
    }
}