Repository: ignite
Updated Branches:
  refs/heads/master 8436ae006 -> 18ca0b231
IGNITE-6051 Improve future listeners model in DataStreamerImpl Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18ca0b23 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18ca0b23 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18ca0b23 Branch: refs/heads/master Commit: 18ca0b2318c8014b0a3e63729d0256de1b11485e Parents: 8436ae0 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Mon Aug 21 17:28:25 2017 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Mon Aug 21 17:28:25 2017 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerImpl.java | 63 ++++++-- .../util/future/GridCompoundFuture.java | 40 ++++-- .../datastreamer/DataStreamerImplSelfTest.java | 143 ++++++++++++++++++- 3 files changed, 221 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/18ca0b23/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index c11288c..aee0d26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -941,10 +941,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } }; + GridCompoundFuture opFut = new SilentCompoundFuture(); + + opFut.listen(lsnr); + final List<GridFutureAdapter<?>> futs; try { - futs = buf.update(entriesForNode, topVer, lsnr, remap); + futs = buf.update(entriesForNode, topVer, opFut, 
remap); + + opFut.markInitialized(); } catch (IgniteInterruptedCheckedException e1) { resFut.onDone(e1); @@ -1382,7 +1388,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param newEntries Infos. * @param topVer Topology version. - * @param lsnr Listener for the operation future. + * @param opFut Completion future for the operation. * @param remap Remapping flag. * @return Future for operation. * @throws IgniteInterruptedCheckedException If failed. @@ -1390,10 +1396,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed @Nullable List<GridFutureAdapter<?>> update( Iterable<DataStreamerEntry> newEntries, AffinityTopologyVersion topVer, - IgniteInClosure<IgniteInternalFuture<?>> lsnr, + GridCompoundFuture opFut, boolean remap ) throws IgniteInterruptedCheckedException { List<GridFutureAdapter<?>> res = null; + GridFutureAdapter[] futs = new GridFutureAdapter[stripes.length]; for (DataStreamerEntry entry : newEntries) { List<DataStreamerEntry> entries0 = null; @@ -1408,9 +1415,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed synchronized (b) { curFut0 = b.curFut; - // Listener should be added only once per whole entries collection. - // Should we simplify the model and get rid of all futures? 
- curFut0.listen(lsnr); + if (futs[b.partId] != curFut0) { + opFut.add(curFut0); + + if (res == null) + res = new ArrayList<>(); + + res.add(curFut0); + + futs[b.partId] = curFut0; + } if (b.batchTopVer == null) b.batchTopVer = topVer; @@ -1426,14 +1440,28 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } } - if (res == null) - res = new ArrayList<>(); + if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) { + for (int i = 0; i < stripes.length; i++) { + PerStripeBuffer b0 = stripes[i]; - res.add(curFut0); + // renew all stale versions + synchronized (b0) { + // Another thread might already renew the batch + AffinityTopologyVersion bTopVer = b0.batchTopVer; - if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) { - b.renewBatch(remap); + if(bTopVer != null && topVer.compareTo(bTopVer) > 0) { + GridFutureAdapter<Object> bFut = b0.curFut; + + b0.renewBatch(remap); + bFut.onDone(null, + new IgniteCheckedException("Topology changed during batch preparation " + + "[batchTopVer=" + bTopVer + ", topVer=" + topVer + "]")); + } + } + } + + // double check, it's possible that current future was already overwritten on buffer overflow curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." 
+ "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]")); } @@ -2182,4 +2210,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed return S.toString(PerStripeBuffer.class, this, super.toString()); } } + + /** */ + private static final class SilentCompoundFuture<T,R> extends GridCompoundFuture<T,R> { + /** {@inheritDoc} */ + @Override protected void logError(IgniteLogger log, String msg, Throwable e) { + // no-op + } + + /** {@inheritDoc} */ + @Override protected void logDebug(IgniteLogger log, String msg) { + // no-op + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/18ca0b23/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 74a8f41..80cf67b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -96,13 +96,13 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig onDone(rdc.reduce()); } catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); // Exception in reducer is a bug, so we bypass checkComplete here. onDone(e); } catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); // Bypass checkComplete because need to rethrow. 
onDone(e); @@ -117,25 +117,21 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig } catch (IgniteCheckedException e) { if (!ignoreFailure(e)) { - if (e instanceof NodeStoppingException) { - IgniteLogger log = logger(); - - if (log != null && log.isDebugEnabled()) - log.debug("Failed to execute compound future reducer, node stopped."); - } + if (e instanceof NodeStoppingException) + logDebug(logger(), "Failed to execute compound future reducer, node stopped."); else - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); onDone(e); } } catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); onDone(e); } catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); // Bypass checkComplete because need to rethrow. onDone(e); @@ -278,12 +274,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig onDone(rdc != null ? rdc.reduce() : null); } catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); onDone(e); } catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + logError(null, "Failed to execute compound future reducer: " + this, e); onDone(e); @@ -293,6 +289,24 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig } /** + * @param log IgniteLogger. + * @param msg ShortMessage. + * @param e Exception. + */ + protected void logError(IgniteLogger log, String msg, Throwable e) { + U.error(log, msg, e); + } + + /** + * @param log IgniteLogger. + * @param msg ShortMessage. 
+ */ + protected void logDebug(IgniteLogger log, String msg) { + if (log != null && log.isDebugEnabled()) + log.debug(msg); + } + + /** * Returns future at the specified position in this list. * * @param idx - index index of the element to return http://git-wip-us.apache.org/repos/asf/ignite/blob/18ca0b23/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index 6d3466b..e90f6b0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -18,10 +18,17 @@ package org.apache.ignite.internal.processors.datastreamer; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -31,11 +38,13 @@ import org.apache.ignite.cache.CacheServerNotFoundException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessage; import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -45,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.TransactionException; import org.apache.log4j.Appender; import org.apache.log4j.Logger; import org.apache.log4j.SimpleLayout; @@ -105,6 +115,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { + cnt = 0; + startGrids(5); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -252,6 +264,135 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. 
+ */ + public void testAllOperationFinishedBeforeFutureCompletion() throws Exception { + cnt = 0; + + Ignite ignite = startGrids(MAX_CACHE_COUNT); + + final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Throwable> ex = new AtomicReference<>(); + + Collection<Map.Entry> entries = new ArrayList<>(100); + + for (int i = 0; i < 100; i++) + entries.add(new IgniteBiTuple<>(i, "" + i)); + + IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME); + + ldr.addData(entries).listen(new IgniteInClosure<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> future) { + try { + future.get(); + + for (int i = 0; i < 100; i++) + assertEquals("" + i, cache.get(i)); + } + catch (Throwable e) { + ex.set(e); + } + + latch.countDown(); + } + }); + + ldr.tryFlush(); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + Throwable e = ex.get(); + + if(e != null) { + if(e instanceof Error) + throw (Error) e; + + if(e instanceof RuntimeException) + throw (RuntimeException) e; + + throw new RuntimeException(e); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testRemapOnTopologyChangeDuringUpdatePreparation() throws Exception { + cnt = 0; + + Ignite ignite = startGrids(MAX_CACHE_COUNT); + + final int threads = 8; + final int entries = threads * 10000; + final long timeout = 10000; + + final CountDownLatch l1 = new CountDownLatch(threads); + final CountDownLatch l2 = new CountDownLatch(1); + final AtomicInteger cntr = new AtomicInteger(); + + final AtomicReference<Throwable> ex = new AtomicReference<>(); + + final IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME); + + final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + int i = cntr.getAndIncrement(); + + for (int j = 0; i < (entries >> 1); i += threads) { + ldr.addData(i, i); + + if(j++ % 1000 == 0) + ldr.tryFlush(); + } + + l1.countDown(); + + assertTrue(l2.await(timeout, TimeUnit.MILLISECONDS)); + + for (int j = 0; i < entries; i += threads) { + ldr.addData(i, i); + + if(j++ % 1000 == 0) + ldr.tryFlush(); + } + } + catch (Throwable e) { + ex.compareAndSet(null, e); + } + } + }, threads, "loader"); + + assertTrue(l1.await(timeout, TimeUnit.MILLISECONDS)); + + stopGrid(MAX_CACHE_COUNT - 1); + + l2.countDown(); + + fut.get(timeout); + + ldr.close(); + + Throwable e = ex.get(); + + if(e != null) { + if(e instanceof Error) + throw (Error) e; + + if(e instanceof RuntimeException) + throw (RuntimeException) e; + + throw new RuntimeException(e); + } + + IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); + + for(int i = 0; i < entries; i++) + assertEquals(i, cache.get(i)); + } + + /** * Cluster topology mismatch shall result in DataStreamer retrying cache update with the latest topology and * no error logged to the console. * @@ -366,4 +507,4 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { super.sendMessage(node, msg, ackC); } } -} \ No newline at end of file +}