Repository: ignite Updated Branches: refs/heads/ignite-2.0 739c606aa -> aa46bc7c8
IGNITE-4105: Added separate thread pool for queries. This closes #1469. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa46bc7c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa46bc7c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa46bc7c Branch: refs/heads/ignite-2.0 Commit: aa46bc7c82e7f172bcceed4afa4b010bf4071cda Parents: 739c606 Author: Sergey Kalashnikov <skalashni...@gridgain.com> Authored: Fri Feb 3 12:09:27 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Feb 3 12:09:27 2017 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 31 ++++++ .../ignite/internal/GridKernalContext.java | 7 ++ .../ignite/internal/GridKernalContextImpl.java | 13 +++ .../apache/ignite/internal/IgniteKernal.java | 17 ++- .../org/apache/ignite/internal/IgnitionEx.java | 20 ++++ .../managers/communication/GridIoManager.java | 4 +- .../managers/communication/GridIoPolicy.java | 3 + .../internal/processors/pool/PoolProcessor.java | 5 + .../junits/GridTestKernalContext.java | 1 + .../query/h2/twostep/GridMapQueryExecutor.java | 8 +- .../h2/twostep/GridReduceQueryExecutor.java | 7 +- .../query/IgniteSqlQueryDedicatedPoolTest.java | 110 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 13 files changed, 215 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 512ceee..f35742b 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -169,6 +169,9 @@ public class IgniteConfiguration { @Deprecated public static final int DFLT_SYSTEM_MAX_THREAD_CNT = DFLT_PUBLIC_THREAD_CNT; + /** Default size of query thread pool. */ + public static final int DFLT_QUERY_THREAD_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT; + /** Default keep alive time for system thread pool. */ @Deprecated public static final long DFLT_SYSTEM_KEEP_ALIVE_TIME = 0; @@ -266,6 +269,9 @@ public class IgniteConfiguration { /** P2P pool size. */ private int p2pPoolSize = DFLT_P2P_THREAD_CNT; + /** Query pool size. */ + private int qryPoolSize = DFLT_QUERY_THREAD_POOL_SIZE; + /** Ignite installation folder. */ private String igniteHome; @@ -549,6 +555,7 @@ public class IgniteConfiguration { platformCfg = cfg.getPlatformConfiguration(); pluginCfgs = cfg.getPluginConfigurations(); pubPoolSize = cfg.getPublicThreadPoolSize(); + qryPoolSize = cfg.getQueryThreadPoolSize(); rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize(); segChkFreq = cfg.getSegmentCheckFrequency(); segPlc = cfg.getSegmentationPolicy(); @@ -869,6 +876,17 @@ public class IgniteConfiguration { } /** + * Size of thread pool that is in charge of processing query messages. + * <p> + * If not provided, executor service will have size {@link #DFLT_QUERY_THREAD_POOL_SIZE}. + * + * @return Thread pool size to be used in grid for query messages. + */ + public int getQueryThreadPoolSize() { + return qryPoolSize; + } + + /** * Sets thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. @@ -975,6 +993,19 @@ public class IgniteConfiguration { } /** + * Sets query thread pool size to use within grid. + * + * @param poolSize Thread pool size to use within grid. + * @see IgniteConfiguration#getQueryThreadPoolSize() + * @return {@code this} for chaining. + */ + public IgniteConfiguration setQueryThreadPoolSize(int poolSize) { + qryPoolSize = poolSize; + + return this; + } + + /** * Sets keep alive time of thread pool size that will be used to process utility cache messages. * * @param keepAliveTime Keep alive time of executor service to use for utility cache messages. http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 177062d..8e3dbe1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -573,6 +573,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { @Nullable public ExecutorService getIndexingExecutorService(); /** + * Executor service that is in charge of processing query messages. + * + * @return Thread pool implementation to be used in grid for query messages. + */ + public ExecutorService getQueryExecutorService(); + + /** * Gets exception registry. * * @return Exception registry. http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index a45be59..d8075d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -343,6 +343,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected ExecutorService qryExecSvc; + + /** */ + @GridToStringExclude private Map<String, Object> attrs = new HashMap<>(); /** */ @@ -400,6 +404,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param restExecSvc REST executor service. * @param affExecSvc Affinity executor service. * @param idxExecSvc Indexing executor service. + * @param callbackExecSvc Callback executor service. + * @param qryExecSvc Query executor service. * @param plugins Plugin providers. * @throws IgniteCheckedException In case of error. */ @@ -421,6 +427,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, + ExecutorService qryExecSvc, List<PluginProvider> plugins ) { assert grid != null; @@ -442,6 +449,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.affExecSvc = affExecSvc; this.idxExecSvc = idxExecSvc; this.callbackExecSvc = callbackExecSvc; + this.qryExecSvc = qryExecSvc; marshCtx = new MarshallerContextImpl(plugins); @@ -1005,6 +1013,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService getQueryExecutorService() { + return qryExecSvc; + } + + /** {@inheritDoc} */ @Override public IgniteExceptionRegistry exceptionRegistry() { return IgniteExceptionRegistry.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 63041a3..1f7f0d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -295,6 +295,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private ObjectName restExecSvcMBean; + /** */ + @GridToStringExclude + private ObjectName qryExecSvcMBean; + /** Kernal start timestamp. */ private long startTime = U.currentTimeMillis(); @@ -674,6 +678,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param restExecSvc Reset executor service. * @param affExecSvc Affinity executor service. * @param idxExecSvc Indexing executor service. + * @param callbackExecSvc Callback executor service. + * @param qryExecSvc Query executor service. * @param errHnd Error handler to use for notification about startup problems. * @throws IgniteCheckedException Thrown in case of any errors. */ @@ -692,6 +698,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, + ExecutorService qryExecSvc, GridAbsClosure errHnd ) throws IgniteCheckedException @@ -801,6 +808,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { affExecSvc, idxExecSvc, callbackExecSvc, + qryExecSvc, plugins ); @@ -961,7 +969,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Register MBeans. registerKernalMBean(); registerLocalNodeMBean(); - registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc); + registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc, qryExecSvc); // Lifecycle bean notifications. notifyLifecycleBeans(AFTER_NODE_START); @@ -1545,11 +1553,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService sysExecSvc, ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, - ExecutorService restExecSvc) throws IgniteCheckedException { + ExecutorService restExecSvc, + ExecutorService qryExecSvc) throws IgniteCheckedException { pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor"); sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor"); mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor"); p2PExecSvcMBean = registerExecutorMBean(p2pExecSvc, "GridClassLoadingExecutor"); + qryExecSvcMBean = registerExecutorMBean(qryExecSvc, "GridQueryExecutor"); ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration(); @@ -2043,7 +2053,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { unregisterMBean(p2PExecSvcMBean) & unregisterMBean(kernalMBean) & unregisterMBean(locNodeMBean) & - unregisterMBean(restExecSvcMBean) + unregisterMBean(restExecSvcMBean) & + unregisterMBean(qryExecSvcMBean) )) errOnStop = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index c55f954..42ff739 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1489,6 +1489,9 @@ public class IgnitionEx { /** Continuous query executor service. */ private IgniteStripedThreadPoolExecutor callbackExecSvc; + /** Query executor service. */ + private ThreadPoolExecutor qryExecSvc; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1783,6 +1786,18 @@ public class IgnitionEx { ); } + validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query"); + + qryExecSvc = new IgniteThreadPoolExecutor( + "query", + cfg.getGridName(), + cfg.getQueryThreadPoolSize(), + cfg.getQueryThreadPoolSize(), + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>()); + + qryExecSvc.allowCoreThreadTimeOut(true); + // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); @@ -1808,6 +1823,7 @@ public class IgnitionEx { affExecSvc, idxExecSvc, callbackExecSvc, + qryExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2402,6 +2418,10 @@ public class IgnitionEx { sysExecSvc = null; + U.shutdownNow(getClass(), qryExecSvc, log); + + qryExecSvc = null; + U.shutdownNow(getClass(), stripedExecSvc, log); stripedExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 16ea972..0228684 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -98,6 +98,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy; import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV; @@ -686,6 +687,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case IDX_POOL: case IGFS_POOL: case DATA_STREAMER_POOL: + case QUERY_POOL: { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); @@ -1206,7 +1208,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) { Byte oldPlc = CUR_PLC.get(); - boolean change = F.eq(oldPlc, plc); + boolean change = !F.eq(oldPlc, plc); if (change) CUR_PLC.set(plc); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index cb673d0..a3fb370 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -49,6 +49,9 @@ public class GridIoPolicy { /** Data streamer execution pool. */ public static final byte DATA_STREAMER_POOL = 9; + /** Query execution pool. */ + public static final byte QUERY_POOL = 10; + /** * Defines the range of reserved pools that are not available for plugins. * @param key The key. http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index f42815b..f84b741 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -128,6 +128,11 @@ public class PoolProcessor extends GridProcessorAdapter { return ctx.getDataStreamerExecutorService(); + case GridIoPolicy.QUERY_POOL: + assert ctx.getQueryExecutorService() != null : "Query pool is not configured."; + + return ctx.getQueryExecutorService(); + default: { if (plc < 0) throw new IgniteCheckedException("Policy cannot be negative: " + plc); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index db45e27..40f0e43 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -64,6 +64,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, U.allPluginProviders() ); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ac1a6a6..2802da5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -44,6 +44,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -86,7 +87,6 @@ import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVer import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; -import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; @@ -655,7 +655,7 @@ public class GridMapQueryExecutor { h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); } else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL); } catch (Exception e) { e.addSuppressed(err); @@ -729,7 +729,7 @@ public class GridMapQueryExecutor { if (loc) h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL); } catch (IgniteCheckedException e) { log.error("Failed to send message.", e); @@ -756,7 +756,7 @@ public class GridMapQueryExecutor { if (loc) h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL); } catch (Exception e) { U.warn(log, "Failed to send retry message: " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 1f00ed2..7c036ea 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -104,9 +104,6 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType * Reduce query executor. */ public class GridReduceQueryExecutor { - /** Thread pool to process query messages. */ - public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL; - /** */ private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0"); @@ -327,7 +324,7 @@ public class GridReduceQueryExecutor { if (node.isLocal()) h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, QUERY_POOL); + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL); } catch (IgniteCheckedException e) { throw new CacheException("Failed to fetch data from node: " + node.id(), e); @@ -1178,7 +1175,7 @@ public class GridReduceQueryExecutor { msg, specialize, locNodeHnd, - QUERY_POOL, + GridIoPolicy.QUERY_POOL, runLocParallel); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java new file mode 100644 index 0000000..bba3642 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import java.util.List; + +/** + * Ensures that SQL queries are executed in a dedicated thread pool. + */ +public class IgniteSqlQueryDedicatedPoolTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Name of the cache for test */ + private static final String CACHE_NAME = "query_pool_test"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGrid("server"); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setIndexedTypes(Integer.class, Integer.class); + ccfg.setSqlFunctionClasses(IgniteSqlQueryDedicatedPoolTest.class); + ccfg.setName(CACHE_NAME); + + cfg.setCacheConfiguration(ccfg); + + if ("client".equals(gridName)) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Test that SQL queries are executed in dedicated pool + */ + public void testSqlQueryUsesDedicatedThreadPool() throws Exception { + try (Ignite client = startGrid("client")) { + IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME); + + QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("select currentPolicy()")); + + List<List<?>> result = cursor.getAll(); + + cursor.close(); + + assertEquals(1, result.size()); + + Byte plc = (Byte)result.get(0).get(0); + + assert plc != null; + assert plc == GridIoPolicy.QUERY_POOL; + } + } + + /** + * Custom SQL function to return current thread name from inside query executor + */ + @SuppressWarnings("unused") + @QuerySqlFunction(alias = "currentPolicy") + public static Byte currentPolicy() { + return GridIoManager.currentPolicy(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index b5e4078..49fb269 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -95,6 +95,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQueryS import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDestroySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest; +import org.apache.ignite.internal.processors.query.IgniteSqlQueryDedicatedPoolTest; import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest; @@ -240,6 +241,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(CacheOffheapBatchIndexingSingleTypeTest.class); suite.addTestSuite(CacheSqlQueryValueCopySelfTest.class); suite.addTestSuite(IgniteCacheQueryCacheDestroySelfTest.class); + suite.addTestSuite(IgniteSqlQueryDedicatedPoolTest.class); return suite; }