IGNITE-4699: Added custom executors for compute tasls. This closes #1718.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f871b0d7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f871b0d7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f871b0d7 Branch: refs/heads/ignite-5024 Commit: f871b0d77084f4ebf7993eccc9cf59767835a41d Parents: 3eb52a8 Author: tledkov-gridgain <[email protected]> Authored: Fri Apr 21 14:40:22 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Apr 21 14:40:22 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCompute.java | 14 ++ .../configuration/ExecutorConfiguration.java | 115 +++++++++ .../configuration/IgniteConfiguration.java | 30 +++ .../ignite/internal/ExecutorAwareMessage.java | 31 +++ .../ignite/internal/GridJobExecuteRequest.java | 32 ++- .../ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 12 + .../ignite/internal/GridTaskSessionImpl.java | 15 +- .../ignite/internal/IgniteComputeImpl.java | 71 ++++-- .../apache/ignite/internal/IgniteKernal.java | 3 + .../org/apache/ignite/internal/IgnitionEx.java | 66 +++++ .../managers/communication/GridIoManager.java | 23 +- .../managers/communication/GridIoMessage.java | 13 + .../closure/GridClosureProcessor.java | 154 +++++++----- .../processors/job/GridJobProcessor.java | 23 +- .../internal/processors/job/GridJobWorker.java | 15 +- .../internal/processors/pool/PoolProcessor.java | 25 ++ .../session/GridTaskSessionProcessor.java | 10 +- .../processors/task/GridTaskProcessor.java | 69 +++++- .../processors/task/GridTaskWorker.java | 3 +- ...puteCustomExecutorConfigurationSelfTest.java | 85 +++++++ .../IgniteComputeCustomExecutorSelfTest.java | 245 +++++++++++++++++++ .../junits/GridTestKernalContext.java | 5 +- .../testsuites/IgniteComputeGridTestSuite.java | 5 + 24 files changed, 970 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index ad675c0..f0e6039 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -24,6 +24,8 @@ import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.compute.ComputeTaskName; import org.apache.ignite.compute.ComputeTaskSpis; +import org.apache.ignite.configuration.ExecutorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteCallable; @@ -751,4 +753,16 @@ public interface IgniteCompute extends IgniteAsyncSupport { /** {@inheritDoc} */ @Deprecated @Override public IgniteCompute withAsync(); + + /** + * Gets instance of the compute API associated with custom executor. All tasks and closures submitted to returned + * instance will be processed by this executor on both remote and local nodes. If executor with the given name + * doesn't exist, task will be processed in default ("public") pool. + * <p> + * Executor should be defined in {@link IgniteConfiguration#setExecutorConfiguration(ExecutorConfiguration...)}. + * + * @param name Custom executor name. + * @return Instance of compute API associated with custom executor. + */ + public IgniteCompute withExecutor(@NotNull String name); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java new file mode 100644 index 0000000..8ff7932 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.internal.util.typedef.internal.S; + +import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT; + +/** + * Сustom thread pool configuration for compute tasks. See {@link IgniteCompute#withAsync()} for more information. + */ +public class ExecutorConfiguration { + /** Thread pool name. */ + private String name; + + /** Thread pool size. */ + private int size = DFLT_PUBLIC_THREAD_CNT; + + /** + * Default constructor. + */ + public ExecutorConfiguration() { + // No-op. + } + + /** + * Constructor. + * + * @param name Thread pool name. + */ + public ExecutorConfiguration(String name) { + this.name = name; + } + + /** + * Copying constructor. + * + * @param other Instance to copy. + */ + public ExecutorConfiguration(ExecutorConfiguration other) { + assert other != null; + + name = other.name; + size = other.size; + } + + /** + * Get thread pool name. + * <p> + * See {@link #setName(String)} for more information. + * + * @return Executor name. + */ + public String getName() { + return name; + } + + /** + * Set thread pool name. Name cannot be {@code null} and should be unique with respect to other custom executors. + * + * @param name Executor name. + * @return {@code this} for chaining. + */ + public ExecutorConfiguration setName(String name) { + this.name = name; + + return this; + } + + /** + * Get thread pool size. + * <p> + * See {@link #setSize(int)} for more information. + * + * @return Thread pool size. + */ + public int getSize() { + return size; + } + + /** + * Set thread pool size. + * <p> + * Defaults to {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT}. + * + * @param size Thread pool size. + * @return {@code this} for chaining. + */ + public ExecutorConfiguration setSize(int size) { + this.size = size; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ExecutorConfiguration.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index fe08ddf..17927b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoader; import javax.cache.processor.EntryProcessor; import javax.management.MBeanServer; import javax.net.ssl.SSLContext; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.Ignition; @@ -437,6 +438,9 @@ public class IgniteConfiguration { /** */ private BinaryConfiguration binaryCfg; + /** Custom executor configurations. */ + private ExecutorConfiguration[] execCfgs; + /** */ private boolean lateAffAssignment = DFLT_LATE_AFF_ASSIGNMENT; @@ -494,6 +498,7 @@ public class IgniteConfiguration { dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize(); deployMode = cfg.getDeploymentMode(); discoStartupDelay = cfg.getDiscoveryStartupDelay(); + execCfgs = cfg.getExecutorConfiguration(); failureDetectionTimeout = cfg.getFailureDetectionTimeout(); hadoopCfg = cfg.getHadoopConfiguration(); igfsCfg = cfg.getFileSystemConfiguration(); @@ -2658,6 +2663,31 @@ public class IgniteConfiguration { return this; } + /** + * Gets custom executors for user compute tasks. + * <p> + * See {@link #setExecutorConfiguration(ExecutorConfiguration...)} for more information. + * + * @return Executor configurations. + */ + public ExecutorConfiguration[] getExecutorConfiguration() { + return execCfgs; + } + + /** + * Sets custom executors for user compute tasks. + * <p> + * See {@link IgniteCompute#withExecutor(String)} for more information. + * + * @param execCfgs Executor configurations. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setExecutorConfiguration(ExecutorConfiguration... execCfgs) { + this.execCfgs = execCfgs; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java new file mode 100644 index 0000000..a8a3b1a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** + * Message with specified custom executor must be processed in the appropriate thread pool. + */ +public interface ExecutorAwareMessage extends Message { + /** + * @return Custom executor name. {@code null} In case the custom executor is not provided. + */ + @Nullable public String executorName(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index a7e8309..fe2d6d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -41,7 +40,7 @@ import org.jetbrains.annotations.Nullable; /** * Job execution request. */ -public class GridJobExecuteRequest implements Message { +public class GridJobExecuteRequest implements ExecutorAwareMessage { /** */ private static final long serialVersionUID = 0L; @@ -146,6 +145,9 @@ public class GridJobExecuteRequest implements Message { /** */ private AffinityTopologyVersion topVer; + /** */ + private String execName; + /** * No-op constructor to support {@link Externalizable} interface. */ @@ -182,6 +184,7 @@ public class GridJobExecuteRequest implements Message { * @param cacheIds Caches' identifiers to reserve partition. * @param part Partition to lock. * @param topVer Affinity topology version of job mapping. + * @param execName The name of the custom named executor. */ public GridJobExecuteRequest( IgniteUuid sesId, @@ -211,7 +214,8 @@ public class GridJobExecuteRequest implements Message { UUID subjId, @Nullable int[] cacheIds, int part, - @Nullable AffinityTopologyVersion topVer) { + @Nullable AffinityTopologyVersion topVer, + @Nullable String execName) { this.top = top; assert sesId != null; assert jobId != null; @@ -251,6 +255,7 @@ public class GridJobExecuteRequest implements Message { this.idsOfCaches = cacheIds; this.part = part; this.topVer = topVer; + this.execName = execName; this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi; } @@ -454,6 +459,11 @@ public class GridJobExecuteRequest implements Message { return part; } + /** {@inheritDoc} */ + @Override public String executorName() { + return execName; + } + /** * @return Affinity version which was used to map job */ @@ -622,6 +632,12 @@ public class GridJobExecuteRequest implements Message { writer.incrementState(); + case 24: + if (!writer.writeString("executorName", execName)) + return false; + + writer.incrementState(); + } return true; @@ -831,6 +847,14 @@ public class GridJobExecuteRequest implements Message { reader.incrementState(); + case 24: + execName = reader.readString("executorName"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridJobExecuteRequest.class); @@ -843,7 +867,7 @@ public class GridJobExecuteRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 24; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 8462e5f..010bd21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -563,6 +563,14 @@ public interface GridKernalContext extends Iterable<GridComponent> { */ public ExecutorService getQueryExecutorService(); + + /** + * Executor services that is in charge of processing user compute task. + * + * @return Map of custom thread pool executors. + */ + @Nullable public Map<String, ? extends ExecutorService> customExecutors(); + /** * Executor service that is in charge of processing schema change messages. * http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 213cf86..bbc9846 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -344,6 +344,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + Map<String, ? extends ExecutorService> customExecSvcs; + + /** */ + @GridToStringExclude private Map<String, Object> attrs = new HashMap<>(); /** */ @@ -401,6 +405,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param callbackExecSvc Callback executor service. * @param qryExecSvc Query executor service. * @param schemaExecSvc Schema executor service. + * @param customExecSvcs Custom named executors. * @param plugins Plugin providers. */ @SuppressWarnings("TypeMayBeWeakened") @@ -424,6 +429,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable IgniteStripedThreadPoolExecutor callbackExecSvc, ExecutorService qryExecSvc, ExecutorService schemaExecSvc, + @Nullable Map<String, ? extends ExecutorService> customExecSvcs, List<PluginProvider> plugins ) { assert grid != null; @@ -448,6 +454,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.callbackExecSvc = callbackExecSvc; this.qryExecSvc = qryExecSvc; this.schemaExecSvc = schemaExecSvc; + this.customExecSvcs = customExecSvcs; marshCtx = new MarshallerContextImpl(plugins); @@ -998,6 +1005,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + public Map<String, ? extends ExecutorService> customExecutors() { + return customExecSvcs; + } + + /** {@inheritDoc} */ @Override public IgniteExceptionRegistry exceptionRegistry() { return IgniteExceptionRegistry.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java index dd1caa1..458ad36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java @@ -114,6 +114,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { /** */ private final IgniteFutureImpl mapFut; + /** */ + private final String execName; + /** * @param taskNodeId Task node ID. * @param taskName Task name. @@ -129,6 +132,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { * @param fullSup Session full support enabled flag. * @param internal Internal task flag. * @param subjId Subject ID. + * @param execName Custom executor name. */ public GridTaskSessionImpl( UUID taskNodeId, @@ -144,7 +148,8 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { GridKernalContext ctx, boolean fullSup, boolean internal, - UUID subjId) { + UUID subjId, + @Nullable String execName) { assert taskNodeId != null; assert taskName != null; assert sesId != null; @@ -173,6 +178,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { this.fullSup = fullSup; this.internal = internal; this.subjId = subjId; + this.execName = execName; mapFut = new IgniteFutureImpl(new GridFutureAdapter()); } @@ -873,6 +879,13 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal { return internal; } + /** + * @return Custom executor name. + */ + @Nullable public String executorName() { + return execName; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridTaskSessionImpl.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index 7499a5d..7ddd4ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -73,6 +73,9 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** */ private UUID subjId; + /** Custom executor name. */ + private String execName; + /** * Required by {@link Externalizable}. */ @@ -103,6 +106,25 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> this.subjId = subjId; } + /** + * Constructor. + * + * @param ctx Kernal context. + * @param prj Projection. + * @param subjId Subject ID. + * @param async Async support flag. + * @param execName Custom executor name. + */ + private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async, + String execName) { + super(async); + + this.ctx = ctx; + this.prj = prj; + this.subjId = subjId; + this.execName = execName; + } + /** {@inheritDoc} */ @Override protected IgniteCompute createAsyncInstance() { return new IgniteComputeImpl(ctx, prj, subjId, true); @@ -152,7 +174,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes()); + return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes(), execName); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -205,7 +227,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes()); + return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes(), execName); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -248,7 +270,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes()); + return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes(), execName); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -298,7 +320,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes()); + return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes(), execName); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -351,7 +373,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key=" + affKey + ']'); - return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes()); + return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes(), execName); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -394,7 +416,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes()); + return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes(), execName); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -437,7 +459,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return ctx.task().execute(taskName, arg); + return ctx.task().execute(taskName, arg, execName); } finally { unguard(); @@ -477,7 +499,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return ctx.task().execute(taskCls, arg); + return ctx.task().execute(taskCls, arg, execName); } finally { unguard(); @@ -516,7 +538,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return ctx.task().execute(task, arg); + return ctx.task().execute(task, arg, execName); } finally { unguard(); @@ -550,7 +572,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().runAsync(BROADCAST, job, prj.nodes()); + return ctx.closure().runAsync(BROADCAST, job, prj.nodes(), execName); } finally { unguard(); @@ -584,7 +606,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes()); + return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes(), execName); } finally { unguard(); @@ -620,7 +642,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().broadcast(job, arg, prj.nodes()); + return ctx.closure().broadcast(job, arg, prj.nodes(), execName); } finally { unguard(); @@ -654,7 +676,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().runAsync(BALANCE, job, prj.nodes()); + return ctx.closure().runAsync(BALANCE, job, prj.nodes(), execName); } finally { unguard(); @@ -689,7 +711,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().runAsync(BALANCE, jobs, prj.nodes()); + return ctx.closure().runAsync(BALANCE, jobs, prj.nodes(), execName); } finally { unguard(); @@ -725,7 +747,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().callAsync(job, arg, prj.nodes()); + return ctx.closure().callAsync(job, arg, prj.nodes(), execName); } finally { unguard(); @@ -759,7 +781,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().callAsync(BALANCE, job, prj.nodes()); + return ctx.closure().callAsync(BALANCE, job, prj.nodes(), execName); } finally { unguard(); @@ -794,7 +816,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes()); + return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes(), execName); } finally { unguard(); @@ -832,7 +854,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().callAsync(job, args, prj.nodes()); + return ctx.closure().callAsync(job, args, prj.nodes(), execName); } finally { unguard(); @@ -870,7 +892,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes()); + return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes(), execName); } finally { unguard(); @@ -911,7 +933,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> guard(); try { - return ctx.closure().callAsync(job, args, rdc, prj.nodes()); + return ctx.closure().callAsync(job, args, rdc, prj.nodes(), execName); } finally { unguard(); @@ -1040,11 +1062,13 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(prj); + out.writeObject(execName); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { prj = (ClusterGroupAdapter)in.readObject(); + execName = (String)in.readObject(); } /** @@ -1054,7 +1078,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> * @throws ObjectStreamException Thrown in case of unmarshalling error. */ protected Object readResolve() throws ObjectStreamException { - return prj.compute(); + return prj.compute().withExecutor(execName); } /** {@inheritDoc} */ @@ -1068,4 +1092,9 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute> @Override public <R> ComputeTaskFuture<R> future() { return (ComputeTaskFuture<R>)super.future(); } + + /** {@inheritDoc} */ + @Override public IgniteCompute withExecutor(@NotNull String name) { + return new IgniteComputeImpl(ctx, prj, subjId, isAsync(), name); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 50f39fa..12a7af6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -699,6 +699,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param callbackExecSvc Callback executor service. * @param qryExecSvc Query executor service. * @param schemaExecSvc Schema executor service. + * @param customExecSvcs Custom named executors. * @param errHnd Error handler to use for notification about startup problems. * @throws IgniteCheckedException Thrown in case of any errors. */ @@ -720,6 +721,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { IgniteStripedThreadPoolExecutor callbackExecSvc, ExecutorService qryExecSvc, ExecutorService schemaExecSvc, + Map<String, ? extends ExecutorService> customExecSvcs, GridAbsClosure errHnd ) throws IgniteCheckedException @@ -835,6 +837,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { callbackExecSvc, qryExecSvc, schemaExecSvc, + customExecSvcs, plugins ); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 2eda01c..4b34891 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -33,6 +33,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -59,6 +60,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.DeploymentMode; +import org.apache.ignite.configuration.ExecutorConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; @@ -1530,6 +1532,9 @@ public class IgnitionEx { /** Query executor service. */ private ThreadPoolExecutor schemaExecSvc; + /** Executor service. */ + private Map<String, ThreadPoolExecutor> customExecSvcs; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1858,6 +1863,24 @@ public class IgnitionEx { schemaExecSvc.allowCoreThreadTimeOut(true); + if (!F.isEmpty(cfg.getExecutorConfiguration())) { + validateCustomExecutorsConfiguration(cfg.getExecutorConfiguration()); + + customExecSvcs = new HashMap<>(); + + for(ExecutorConfiguration execCfg : cfg.getExecutorConfiguration()) { + ThreadPoolExecutor exec = new IgniteThreadPoolExecutor( + execCfg.getName(), + cfg.getIgniteInstanceName(), + execCfg.getSize(), + execCfg.getSize(), + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>()); + + customExecSvcs.put(execCfg.getName(), exec); + } + } + // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); @@ -1886,6 +1909,7 @@ public class IgnitionEx { callbackExecSvc, qryExecSvc, schemaExecSvc, + customExecSvcs, new CA() { @Override public void apply() { startLatch.countDown(); @@ -1962,6 +1986,30 @@ public class IgnitionEx { } /** + * @param cfgs Array of the executors configurations. + * @throws IgniteCheckedException If configuration is wrong. + */ + private static void validateCustomExecutorsConfiguration(ExecutorConfiguration[] cfgs) + throws IgniteCheckedException { + if (cfgs == null) + return; + + Set<String> names = new HashSet<>(cfgs.length); + + for (ExecutorConfiguration cfg : cfgs) { + if (F.isEmpty(cfg.getName())) + throw new IgniteCheckedException("Custom executor name cannot be null or empty."); + + if (!names.add(cfg.getName())) + throw new IgniteCheckedException("Duplicate custom executor name: " + cfg.getName()); + + if (cfg.getSize() <= 0) + throw new IgniteCheckedException("Custom executor size must be positive [name=" + cfg.getName() + + ", size=" + cfg.getSize() + ']'); + } + } + + /** * @param cfg Ignite configuration copy to. * @return New ignite configuration. * @throws IgniteCheckedException If failed. @@ -2116,6 +2164,17 @@ public class IgnitionEx { initializeDefaultCacheConfiguration(myCfg); + ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration(); + + if (execCfgs != null) { + ExecutorConfiguration[] clone = execCfgs.clone(); + + for (int i = 0; i < execCfgs.length; i++) + clone[i] = new ExecutorConfiguration(execCfgs[i]); + + myCfg.setExecutorConfiguration(clone); + } + if (!myCfg.isClientMode() && myCfg.getMemoryConfiguration() == null) { MemoryConfiguration memCfg = new MemoryConfiguration(); @@ -2522,6 +2581,13 @@ public class IgnitionEx { U.shutdownNow(getClass(), callbackExecSvc, log); callbackExecSvc = null; + + if (!F.isEmpty(customExecSvcs)) { + for (ThreadPoolExecutor exec : customExecSvcs.values()) + U.shutdownNow(getClass(), exec, log); + + customExecSvcs = null; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 83fc3b5..c4f7519 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -65,6 +66,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -839,12 +841,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } try { + String execName = msg.executorName(); + + if (execName != null) { + Executor exec = pools.customExecutor(execName); + + if (exec != null) { + exec.execute(c); + + return; + } + else { + LT.warn(log, "Custom executor doesn't exist (message will be processed in default " + + "thread pool): " + execName); + } + } + pools.poolForPolicy(plc).execute(c); } catch (RejectedExecutionException e) { - U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " + - "on 'ExecutorService' provided by 'IgniteConfiguration.getPublicThreadPoolSize()'. " + - "Will attempt to process message in the listener thread instead.", e); + U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " + + "message in the listener thread instead.", e); c.run(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 2ad4a0b..16eae26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.managers.communication; import java.io.Externalizable; import java.nio.ByteBuffer; + +import org.apache.ignite.internal.ExecutorAwareMessage; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -26,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; /** * Wrapper for all grid messages. @@ -334,6 +337,16 @@ public class GridIoMessage implements Message { return Integer.MIN_VALUE; } + /** + * @return Executor name (if available). + */ + @Nullable public String executorName() { + if (msg instanceof ExecutorAwareMessage) + return ((ExecutorAwareMessage)msg).executorName(); + + return null; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridIoMessage.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index f91ee34..1051807 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -146,11 +146,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param mode Distribution mode. * @param jobs Closures to execute. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Task execution future. */ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, - @Nullable Collection<ClusterNode> nodes) { - return runAsync(mode, jobs, nodes, false); + @Nullable Collection<ClusterNode> nodes, @Nullable String execName) { + return runAsync(mode, jobs, nodes, false, execName); } /** @@ -158,12 +159,14 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param jobs Closures to execute. * @param nodes Grid nodes. * @param sys If {@code true}, then system pool will be used. + * @param execName Custom executor name. * @return Task execution future. */ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Collection<? extends Runnable> jobs, @Nullable Collection<ClusterNode> nodes, - boolean sys) + boolean sys, + @Nullable String execName) { assert mode != null; assert !F.isEmpty(jobs) : jobs; @@ -181,7 +184,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T1(mode, jobs), null, sys); + return ctx.task().execute(new T1(mode, jobs), null, sys, execName); } finally { busyLock.readUnlock(); @@ -196,7 +199,19 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job, @Nullable Collection<ClusterNode> nodes) { - return runAsync(mode, job, nodes, false); + return runAsync(mode, job, nodes, null); + } + + /** + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. + * @param execName Custom executor name. + * @return Task execution future. + */ + public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job, + @Nullable Collection<ClusterNode> nodes, @Nullable String execName) { + return runAsync(mode, job, nodes, false, execName); } /** @@ -204,12 +219,14 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param job Closure to execute. * @param nodes Grid nodes. * @param sys If {@code true}, then system pool will be used. + * @param execName Custom executor name. * @return Task execution future. */ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job, @Nullable Collection<ClusterNode> nodes, - boolean sys) + boolean sys, + @Nullable String execName) { assert mode != null; assert job != null; @@ -222,7 +239,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T2(mode, job), null, sys); + return ctx.task().execute(new T2(mode, job), null, sys, execName); } finally { busyLock.readUnlock(); @@ -341,6 +358,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param jobs Closures to execute. * @param rdc Reducer. * @param nodes Grid nodes. + * @param execName Custom executor name. * @param <R1> Type. * @param <R2> Type. * @return Reduced result. @@ -348,7 +366,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode, Collection<? extends Callable<R1>> jobs, IgniteReducer<R1, R2> rdc, - @Nullable Collection<ClusterNode> nodes) + @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) { assert mode != null; assert rdc != null; @@ -362,7 +381,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T3<>(mode, jobs, rdc), null); + return ctx.task().execute(new T3<>(mode, jobs, rdc), null, execName); } finally { busyLock.readUnlock(); @@ -380,7 +399,23 @@ public class GridClosureProcessor extends GridProcessorAdapter { GridClosureCallMode mode, @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes) { - return callAsync(mode, jobs, nodes, false); + return callAsync(mode, jobs, nodes, null); + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param nodes Grid nodes. + * @param execName Custom executor name. + * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> ComputeTaskInternalFuture<Collection<R>> callAsync( + GridClosureCallMode mode, + @Nullable Collection<? extends Callable<R>> jobs, + @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) { + return callAsync(mode, jobs, nodes, false, execName); } /** @@ -388,13 +423,15 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param jobs Closures to execute. * @param nodes Grid nodes. * @param sys If {@code true}, then system pool will be used. + * @param execName Custom executor name. * @param <R> Type. * @return Grid future for collection of closure results. */ public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode, Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, - boolean sys) + boolean sys, + @Nullable String execName) { assert mode != null; assert !F.isEmpty(jobs); @@ -407,7 +444,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T6<>(mode, jobs), null, sys); + return ctx.task().execute(new T6<>(mode, jobs), null, sys, execName); } finally { busyLock.readUnlock(); @@ -415,7 +452,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * * @param mode Distribution mode. * @param job Closure to execute. * @param nodes Grid nodes. @@ -424,7 +460,21 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode, @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) { - return callAsync(mode, job, nodes, false); + return callAsync(mode, job, nodes, null); + } + + /** + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. + * @param execName Custom executor name. + * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode, + @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) { + return callAsync(mode, job, nodes, false, execName); } /** @@ -432,13 +482,15 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param partId Partition. * @param job Closure to execute. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Grid future for collection of closure results. * @throws IgniteCheckedException If failed. */ public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames, int partId, Callable<R> job, - @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException { + @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) throws IgniteCheckedException { assert partId >= 0 : partId; busyLock.readLock(); @@ -457,7 +509,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null, false); + return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null, + false, execName); } finally { busyLock.readUnlock(); @@ -469,13 +522,15 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param partId Partition. * @param job Job. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Job future. * @throws IgniteCheckedException If failed. */ public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames, int partId, Runnable job, - @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException { + @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) throws IgniteCheckedException { assert partId >= 0 : partId; busyLock.readLock(); @@ -494,7 +549,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null, false); + return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null, + false, execName); } finally { busyLock.readUnlock(); @@ -588,13 +644,15 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param job Closure to execute. * @param nodes Grid nodes. * @param sys If {@code true}, then system pool will be used. + * @param execName Custom executor name. * @param <R> Type. * @return Grid future for collection of closure results. */ public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode, Callable<R> job, @Nullable Collection<ClusterNode> nodes, - boolean sys) + boolean sys, + @Nullable String execName) { assert mode != null; assert job != null; @@ -607,7 +665,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T7<>(mode, job), null, sys); + return ctx.task().execute(new T7<>(mode, job), null, sys, execName); } finally { busyLock.readUnlock(); @@ -618,10 +676,11 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param job Job closure. * @param arg Optional job argument. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Grid future for execution result. */ public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, - @Nullable Collection<ClusterNode> nodes) { + @Nullable Collection<ClusterNode> nodes, @Nullable String execName) { busyLock.readLock(); try { @@ -630,7 +689,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T8(job, arg), null, false); + return ctx.task().execute(new T8(job, arg), null, false, execName); } finally { busyLock.readUnlock(); @@ -641,33 +700,11 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param job Job closure. * @param arg Optional job argument. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Grid future for execution result. */ public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg, - @Nullable Collection<ClusterNode> nodes) { - busyLock.readLock(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T11<>(job), arg, false); - } - finally { - busyLock.readUnlock(); - } - } - - /** - * @param job Job closure. - * @param arg Optional job argument. - * @param nodes Grid nodes. - * @return Grid future for execution result. - */ - public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg, - @Nullable Collection<ClusterNode> nodes) { + @Nullable Collection<ClusterNode> nodes, @Nullable String execName) { busyLock.readLock(); try { @@ -675,9 +712,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(U.emptyTopologyException()); ctx.task().setThreadContext(TC_SUBGRID, nodes); - ctx.task().setThreadContext(TC_NO_FAILOVER, true); - return ctx.task().execute(new T11<>(job), arg, false); + return ctx.task().execute(new T11<>(job), arg, false, execName); } finally { busyLock.readUnlock(); @@ -688,11 +724,13 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param job Job closure. * @param args Job arguments. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Grid future for execution result. */ public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args, - @Nullable Collection<ClusterNode> nodes) + @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) { busyLock.readLock(); @@ -702,7 +740,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T9<>(job, args), null, false); + return ctx.task().execute(new T9<>(job, args), null, false, execName); } finally { busyLock.readUnlock(); @@ -714,10 +752,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param args Job arguments. * @param rdc Reducer. * @param nodes Grid nodes. + * @param execName Custom executor name. * @return Grid future for execution result. */ public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job, - Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { + Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes, + @Nullable String execName) { busyLock.readLock(); try { @@ -726,7 +766,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T10<>(job, args, rdc), null, false); + return ctx.task().execute(new T10<>(job, args, rdc), null, false, execName); } finally { busyLock.readUnlock(); @@ -1122,7 +1162,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection)}. + * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection,String)}. */ private class T1 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection { /** */ @@ -1156,7 +1196,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection)}. + * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection, String)}. */ private class T2 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection { /** */ @@ -1187,7 +1227,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection)} + * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection, String)} */ private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> implements GridNoImplicitInjection { /** */ @@ -1378,7 +1418,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection)} + * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection, String)} */ private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> implements GridNoImplicitInjection { /** */ @@ -1421,7 +1461,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection)} + * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection, String)} */ private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 91ec8a9..369ca22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -73,6 +74,7 @@ import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -1058,7 +1060,8 @@ public class GridJobProcessor extends GridProcessorAdapter { sesAttrs, req.isSessionFullSupport(), req.isInternal(), - req.getSubjectId()); + req.getSubjectId(), + req.executorName()); taskSes.setCheckpointSpi(req.getCheckpointSpi()); taskSes.setClassLoader(dep.classLoader()); @@ -1098,7 +1101,8 @@ public class GridJobProcessor extends GridProcessorAdapter { evtLsnr, holdLsnr, partsReservation, - req.getTopVer()); + req.getTopVer(), + req.executorName()); jobCtx.job(job); @@ -1274,7 +1278,20 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private boolean executeAsync(GridJobWorker jobWorker) { try { - ctx.getExecutorService().execute(jobWorker); + if (jobWorker.executorName() != null) { + Executor customExec = ctx.pools().customExecutor(jobWorker.executorName()); + + if (customExec != null) + customExec.execute(jobWorker); + else { + LT.warn(log, "Custom executor doesn't exist (local job will be processed in default " + + "thread pool): " + jobWorker.executorName()); + + ctx.getExecutorService().execute(jobWorker); + } + } + else + ctx.getExecutorService().execute(jobWorker); if (metricsUpdateFreq > -1L) startedJobsCnt.increment(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 5c9b9e2..c9129c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -168,6 +168,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { /** Request topology version. */ private final AffinityTopologyVersion reqTopVer; + /** Request topology version. */ + private final String execName; + /** * @param ctx Kernal context. * @param dep Grid deployment. @@ -182,6 +185,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { * @param holdLsnr Hold listener. * @param partsReservation Reserved partitions (must be released at the job finish). * @param reqTopVer Affinity topology version of the job request. + * @param execName Custom executor name. */ GridJobWorker( GridKernalContext ctx, @@ -196,7 +200,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { GridJobEventListener evtLsnr, GridJobHoldListener holdLsnr, GridReservable partsReservation, - AffinityTopologyVersion reqTopVer) { + AffinityTopologyVersion reqTopVer, + String execName) { super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class)); assert ctx != null; @@ -219,6 +224,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { this.holdLsnr = holdLsnr; this.partsReservation = partsReservation; this.reqTopVer = reqTopVer; + this.execName = execName; if (job != null) this.job = job; @@ -727,6 +733,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { } /** + * @return Custom executor name. + */ + public String executorName() { + return execName; + } + + /** * @param evtType Event type. * @param msg Message. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index 37bbb54..221c7bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.pool; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridKernalContext; @@ -26,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.plugin.extensions.communication.IoPool; +import org.jetbrains.annotations.Nullable; /** * Processor which abstracts out thread pool management. @@ -34,6 +37,9 @@ public class PoolProcessor extends GridProcessorAdapter { /** Map of {@link IoPool}-s injected by Ignite plugins. */ private final IoPool[] extPools = new IoPool[128]; + /** Custom named pools. */ + private final Map<String, ? extends ExecutorService> customExecs; + /** * Constructor. * @@ -72,6 +78,8 @@ public class PoolProcessor extends GridProcessorAdapter { } } } + + customExecs = ctx.customExecutors(); } /** {@inheritDoc} */ @@ -165,4 +173,21 @@ public class PoolProcessor extends GridProcessorAdapter { } } } + + /** + * Gets executor service for custom policy by executor name. + * + * @param name Executor name. + * @return Executor service. + */ + @Nullable public Executor customExecutor(String name) { + assert name != null; + + Executor exec = null; + + if (customExecs != null) + exec = customExecs.get(name); + + return exec; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java index 9af038a..f9937a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java @@ -76,6 +76,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { * @param fullSup {@code True} to enable distributed session attributes and checkpoints. * @param internal {@code True} in case of internal task. * @param subjId Subject ID. + * @param execName Custom executor name. * @return New session if one did not exist, or existing one. */ public GridTaskSessionImpl createTaskSession( @@ -91,7 +92,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { Map<Object, Object> attrs, boolean fullSup, boolean internal, - UUID subjId) { + UUID subjId, + @Nullable String execName) { if (!fullSup) { return new GridTaskSessionImpl( taskNodeId, @@ -107,7 +109,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { ctx, false, internal, - subjId); + subjId, + execName); } while (true) { @@ -130,7 +133,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter { ctx, true, internal, - subjId)); + subjId, + execName)); if (old != null) ses = old; http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index d34f297..22d5716 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -359,6 +359,19 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <R> Task return value type. */ public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { + return execute(taskCls, arg, null); + } + + /** + * @param taskCls Task class. + * @param arg Optional execution argument. + * @param execName Name of the custom executor. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg, + @Nullable String execName) { assert taskCls != null; lock.readLock(); @@ -367,7 +380,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { if (stopping) throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskCls); - return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false); + return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, + false, execName); } finally { lock.readUnlock(); @@ -382,7 +396,19 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <R> Task return value type. */ public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) { - return execute(task, arg, false); + return execute(task, arg, false, null); + } + + /** + * @param task Actual task. + * @param arg Optional task argument. + * @param execName Name of the custom executor. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, String execName) { + return execute(task, arg, false, execName); } /** @@ -394,13 +420,28 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <R> Task return value type. */ public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) { + return execute(task, arg, sys, null); + } + + /** + * @param task Actual task. + * @param arg Optional task argument. + * @param sys If {@code true}, then system pool will be used. + * @param execName Name of the custom executor. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys, + @Nullable String execName) { lock.readLock(); try { if (stopping) throw new IllegalStateException("Failed to execute task due to grid shutdown: " + task); - return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg, sys); + return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg, + sys, execName); } finally { lock.readUnlock(); @@ -436,6 +477,18 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param <R> Task return value type. */ public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg) { + return execute(taskName, arg, null); + } + + /** + * @param taskName Task name. + * @param arg Optional execution argument. + * @param execName Name of the custom executor. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg, @Nullable String execName) { assert taskName != null; lock.readLock(); @@ -444,7 +497,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { if (stopping) throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskName); - return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false); + return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, + false, execName); } finally { lock.readUnlock(); @@ -458,6 +512,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { * @param sesId Task session ID. * @param arg Optional task argument. * @param sys If {@code true}, then system pool will be used. + * @param execName Name of the custom executor. * @return Task future. */ @SuppressWarnings("unchecked") @@ -467,7 +522,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { @Nullable ComputeTask<T, R> task, IgniteUuid sesId, @Nullable T arg, - boolean sys) { + boolean sys, + @Nullable String execName) { assert sesId != null; String taskClsName; @@ -629,7 +685,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { Collections.emptyMap(), fullSup, internal, - subjId); + subjId, + execName); ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index cb5aabe..62224f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1372,7 +1372,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { subjId, affCacheIds, affPartId, - mapTopVer); + mapTopVer, + ses.executorName()); if (loc) ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java new file mode 100644 index 0000000..2277100 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compute; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.ExecutorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests custom executor configuration. + */ +public class IgniteComputeCustomExecutorConfigurationSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testConfigurations() throws Exception { + try { + checkStartWithInvalidConfiguration(getConfiguration("node0") + .setExecutorConfiguration(new ExecutorConfiguration())); + + checkStartWithInvalidConfiguration(getConfiguration("node0") + .setExecutorConfiguration(new ExecutorConfiguration(""))); + + checkStartWithInvalidConfiguration(getConfiguration("node0") + .setExecutorConfiguration(new ExecutorConfiguration("exec").setSize(-1))); + + checkStartWithInvalidConfiguration(getConfiguration("node0") + .setExecutorConfiguration(new ExecutorConfiguration("exec").setSize(0))); + } + finally { + Ignition.stopAll(true); + } + } + + /** + * @param cfg Ignite configuration. + * @throws Exception If failed. + */ + private void checkStartWithInvalidConfiguration(IgniteConfiguration cfg) throws Exception { + try { + Ignition.start(cfg); + + fail("Node start must fail."); + } + catch (IgniteException e) { + // No-op. + } + } +}
