Revert "IGNITE-3875: Added separate thread pool for data streamer. This closes #1067."
This reverts commit f597aff1bdf65d3d430cf85c9932391a72c2d7dc. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/442fedc1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/442fedc1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/442fedc1 Branch: refs/heads/master Commit: 442fedc17bdae43b1c87d6bb4680f724a18adb52 Parents: 51cef7c Author: vozerov-gridgain <[email protected]> Authored: Tue Oct 18 12:25:56 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Oct 26 11:11:49 2016 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 31 ------- .../ignite/internal/GridKernalContext.java | 7 -- .../ignite/internal/GridKernalContextImpl.java | 12 --- .../apache/ignite/internal/IgniteKernal.java | 3 - .../org/apache/ignite/internal/IgnitionEx.java | 20 +--- .../managers/communication/GridIoManager.java | 2 - .../managers/communication/GridIoPolicy.java | 3 - .../closure/GridClosureProcessor.java | 3 +- .../datastreamer/DataStreamProcessor.java | 82 ++--------------- .../datastreamer/DataStreamerImpl.java | 31 ++++++- .../internal/processors/pool/PoolProcessor.java | 3 - .../DataStreamProcessorSelfTest.java | 97 -------------------- .../junits/GridTestKernalContext.java | 12 +-- 13 files changed, 44 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index d039584..75145a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -148,9 +148,6 @@ public class IgniteConfiguration { /** Default core size of public thread pool. */ public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2; - /** Default size of data streamer thread pool. */ - public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT; - /** Default keep alive time for public thread pool. */ @Deprecated public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0; @@ -248,9 +245,6 @@ public class IgniteConfiguration { /** IGFS pool size. */ private int igfsPoolSize = AVAILABLE_PROC_CNT; - /** Data stream pool size. */ - private int dataStreamerPoolSize = DFLT_DATA_STREAMER_POOL_SIZE; - /** Utility cache pool size. */ private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -514,7 +508,6 @@ public class IgniteConfiguration { clockSyncFreq = cfg.getClockSyncFrequency(); clockSyncSamples = cfg.getClockSyncSamples(); consistentId = cfg.getConsistentId(); - dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); failureDetectionTimeout = cfg.getFailureDetectionTimeout(); @@ -796,17 +789,6 @@ public class IgniteConfiguration { } /** - * Size of thread pool that is in charge of processing data stream messages. - * <p> - * If not provided, executor service will have size {@link #DFLT_DATA_STREAMER_POOL_SIZE}. - * - * @return Thread pool size to be used for data stream messages. - */ - public int getDataStreamerThreadPoolSize() { - return dataStreamerPoolSize; - } - - /** * Default size of thread pool that is in charge of processing utility cache messages. * <p> * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}. @@ -930,19 +912,6 @@ public class IgniteConfiguration { } /** - * Set thread pool size that will be used to process data stream messages. - * - * @param poolSize Executor service to use for data stream messages. - * @see IgniteConfiguration#getDataStreamerThreadPoolSize() - * @return {@code this} for chaining. - */ - public IgniteConfiguration setDataStreamerThreadPoolSize(int poolSize) { - dataStreamerPoolSize = poolSize; - - return this; - } - - /** * Sets default thread pool size that will be used to process utility cache messages. * * @param poolSize Default executor service size to use for utility cache messages. http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index adace0b..ae29223 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -533,13 +533,6 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService getIgfsExecutorService(); /** - * Executor service that is in charge of processing data stream messages. - * - * @return Thread pool implementation to be used for data stream messages. - */ - public ExecutorService getDataStreamerExecutorService(); - - /** * Should return an instance of fully configured thread pool to be used for * processing of client messages (REST requests). * http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index fb55800..94c6448 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -312,10 +312,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - private ExecutorService dataStreamExecSvc; - - /** */ - @GridToStringExclude protected ExecutorService restExecSvc; /** */ @@ -388,7 +384,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param p2pExecSvc P2P executor service. * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. - * @param dataStreamExecSvc data stream executor service. * @param restExecSvc REST executor service. * @param affExecSvc Affinity executor service. * @param idxExecSvc Indexing executor service. @@ -408,7 +403,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, - ExecutorService dataStreamExecSvc, ExecutorService restExecSvc, ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, @@ -428,7 +422,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.p2pExecSvc = p2pExecSvc; this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; - this.dataStreamExecSvc = dataStreamExecSvc; this.restExecSvc = restExecSvc; this.affExecSvc = affExecSvc; this.idxExecSvc = idxExecSvc; @@ -970,11 +963,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public ExecutorService getDataStreamerExecutorService() { - return dataStreamExecSvc; - } - - /** {@inheritDoc} */ @Override public ExecutorService getRestExecutorService() { return restExecSvc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 523bee6..1963509 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -666,7 +666,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param p2pExecSvc P2P executor service. * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. - * @param dataStreamExecSvc data stream executor service. * @param restExecSvc Reset executor service. * @param affExecSvc Affinity executor service. * @param idxExecSvc Indexing executor service. @@ -682,7 +681,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, - ExecutorService dataStreamExecSvc, ExecutorService restExecSvc, ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, @@ -790,7 +788,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { p2pExecSvc, mgmtExecSvc, igfsExecSvc, - dataStreamExecSvc, restExecSvc, affExecSvc, idxExecSvc, http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 0653eff..5b2c3fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1468,9 +1468,6 @@ public class IgnitionEx { /** IGFS executor service. */ private ThreadPoolExecutor igfsExecSvc; - /** Data streamer executor service. */ - private ThreadPoolExecutor dataStreamerExecSvc; - /** REST requests executor service. */ private ThreadPoolExecutor restExecSvc; @@ -1693,17 +1690,6 @@ public class IgnitionEx { p2pExecSvc.allowCoreThreadTimeOut(true); - // Note that we do not pre-start threads here as this pool may not be needed. - dataStreamerExecSvc = new IgniteThreadPoolExecutor( - "data-streamer", - cfg.getGridName(), - cfg.getDataStreamerThreadPoolSize(), - cfg.getDataStreamerThreadPoolSize(), - DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); - - dataStreamerExecSvc.allowCoreThreadTimeOut(true); - // Note that we do not pre-start threads here as igfs pool may not be needed. igfsExecSvc = new IgniteThreadPoolExecutor( cfg.getIgfsThreadPoolSize(), @@ -1789,7 +1775,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, dataStreamerExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc, + igfsExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2407,10 +2393,6 @@ public class IgnitionEx { p2pExecSvc = null; - U.shutdownNow(getClass(), dataStreamerExecSvc, log); - - dataStreamerExecSvc = null; - U.shutdownNow(getClass(), igfsExecSvc, log); igfsExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 77a58d3..3df29cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -84,7 +84,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; @@ -577,7 +576,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case MARSH_CACHE_POOL: case IDX_POOL: case IGFS_POOL: - case DATA_STREAMER_POOL: { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 18235d2..70a7354 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -49,9 +49,6 @@ public class GridIoPolicy { /** Pool for handling distributed index range requests. */ public static final byte IDX_POOL = 8; - /** Data streamer execution pool. */ - public static final byte DATA_STREAMER_POOL = 9; - /** * Defines the range of reserved pools that are not available for plugins. * @param key The key. http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 2c08423..c5a87d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; @@ -972,7 +973,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) { + private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) { try { return callLocal(c, plc); } http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index bd33f62..7663735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -21,12 +21,10 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.DelayQueue; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -37,7 +35,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.stream.StreamReceiver; @@ -45,23 +42,12 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; /** - * Data stream processor. + * */ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { - /** Data streamer separate pool feature major version. */ - private static final int DATA_STREAMER_POOL_MAJOR_VER = 1; - - /** Data streamer separate pool feature minor version. */ - private static final int DATA_STREAMER_POOL_MINOR_VER = 6; - - /** Data streamer separate pool feature maintenance version. */ - private static final int DATA_STREAMER_POOL_MAINTENANCE_VER = 10; - - /** Default pool for data streamer messages processing. */ - public static final byte DFLT_POLICY = GridIoPolicy.PUBLIC_POOL; - /** Loaders map (access is not supposed to be highly concurrent). */ private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>(); @@ -232,15 +218,13 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer); if (fut != null && !fut.isDone()) { - final byte plc = threadIoPolicy(); - fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { processRequest(nodeId, req); } - }, plc); + }, false); } }); @@ -356,7 +340,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); try { - ctx.io().send(nodeId, resTopic, res, threadIoPolicy()); + Byte plc = GridIoManager.currentPolicy(); + + if (plc == null) + plc = PUBLIC_POOL; + + ctx.io().send(nodeId, resTopic, res, plc); } catch (IgniteCheckedException e) { if (ctx.discovery().alive(nodeId)) @@ -366,59 +355,6 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { } } - /** - * Get IO policy. - * - * @return IO policy. - */ - private static byte threadIoPolicy() { - Byte plc = GridIoManager.currentPolicy(); - - if (plc == null) - plc = DFLT_POLICY; - - return plc; - } - - /** - * Get IO policy for particular node. - * - * @param node Node. - * @return Policy. - */ - public static byte ioPolicy(ClusterNode node) { - assert node != null; - - if (node.isLocal() || node.version().greaterThanEqual( - DATA_STREAMER_POOL_MAJOR_VER, - DATA_STREAMER_POOL_MINOR_VER, - DATA_STREAMER_POOL_MAINTENANCE_VER)) - return GridIoPolicy.DATA_STREAMER_POOL; - else - return DFLT_POLICY; - } - - /** - * Get IO policy for particular node with provided resolver. - * - * @param rslvr Resolver. - * @param node Node. - * @return IO policy. - */ - public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) { - assert node != null; - - Byte res = null; - - if (rslvr != null) - res = rslvr.apply(node); - - if (res == null) - res = ioPolicy(node); - - return res; - } - /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 46f6380..c2f226c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; import javax.cache.expiry.ExpiryPolicy; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -104,12 +106,16 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; /** * Data streamer implementation. */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { + /** Default policy reoslver. */ + private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver(); + /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); @@ -120,7 +126,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private byte[] updaterBytes; /** IO policy resovler for data load request. */ - private IgniteClosure<ClusterNode, Byte> ioPlcRslvr; + private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR; /** Max remap count before issuing an error. */ private static final int DFLT_MAX_REMAP_CNT = 32; @@ -1307,12 +1313,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed IgniteInternalFuture<Object> fut; - byte plc = DataStreamProcessor.ioPolicy(ioPlcRslvr, node); + Byte plc = ioPlcRslvr.apply(node); + + if (plc == null) + plc = PUBLIC_POOL; - if (isLocNode) { + if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) { fut = ctx.closure().callLocalSafe( - new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), - plc); + new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false); locFuts.add(fut); @@ -1676,6 +1684,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * Default IO policy resolver. + */ + private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Byte apply(ClusterNode gridNode) { + return PUBLIC_POOL; + } + } + + /** * Key object wrapper. Using identity equals prevents slow down in case of hash code collision. */ private static class KeyCacheObjectWrapper { http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index 26bfc0d..59e5e7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -128,9 +128,6 @@ public class PoolProcessor extends GridProcessorAdapter { return ctx.getIgfsExecutorService(); - case GridIoPolicy.DATA_STREAMER_POOL: - return ctx.getDataStreamerExecutorService(); - default: { if (plc < 0) throw new IgniteCheckedException("Policy cannot be negative: " + plc); http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index 401b09c..9fedc35 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -33,7 +33,6 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; @@ -50,7 +49,6 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; @@ -61,7 +59,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.stream.StreamReceiver; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -952,100 +949,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } /** - * @throws Exception If failed. - */ - public void testLocalDataStreamerDedicatedThreadPool() throws Exception { - try { - useCache = true; - - Ignite ignite = startGrid(1); - - final IgniteCache<String, String> cache = ignite.cache(null); - - IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(null); - try { - ldr.receiver(new StreamReceiver<String, String>() { - @Override public void receive(IgniteCache<String, String> cache, - Collection<Map.Entry<String, String>> entries) throws IgniteException { - String threadName = Thread.currentThread().getName(); - - cache.put("key", threadName); - } - }); - ldr.addData("key", "value"); - - ldr.tryFlush(); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return cache.get("key") != null; - } - }, 3_000); - } - finally { - ldr.close(true); - } - - assertNotNull(cache.get("key")); - - assertTrue(cache.get("key").startsWith("data-streamer")); - - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testRemoteDataStreamerDedicatedThreadPool() throws Exception { - try { - useCache = true; - - Ignite ignite = startGrid(1); - - useCache = false; - - Ignite client = startGrid(0); - - final IgniteCache<String, String> cache = ignite.cache(null); - - IgniteDataStreamer<String, String> ldr = client.dataStreamer(null); - try { - ldr.receiver(new StreamReceiver<String, String>() { - @Override public void receive(IgniteCache<String, String> cache, - Collection<Map.Entry<String, String>> entries) throws IgniteException { - String threadName = Thread.currentThread().getName(); - - cache.put("key", threadName); - } - }); - - ldr.addData("key", "value"); - - ldr.tryFlush(); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return cache.get("key") != null; - } - }, 3_000); - } - finally { - ldr.close(true); - } - - assertNotNull(cache.get("key")); - - assertTrue(cache.get("key").startsWith("data-streamer")); - } - finally { - stopAllGrids(); - } - } - - /** * */ public static class TestObject { http://git-wip-us.apache.org/repos/asf/ignite/blob/442fedc1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index 8cb32b6..f9e2ff4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -63,7 +63,6 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, - null, U.allPluginProviders()); GridTestUtils.setFieldValue(grid(), "cfg", config()); @@ -97,6 +96,11 @@ public class GridTestKernalContext extends GridKernalContextImpl { } } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridTestKernalContext.class, this, super.toString()); + } + /** * Sets system executor service. * @@ -106,6 +110,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { this.sysExecSvc = sysExecSvc; } + /** * Sets executor service. * @@ -114,9 +119,4 @@ public class GridTestKernalContext extends GridKernalContextImpl { public void setExecutorService(ExecutorService execSvc){ this.execSvc = execSvc; } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTestKernalContext.class, this, super.toString()); - } }
