Repository: ignite Updated Branches: refs/heads/master 2b5f78f20 -> 45698810a
IGNITE-6699: Optimize client-side data streamer performance. - Fixes #3442. Signed-off-by: Nikolay Izhikov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/45698810 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/45698810 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/45698810 Branch: refs/heads/master Commit: 45698810a996e839bc288bd64870f7f1659682b3 Parents: 2b5f78f Author: Fedotov <[email protected]> Authored: Thu May 3 12:30:49 2018 +0300 Committer: Nikolay Izhikov <[email protected]> Committed: Thu May 3 12:32:55 2018 +0300 ---------------------------------------------------------------------- .../streamer/JmhStreamerAddDataBenchmark.java | 221 +++++++++++++++++ .../org/apache/ignite/IgniteDataStreamer.java | 17 ++ .../datastreamer/DataStreamerImpl.java | 237 ++++++++++++------- .../DataStreamProcessorSelfTest.java | 1 + .../datastreamer/DataStreamerImplSelfTest.java | 6 + .../datastreamer/DataStreamerTimeoutTest.java | 1 + .../ApiParity/StreamerParityTest.cs | 3 +- 7 files changed, 400 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java new file mode 100644 index 0000000..f9c42b1 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.streamer; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.Ignition; +import org.apache.ignite.logger.NullLogger; +import org.jetbrains.annotations.NotNull; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * DataStreamerImpl.addData(Collection) vs DataStreamerImpl.addData(Key, Value). + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, jvmArgsAppend = {"-Xms1g", "-Xmx3g", "-server", "-XX:+AggressiveOpts", "-XX:MaxMetaspaceSize=256m"}) +@Measurement(iterations = 11) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Threads(1) +@Warmup(iterations = 21) +public class JmhStreamerAddDataBenchmark { + /** Data amount. */ + private static final int DATA_AMOUNT = 512; + + /** Ignite client instance. */ + private static final String IGNITE_CLIENT_INSTANCE_NAME = "client"; + + /** Ignite client cache name. */ + private static final String IGNITE_CLIENT_CACHE_NAME = "clientCache"; + + /** + * Create Ignite configuration. + * + * @return Ignite configuration. + */ + private IgniteConfiguration getConfiguration(String cfgName, boolean isClient) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridLogger(new NullLogger()); + + cfg.setIgniteInstanceName(cfgName); + + if (isClient) { + cfg.setClientMode(true); + + cfg.setCacheConfiguration(defaultCacheConfiguration(IGNITE_CLIENT_CACHE_NAME)); + } + else + cfg.setCacheConfiguration(defaultCacheConfiguration("server")); + + return cfg; + } + + /** + * Create cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration defaultCacheConfiguration(String cacheName) { + CacheConfiguration cfg = new CacheConfiguration(cacheName); + + cfg.setWriteSynchronizationMode(writeSynchronizationMode()); + + return cfg; + } + + /** + * Stop all grids after tests. + */ + @TearDown(Level.Trial) + public void tearDown() { + Ignition.stopAll(true); + } + + /** + * Start 2 servers and 1 client. + */ + @Setup(Level.Trial) + public void setup() { + int cnt = gridCnt(); + + for (int i = 0; i < cnt; i++) + Ignition.start(getConfiguration("srv" + i, false)); + + Ignition.start(getConfiguration(IGNITE_CLIENT_INSTANCE_NAME, true)); + } + + /** + * Prepare and clean collection with streaming data. + */ + @State(Scope.Thread) + public static class StreamingData { + /** + * Collection that will be streamed from client. + */ + Collection<AbstractMap.SimpleEntry<Integer, Integer>> streamingCol = new ArrayList<>(DATA_AMOUNT); + + /** + * Prepare collection. + */ + @Setup(Level.Iteration) + public void prepareCollection() { + for (int i = 0; i < DATA_AMOUNT; i++) + streamingCol.add(new HashMap.SimpleEntry<>(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); + } + + /** + * Clean collection after each test. + */ + @TearDown(Level.Iteration) + public void cleanCollection() { + streamingCol.clear(); + } + } + + /** + * Create streamer on client cache. + */ + @State(Scope.Benchmark) + public static class DataStreamer { + /** Data loader. */ + final IgniteDataStreamer<Integer, Integer> dataLdr = + Ignition.ignite(IGNITE_CLIENT_INSTANCE_NAME).dataStreamer(IGNITE_CLIENT_CACHE_NAME); + } + + /** + * Test addData(Collection). + * + * @param data Data that will be streamed. + * @param streamer Data loader. + */ + @Benchmark + public void addDataCollection(StreamingData data, DataStreamer streamer) { + streamer.dataLdr.addData(data.streamingCol); + } + + /** + * Test addData(Key, Value). + * + * @param data Data that will be streamed. + * @param streamer Data loader. + */ + @Benchmark + public void addDataKeyValue(StreamingData data, DataStreamer streamer) { + for (Map.Entry<Integer, Integer> entry : data.streamingCol) + streamer.dataLdr.addData(entry.getKey(), entry.getValue()); + } + + /** + * @return Synchronization mode. + */ + @NotNull protected CacheWriteSynchronizationMode writeSynchronizationMode() { + return FULL_SYNC; + } + + /** + * @return Node amount. + */ + protected int gridCnt() { + return 3; + } + + /** + * Run benchmark. + * + * @param args Args. + */ + public static void main(String[] args) throws RunnerException { + final Options options = new OptionsBuilder() + .include(JmhStreamerAddDataBenchmark.class.getSimpleName()) + .build(); + + new Runner(options).run(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index b1f5851..8b40bfa 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -119,6 +119,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { /** Default operations batch size to sent to remote node for loading. */ public static final int DFLT_PER_NODE_BUFFER_SIZE = 512; + /** Default batch size per thread to send to buffer on node. */ + public static final int DFLT_PER_THREAD_BUFFER_SIZE = 4096; + /** Default timeout for streamer's operations. */ public static final long DFLT_UNLIMIT_TIMEOUT = -1; @@ -225,6 +228,20 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void perNodeParallelOperations(int parallelOps); /** + * Allows to set buffer size for thread in case of stream by {@link #addData(Object, Object)} call. + * + * @param size Size of buffer. + */ + public void perThreadBufferSize(int size); + + /** + * Gets buffer size set by {@link #perThreadBufferSize(int)}. + * + * @return Buffer size. + */ + public int perThreadBufferSize(); + + /** * Sets the timeout that is used in the following cases: * <ul> * <li>any data addition method can be blocked when all per node parallel operations are exhausted. http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 8cad342..070a0da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -93,7 +93,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; @@ -102,6 +102,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -125,6 +126,16 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { + /** Per thread buffer size. */ + private int bufLdrSzPerThread = DFLT_PER_THREAD_BUFFER_SIZE; + + /** + * Thread buffer map: on each thread there are future and list of entries which will be streamed after filling + * thread batch. + */ + private final Map<Long, T2<IgniteCacheFutureImpl, List<DataStreamerEntry>>> threadBufMap = + new ConcurrentHashMap<>(); + /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); @@ -238,7 +249,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private final IgniteFuture<?> publicFut; /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); /** */ private CacheException disconnectErr; @@ -371,6 +382,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed ensureCacheStarted(); } + /** {@inheritDoc} */ + public void perThreadBufferSize(int size) { + bufLdrSzPerThread = size; + } + + /** {@inheritDoc} */ + public int perThreadBufferSize() { + return bufLdrSzPerThread; + } + /** * @param c Closure to run. * @param topVer Topology version to wait for. @@ -404,27 +425,35 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** - * Enters busy lock. + * Acquires read or write lock. + * + * @param writeLock {@code True} if acquires write lock. */ - private void enterBusy() { - if (!busyLock.enterBusy()) { + private void lock(boolean writeLock) { + if (writeLock) + busyLock.writeLock(); + else + busyLock.readLock(); + + if (closed.get() || cancelled) { + unlock(writeLock); + if (disconnectErr != null) throw disconnectErr; closedException(); } - else if (cancelled) { - busyLock.leaveBusy(); - - closedException(); - } } /** - * Leaves busy lock. + * Read or write unlock. + * @param writeLock {@code True} if write unlock. */ - private void leaveBusy() { - busyLock.leaveBusy(); + private void unlock(boolean writeLock) { + if (writeLock) + busyLock.writeUnlock(); + else + busyLock.readUnlock(); } /** {@inheritDoc} */ @@ -563,39 +592,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed checkSecurityPermission(SecurityPermission.CACHE_PUT); - enterBusy(); - - try { - GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); - - resFut.listen(rmvActiveFut); + Collection<DataStreamerEntry> batch = new ArrayList<>(entries.size()); - activeFuts.add(resFut); + for (Map.Entry<K, V> entry : entries) { + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true); + CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, entry.getValue(), true); - Collection<KeyCacheObjectWrapper> keys = - new GridConcurrentHashSet<>(entries.size()); - - Collection<DataStreamerEntry> entries0 = new ArrayList<>(entries.size()); - - for (Map.Entry<K, V> entry : entries) { - KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true); - CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, entry.getValue(), true); - - keys.add(new KeyCacheObjectWrapper(key)); - - entries0.add(new DataStreamerEntry(key, val)); - } - - load0(entries0, resFut, keys, 0); - - return new IgniteCacheFutureImpl<>(resFut); - } - catch (IgniteDataStreamerTimeoutException e) { - throw e; - } - finally { - leaveBusy(); + batch.add(new DataStreamerEntry(key, val)); } + + return addDataInternal(batch); } /** @@ -620,39 +626,79 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @return Future. */ public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries) { - enterBusy(); + IgniteCacheFutureImpl fut = null; + + GridFutureAdapter internalFut = null; + + List entriesList; - GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(); + lock(false); try { - resFut.listen(rmvActiveFut); + long threadId = Thread.currentThread().getId(); + + T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> futAndEntries = threadBufMap.get(threadId); + + if (futAndEntries == null) { + internalFut = new GridFutureAdapter(); - activeFuts.add(resFut); + fut = new IgniteCacheFutureImpl(internalFut); - Collection<KeyCacheObjectWrapper> keys = null; + internalFut.listen(rmvActiveFut); - if (entries.size() > 1) { - keys = new GridConcurrentHashSet<>(entries.size()); + activeFuts.add(internalFut); + + // Initial capacity should be more than batch by 12.5% in order to avoid resizing. + futAndEntries = new T2(fut, new ArrayList<>(bufLdrSzPerThread + (bufLdrSzPerThread >> 3))); + + threadBufMap.put(threadId, futAndEntries); + } + else { + fut = futAndEntries.get1(); - for (DataStreamerEntry entry : entries) - keys.add(new KeyCacheObjectWrapper(entry.getKey())); + internalFut = (GridFutureAdapter)fut.internalFuture(); } - load0(entries, resFut, keys, 0); + entriesList = futAndEntries.get2(); - return new IgniteCacheFutureImpl<>(resFut); + entriesList.addAll(entries); + + if (entriesList.size() >= bufLdrSzPerThread) { + loadData(entriesList, internalFut); + + threadBufMap.remove(threadId); + } + + return fut; } catch (Throwable e) { - resFut.onDone(e); + if (internalFut != null) + internalFut.onDone(e); if (e instanceof Error || e instanceof IgniteDataStreamerTimeoutException) throw e; - return new IgniteCacheFutureImpl<>(resFut); + return fut; } finally { - leaveBusy(); + unlock(false); + } + } + + /** + * Load thread batch of DataStreamerEntry. + */ + private void loadData(Collection<? extends DataStreamerEntry> entries, GridFutureAdapter fut) { + Collection<KeyCacheObjectWrapper> keys = null; + + if (entries.size() > 1) { + keys = new GridConcurrentHashSet<>(entries.size()); + + for (DataStreamerEntry e : entries) + keys.add(new KeyCacheObjectWrapper(e.getKey())); } + + load0(entries, fut, keys, 0); } /** {@inheritDoc} */ @@ -1043,6 +1089,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed int doneCnt = 0; + flushAllThreadsBufs(); + for (IgniteInternalFuture<?> f : activeFuts) { if (!f.isDone()) { if (activeFuts0 == null) @@ -1148,7 +1196,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") @Override public void flush() throws CacheException { - enterBusy(); + lock(true); try { doFlush(); @@ -1157,7 +1205,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed throw CU.convertToCacheException(e); } finally { - leaveBusy(); + unlock(true); } } @@ -1169,10 +1217,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * should be called periodically. */ @Override public void tryFlush() throws IgniteInterruptedException { - if (!busyLock.enterBusy()) + if (!busyLock.tryWriteLock()) return; try { + flushAllThreadsBufs(); + for (Buffer buf : bufMappings.values()) buf.flush(); @@ -1182,11 +1232,23 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed throw GridCacheUtils.convertToCacheException(e); } finally { - leaveBusy(); + unlock(true); } } /** + * + */ + private void flushAllThreadsBufs() { + assert busyLock.writeLockedByCurrentThread(); + + for (T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> val : threadBufMap.values()) + loadData(val.get2(), (GridFutureAdapter)val.get1().internalFuture()); + + threadBufMap.clear(); + } + + /** * @param cancel {@code True} to close with cancellation. * @throws CacheException If failed. */ @@ -1219,39 +1281,44 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (!closed.compareAndSet(false, true)) return null; - busyLock.block(); - - if (log.isDebugEnabled()) - log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']'); + busyLock.writeLock(); try { - // Assuming that no methods are called on this loader after this method is called. - if (cancel) { - cancelled = true; + if (log.isDebugEnabled()) + log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']'); - for (Buffer buf : bufMappings.values()) - buf.cancelAll(err); - } - else - doFlush(); + try { + // Assuming that no methods are called on this loader after this method is called. + if (cancel) { + cancelled = true; - ctx.event().removeLocalEventListener(discoLsnr); + for (Buffer buf : bufMappings.values()) + buf.cancelAll(err); + } + else + doFlush(); - ctx.io().removeMessageListener(topic); - } - catch (IgniteCheckedException | IgniteDataStreamerTimeoutException e) { - fut.onDone(e); - throw e; - } + ctx.event().removeLocalEventListener(discoLsnr); - long failed = failCntr.longValue(); + ctx.io().removeMessageListener(topic); + } + catch (IgniteCheckedException | IgniteDataStreamerTimeoutException e) { + fut.onDone(e); + throw e; + } - if (failed > 0 && err == null) - err = new IgniteCheckedException("Some of DataStreamer operations failed [failedCount=" + failed + "]"); + long failed = failCntr.longValue(); - fut.onDone(err); + if (failed > 0 && err == null) + err = new IgniteCheckedException("Some of DataStreamer operations failed [failedCount=" + failed + "]"); - return err; + fut.onDone(err); + + return err; + } + finally { + busyLock.writeUnlock(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index ac89021..91345fe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -466,6 +466,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { ldr.receiver(DataStreamerCacheUpdaters.<Integer, Integer>individual()); ldr.perNodeBufferSize(2); + ldr.perThreadBufferSize(1); // Define count of puts. final AtomicInteger idxGen = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index d277b2e..d02f466 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -238,6 +238,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { IgniteFuture fut = null; try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { + streamer.perThreadBufferSize(1); + fut = streamer.addData(1, "1"); streamer.flush(); @@ -334,6 +336,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { final IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME); + ldr.perThreadBufferSize(1); + final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { @Override public void run() { try { @@ -467,6 +471,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME); + streamer.perThreadBufferSize(1); + ((DataStreamerImpl)streamer).maxRemapCount(0); streamer.addData(1, 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java index 314855e..6e88adf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java @@ -142,6 +142,7 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) { ldr.timeout(TIMEOUT); ldr.receiver(new TestDataReceiver()); + ldr.perThreadBufferSize(1); ldr.perNodeBufferSize(1); ldr.perNodeParallelOperations(1); ((DataStreamerImpl)ldr).maxRemapCount(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs index 8e795e5..9af0561 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs @@ -29,7 +29,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity /** Members that are not needed on .NET side. */ private static readonly string[] UnneededMembers = { - "deployClass" + "deployClass", + "perThreadBufferSize" }; /** Known name mappings. */
