Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 739c606aa -> aa46bc7c8


IGNITE-4105: Added separate thread pool for queries. This closes #1469.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa46bc7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa46bc7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa46bc7c

Branch: refs/heads/ignite-2.0
Commit: aa46bc7c82e7f172bcceed4afa4b010bf4071cda
Parents: 739c606
Author: Sergey Kalashnikov <skalashni...@gridgain.com>
Authored: Fri Feb 3 12:09:27 2017 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Feb 3 12:09:27 2017 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  31 ++++++
 .../ignite/internal/GridKernalContext.java      |   7 ++
 .../ignite/internal/GridKernalContextImpl.java  |  13 +++
 .../apache/ignite/internal/IgniteKernal.java    |  17 ++-
 .../org/apache/ignite/internal/IgnitionEx.java  |  20 ++++
 .../managers/communication/GridIoManager.java   |   4 +-
 .../managers/communication/GridIoPolicy.java    |   3 +
 .../internal/processors/pool/PoolProcessor.java |   5 +
 .../junits/GridTestKernalContext.java           |   1 +
 .../query/h2/twostep/GridMapQueryExecutor.java  |   8 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   7 +-
 .../query/IgniteSqlQueryDedicatedPoolTest.java  | 110 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 13 files changed, 215 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 512ceee..f35742b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -169,6 +169,9 @@ public class IgniteConfiguration {
     @Deprecated
     public static final int DFLT_SYSTEM_MAX_THREAD_CNT = 
DFLT_PUBLIC_THREAD_CNT;
 
+    /** Default size of query thread pool. */
+    public static final int DFLT_QUERY_THREAD_POOL_SIZE = 
DFLT_PUBLIC_THREAD_CNT;
+
     /** Default keep alive time for system thread pool. */
     @Deprecated
     public static final long DFLT_SYSTEM_KEEP_ALIVE_TIME = 0;
@@ -266,6 +269,9 @@ public class IgniteConfiguration {
     /** P2P pool size. */
     private int p2pPoolSize = DFLT_P2P_THREAD_CNT;
 
+    /** Query pool size. */
+    private int qryPoolSize = DFLT_QUERY_THREAD_POOL_SIZE;
+
     /** Ignite installation folder. */
     private String igniteHome;
 
@@ -549,6 +555,7 @@ public class IgniteConfiguration {
         platformCfg = cfg.getPlatformConfiguration();
         pluginCfgs = cfg.getPluginConfigurations();
         pubPoolSize = cfg.getPublicThreadPoolSize();
+        qryPoolSize = cfg.getQueryThreadPoolSize();
         rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
         segChkFreq = cfg.getSegmentCheckFrequency();
         segPlc = cfg.getSegmentationPolicy();
@@ -869,6 +876,17 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool that is in charge of processing query messages.
+     * <p>
+     * If not provided, executor service will have size {@link 
#DFLT_QUERY_THREAD_POOL_SIZE}.
+     *
+     * @return Thread pool size to be used in grid for query messages.
+     */
+    public int getQueryThreadPoolSize() {
+        return qryPoolSize;
+    }
+
+    /**
      * Sets thread pool size to use within grid.
      *
      * @param poolSize Thread pool size to use within grid.
@@ -975,6 +993,19 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Sets query thread pool size to use within grid.
+     *
+     * @param poolSize Thread pool size to use within grid.
+     * @see IgniteConfiguration#getQueryThreadPoolSize()
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setQueryThreadPoolSize(int poolSize) {
+        qryPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Sets keep alive time of thread pool size that will be used to process 
utility cache messages.
      *
      * @param keepAliveTime Keep alive time of executor service to use for 
utility cache messages.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 177062d..8e3dbe1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -573,6 +573,13 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     @Nullable public ExecutorService getIndexingExecutorService();
 
     /**
+     * Executor service that is in charge of processing query messages.
+     *
+     * @return Thread pool implementation to be used in grid for query 
messages.
+     */
+    public ExecutorService getQueryExecutorService();
+
+    /**
      * Gets exception registry.
      *
      * @return Exception registry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a45be59..d8075d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -343,6 +343,10 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    protected ExecutorService qryExecSvc;
+
+    /** */
+    @GridToStringExclude
     private Map<String, Object> attrs = new HashMap<>();
 
     /** */
@@ -400,6 +404,8 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
      * @param restExecSvc REST executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
+     * @param callbackExecSvc Callback executor service.
+     * @param qryExecSvc Query executor service.
      * @param plugins Plugin providers.
      * @throws IgniteCheckedException In case of error.
      */
@@ -421,6 +427,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
+        ExecutorService qryExecSvc,
         List<PluginProvider> plugins
     ) {
         assert grid != null;
@@ -442,6 +449,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         this.affExecSvc = affExecSvc;
         this.idxExecSvc = idxExecSvc;
         this.callbackExecSvc = callbackExecSvc;
+        this.qryExecSvc = qryExecSvc;
 
         marshCtx = new MarshallerContextImpl(plugins);
 
@@ -1005,6 +1013,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public ExecutorService getQueryExecutorService() {
+        return qryExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteExceptionRegistry exceptionRegistry() {
         return IgniteExceptionRegistry.get();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 63041a3..1f7f0d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -295,6 +295,10 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
     @GridToStringExclude
     private ObjectName restExecSvcMBean;
 
+    /** */
+    @GridToStringExclude
+    private ObjectName qryExecSvcMBean;
+
     /** Kernal start timestamp. */
     private long startTime = U.currentTimeMillis();
 
@@ -674,6 +678,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
      * @param restExecSvc Reset executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
+     * @param callbackExecSvc Callback executor service.
+     * @param qryExecSvc Query executor service.
      * @param errHnd Error handler to use for notification about startup 
problems.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
@@ -692,6 +698,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
+        ExecutorService qryExecSvc,
         GridAbsClosure errHnd
     )
         throws IgniteCheckedException
@@ -801,6 +808,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                 affExecSvc,
                 idxExecSvc,
                 callbackExecSvc,
+                qryExecSvc,
                 plugins
             );
 
@@ -961,7 +969,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
             // Register MBeans.
             registerKernalMBean();
             registerLocalNodeMBean();
-            registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, 
mgmtExecSvc, restExecSvc);
+            registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, 
mgmtExecSvc, restExecSvc, qryExecSvc);
 
             // Lifecycle bean notifications.
             notifyLifecycleBeans(AFTER_NODE_START);
@@ -1545,11 +1553,13 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ExecutorService sysExecSvc,
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
-        ExecutorService restExecSvc) throws IgniteCheckedException {
+        ExecutorService restExecSvc,
+        ExecutorService qryExecSvc) throws IgniteCheckedException {
         pubExecSvcMBean = registerExecutorMBean(execSvc, 
"GridExecutionExecutor");
         sysExecSvcMBean = registerExecutorMBean(sysExecSvc, 
"GridSystemExecutor");
         mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, 
"GridManagementExecutor");
         p2PExecSvcMBean = registerExecutorMBean(p2pExecSvc, 
"GridClassLoadingExecutor");
+        qryExecSvcMBean = registerExecutorMBean(qryExecSvc, 
"GridQueryExecutor");
 
         ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
 
@@ -2043,7 +2053,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                     unregisterMBean(p2PExecSvcMBean) &
                     unregisterMBean(kernalMBean) &
                     unregisterMBean(locNodeMBean) &
-                    unregisterMBean(restExecSvcMBean)
+                    unregisterMBean(restExecSvcMBean) &
+                    unregisterMBean(qryExecSvcMBean)
             ))
                 errOnStop = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index c55f954..42ff739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1489,6 +1489,9 @@ public class IgnitionEx {
         /** Continuous query executor service. */
         private IgniteStripedThreadPoolExecutor callbackExecSvc;
 
+        /** Query executor service. */
+        private ThreadPoolExecutor qryExecSvc;
+
         /** Grid state. */
         private volatile IgniteState state = STOPPED;
 
@@ -1783,6 +1786,18 @@ public class IgnitionEx {
                 );
             }
 
+            validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
+
+            qryExecSvc = new IgniteThreadPoolExecutor(
+                "query",
+                cfg.getGridName(),
+                cfg.getQueryThreadPoolSize(),
+                cfg.getQueryThreadPoolSize(),
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>());
+
+            qryExecSvc.allowCoreThreadTimeOut(true);
+
             // Register Ignite MBean for current grid instance.
             registerFactoryMbean(myCfg.getMBeanServer());
 
@@ -1808,6 +1823,7 @@ public class IgnitionEx {
                     affExecSvc,
                     idxExecSvc,
                     callbackExecSvc,
+                    qryExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2402,6 +2418,10 @@ public class IgnitionEx {
 
             sysExecSvc = null;
 
+            U.shutdownNow(getClass(), qryExecSvc, log);
+
+            qryExecSvc = null;
+
             U.shutdownNow(getClass(), stripedExecSvc, log);
 
             stripedExecSvc = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 16ea972..0228684 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -98,6 +98,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
 import static 
org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
 import static 
org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -686,6 +687,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 case IDX_POOL:
                 case IGFS_POOL:
                 case DATA_STREAMER_POOL:
+                case QUERY_POOL:
                 {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
@@ -1206,7 +1208,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     private void invokeListener(Byte plc, GridMessageListener lsnr, UUID 
nodeId, Object msg) {
         Byte oldPlc = CUR_PLC.get();
 
-        boolean change = F.eq(oldPlc, plc);
+        boolean change = !F.eq(oldPlc, plc);
 
         if (change)
             CUR_PLC.set(plc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index cb673d0..a3fb370 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -49,6 +49,9 @@ public class GridIoPolicy {
     /** Data streamer execution pool. */
     public static final byte DATA_STREAMER_POOL = 9;
 
+    /** Query execution pool. */
+    public static final byte QUERY_POOL = 10;
+
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index f42815b..f84b741 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -128,6 +128,11 @@ public class PoolProcessor extends GridProcessorAdapter {
 
                 return ctx.getDataStreamerExecutorService();
 
+            case GridIoPolicy.QUERY_POOL:
+                assert ctx.getQueryExecutorService() != null : "Query pool is 
not configured.";
+
+                return ctx.getQueryExecutorService();
+
             default: {
                 if (plc < 0)
                     throw new IgniteCheckedException("Policy cannot be 
negative: " + plc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index db45e27..40f0e43 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -64,6 +64,7 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 U.allPluginProviders()
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..2802da5 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -86,7 +87,6 @@ import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVer
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
-import static 
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
@@ -655,7 +655,7 @@ public class GridMapQueryExecutor {
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             }
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, 
GridIoPolicy.QUERY_POOL);
         }
         catch (Exception e) {
             e.addSuppressed(err);
@@ -729,7 +729,7 @@ public class GridMapQueryExecutor {
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, 
GridIoPolicy.QUERY_POOL);
         }
         catch (IgniteCheckedException e) {
             log.error("Failed to send message.", e);
@@ -756,7 +756,7 @@ public class GridMapQueryExecutor {
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, 
GridIoPolicy.QUERY_POOL);
         }
         catch (Exception e) {
             U.warn(log, "Failed to send retry message: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 1f00ed2..7c036ea 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -104,9 +104,6 @@ import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
  * Reduce query executor.
  */
 public class GridReduceQueryExecutor {
-    /** Thread pool to process query messages. */
-    public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL;
-
     /** */
     private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = 
IgniteProductVersion.fromString("1.7.0");
 
@@ -327,7 +324,7 @@ public class GridReduceQueryExecutor {
                         if (node.isLocal())
                             h2.mapQueryExecutor().onMessage(ctx.localNodeId(), 
msg0);
                         else
-                            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, 
QUERY_POOL);
+                            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, 
GridIoPolicy.QUERY_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         throw new CacheException("Failed to fetch data from 
node: " + node.id(), e);
@@ -1178,7 +1175,7 @@ public class GridReduceQueryExecutor {
             msg,
             specialize,
             locNodeHnd,
-            QUERY_POOL,
+            GridIoPolicy.QUERY_POOL,
             runLocParallel);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
new file mode 100644
index 0000000..bba3642
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import java.util.List;
+
+/**
+ * Ensures that SQL queries are executed in a dedicated thread pool.
+ */
+public class IgniteSqlQueryDedicatedPoolTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the cache used by the test. */
+    private static final String CACHE_NAME = "query_pool_test";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrid("server");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setIndexedTypes(Integer.class, Integer.class);
+        ccfg.setSqlFunctionClasses(IgniteSqlQueryDedicatedPoolTest.class);
+        ccfg.setName(CACHE_NAME);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        if ("client".equals(gridName))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Tests that SQL queries are executed in the dedicated query pool.
+     */
+    public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
+        try (Ignite client = startGrid("client")) {
+            IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
+
+            QueryCursor<List<?>> cursor = cache.query(new 
SqlFieldsQuery("select currentPolicy()"));
+
+            List<List<?>> result = cursor.getAll();
+
+            cursor.close();
+
+            assertEquals(1, result.size());
+
+            Byte plc = (Byte)result.get(0).get(0);
+
+            assert plc != null;
+            assert plc == GridIoPolicy.QUERY_POOL;
+        }
+    }
+
+    /**
+     * Custom SQL function to return current I/O policy from inside query 
executor
+     */
+    @SuppressWarnings("unused")
+    @QuerySqlFunction(alias = "currentPolicy")
+    public static Byte currentPolicy() {
+         return GridIoManager.currentPolicy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index b5e4078..49fb269 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -95,6 +95,7 @@ import 
org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQueryS
 import 
org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDestroySelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
+import 
org.apache.ignite.internal.processors.query.IgniteSqlQueryDedicatedPoolTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
@@ -240,6 +241,7 @@ public class IgniteCacheQuerySelfTestSuite extends 
TestSuite {
         suite.addTestSuite(CacheOffheapBatchIndexingSingleTypeTest.class);
         suite.addTestSuite(CacheSqlQueryValueCopySelfTest.class);
         suite.addTestSuite(IgniteCacheQueryCacheDestroySelfTest.class);
+        suite.addTestSuite(IgniteSqlQueryDedicatedPoolTest.class);
 
         return suite;
     }

Reply via email to