IGNITE-2788: Basic Redis protocol implementation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/90fe51e3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/90fe51e3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/90fe51e3 Branch: refs/heads/ignite-2788 Commit: 90fe51e356d8d7ac1fb9c11778e339a8cc41c400 Parents: 732abda Author: shtykh_roman <[email protected]> Authored: Mon Apr 11 10:41:22 2016 +0900 Committer: shtykh_roman <[email protected]> Committed: Mon Apr 11 10:41:22 2016 +0900 ---------------------------------------------------------------------- modules/clients/pom.xml | 16 +- .../processors/redis/RedisProtocolSelfTest.java | 160 ++++++++++++++ .../configuration/IgniteConfiguration.java | 145 +++++++------ .../configuration/RedisConfiguration.java | 96 ++++++++ .../ignite/internal/GridKernalContext.java | 26 ++- .../ignite/internal/GridKernalContextImpl.java | 24 ++ .../apache/ignite/internal/IgniteKernal.java | 4 + .../org/apache/ignite/internal/IgnitionEx.java | 22 +- .../processors/redis/GridRedisCommand.java | 46 ++++ .../processors/redis/GridRedisMessage.java | 77 +++++++ .../processors/redis/GridRedisNioListener.java | 70 ++++++ .../processors/redis/GridRedisProcessor.java | 89 ++++++++ .../redis/GridRedisProtocolHandler.java | 217 +++++++++++++++++++ .../redis/GridRedisProtocolParser.java | 149 +++++++++++++ .../processors/redis/GridRedisServer.java | 206 ++++++++++++++++++ .../redis/handler/GridRedisCommandHandler.java | 40 ++++ .../GridRedisConnectionCommandHandler.java | 74 +++++++ .../junits/GridTestKernalContext.java | 1 + 18 files changed, 1390 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/clients/pom.xml ---------------------------------------------------------------------- diff --git a/modules/clients/pom.xml b/modules/clients/pom.xml index 73a2161..729bc89 100644 --- a/modules/clients/pom.xml +++ b/modules/clients/pom.xml @@ -20,7 +20,8 @@ <!-- POM file. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -55,6 +56,12 @@ </dependency> <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <version>2.8.1</version> + </dependency> + + <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-spring</artifactId> <version>${project.version}</version> @@ -168,10 +175,11 @@ <configuration> <target> <exec executable="${doxygen.exec}" searchpath="true" dir="../platforms/dotnet/"> - <arg value="Apache.Ignite.dxg" /> + <arg value="Apache.Ignite.dxg"/> </exec> - <copy file="../../assembly/docfiles/ignite_logo.png" todir="target/dotnetdoc/html" /> + <copy file="../../assembly/docfiles/ignite_logo.png" + todir="target/dotnetdoc/html"/> </target> </configuration> </execution> @@ -185,7 +193,7 @@ <configuration> <target> <exec executable="${doxygen.exec}" searchpath="true" dir="../platforms/cpp/"> - <arg value="cpp.dxg" /> + <arg value="cpp.dxg"/> </exec> </target> </configuration> http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java new file mode 100644 index 0000000..57e3bb3 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.RedisConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +/** + * Tests for Redis protocol. + */ +public class RedisProtocolSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Local host. */ + private static final String HOST = "127.0.0.1"; + + /** Port. */ + private static final int PORT = 6379; + + /** Pool. */ + private static JedisPool pool; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(gridCount()); + + JedisPoolConfig jedisPoolCfg = new JedisPoolConfig(); + + jedisPoolCfg.setMaxWaitMillis(10000); + jedisPoolCfg.setMaxIdle(100); + jedisPoolCfg.setMinIdle(1); + jedisPoolCfg.setNumTestsPerEvictionRun(10); + jedisPoolCfg.setTestOnBorrow(true); + jedisPoolCfg.setTestOnReturn(true); + jedisPoolCfg.setTestWhileIdle(true); + jedisPoolCfg.setTimeBetweenEvictionRunsMillis(30000); + + pool = new JedisPool(jedisPoolCfg, HOST, PORT, 10000); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + pool.destroy(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost(HOST); + + assert cfg.getConnectorConfiguration() == null; + + RedisConfiguration redisCfg = new RedisConfiguration(); + + cfg.setRedisConfiguration(redisCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setStatisticsEnabled(true); + ccfg.setIndexedTypes(String.class, String.class); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @return Cache. + */ + @Override protected <K, V> IgniteCache<K, V> jcache() { + return grid(0).cache(null); + } + + /** Grid count. */ + private static final int GRID_CNT = 1; + + /** {@inheritDoc} */ + protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + protected void beforeTest() throws Exception { + assert grid(0).cluster().nodes().size() == gridCount(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + jcache().clear(); + + assertTrue(jcache().localSize() == 0); + } + + /** + * @throws Exception If failed. + */ + public void testPing() throws Exception { + try (Jedis jedis = pool.getResource()) { + Assert.assertEquals("PONG", jedis.ping()); + } + } + + /** + * @throws Exception If failed. + */ + public void testEcho() throws Exception { + try (Jedis jedis = pool.getResource()) { + Assert.assertEquals("Hello, grid!", jedis.echo("Hello, grid!")); + } + } + + /** + * @throws Exception If failed. + */ + public void testGet() throws Exception { + try (Jedis jedis = pool.getResource()) { + jcache().put("getKey1", "getVal1"); + jcache().put("getKey2", "getVal2"); + + Assert.assertEquals("getVal1", jedis.get("getKey1")); + Assert.assertEquals("getVal2", jedis.get("getKey2")); + Assert.assertNull(jedis.get("wrongKey")); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 758a2b4..9378454 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -411,6 +411,9 @@ public class IgniteConfiguration { /** Client access configuration. */ private ConnectorConfiguration connectorCfg = new ConnectorConfiguration(); + /** Redis protocol configuration. */ + private RedisConfiguration redisCfg; + /** Warmup closure. Will be invoked before actual grid start. */ private IgniteInClosure<IgniteConfiguration> warmupClos; @@ -518,6 +521,7 @@ public class IgniteConfiguration { pluginCfgs = cfg.getPluginConfigurations(); pubPoolSize = cfg.getPublicThreadPoolSize(); rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize(); + redisCfg = cfg.getRedisConfiguration(); segChkFreq = cfg.getSegmentCheckFrequency(); segPlc = cfg.getSegmentationPolicy(); segResolveAttempts = cfg.getSegmentationResolveAttempts(); @@ -543,7 +547,7 @@ public class IgniteConfiguration { * provided. * * @return Optional grid name. Can be {@code null}, which is default grid name, if - * non-default grid name was not provided. + * non-default grid name was not provided. */ public String getGridName() { return gridName; @@ -593,7 +597,7 @@ public class IgniteConfiguration { * Sets grid name. Note that {@code null} is a default grid name. * * @param gridName Grid name to set. Can be {@code null}, which is default - * grid name. + * grid name. * @return {@code this} for chaining. */ public IgniteConfiguration setGridName(String gridName) { @@ -651,8 +655,8 @@ public class IgniteConfiguration { * Sets user attributes for this node. * * @param userAttrs User attributes for this node. - * @see IgniteConfiguration#getUserAttributes() * @return {@code this} for chaining. + * @see IgniteConfiguration#getUserAttributes() */ public IgniteConfiguration setUserAttributes(Map<String, ?> userAttrs) { this.userAttrs = userAttrs; @@ -675,8 +679,8 @@ public class IgniteConfiguration { * Sets logger to use within grid. * * @param log Logger to use within grid. - * @see IgniteConfiguration#getGridLogger() * @return {@code this} for chaining. + * @see IgniteConfiguration#getGridLogger() */ public IgniteConfiguration setGridLogger(IgniteLogger log) { this.log = log; @@ -692,7 +696,7 @@ public class IgniteConfiguration { * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}. * * @return Thread pool size to be used in grid to process job execution - * requests and user messages sent to the node. + * requests and user messages sent to the node. */ public int getPublicThreadPoolSize() { return pubPoolSize; @@ -716,7 +720,7 @@ public class IgniteConfiguration { * If not provided, executor service will have size {@link #DFLT_MGMT_THREAD_CNT} * * @return Thread pool size to be used in grid for internal and Visor - * jobs processing. + * jobs processing. */ public int getManagementThreadPoolSize() { return mgmtPoolSize; @@ -730,7 +734,7 @@ public class IgniteConfiguration { * If not provided, executor service will have size {@link #DFLT_P2P_THREAD_CNT}. * * @return Thread pool size to be used for peer class loading - * requests handling. + * requests handling. */ public int getPeerClassLoadingThreadPoolSize() { return p2pPoolSize; @@ -795,8 +799,8 @@ public class IgniteConfiguration { * Sets thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. - * @see IgniteConfiguration#getPublicThreadPoolSize() * @return {@code this} for chaining. + * @see IgniteConfiguration#getPublicThreadPoolSize() */ public IgniteConfiguration setPublicThreadPoolSize(int poolSize) { pubPoolSize = poolSize; @@ -808,8 +812,8 @@ public class IgniteConfiguration { * Sets system thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. - * @see IgniteConfiguration#getSystemThreadPoolSize() * @return {@code this} for chaining. + * @see IgniteConfiguration#getSystemThreadPoolSize() */ public IgniteConfiguration setSystemThreadPoolSize(int poolSize) { sysPoolSize = poolSize; @@ -821,8 +825,8 @@ public class IgniteConfiguration { * Sets management thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. - * @see IgniteConfiguration#getManagementThreadPoolSize() * @return {@code this} for chaining. + * @see IgniteConfiguration#getManagementThreadPoolSize() */ public IgniteConfiguration setManagementThreadPoolSize(int poolSize) { mgmtPoolSize = poolSize; @@ -834,8 +838,8 @@ public class IgniteConfiguration { * Sets thread pool size to use for peer class loading. * * @param poolSize Thread pool size to use within grid. - * @see IgniteConfiguration#getPeerClassLoadingThreadPoolSize() * @return {@code this} for chaining. + * @see IgniteConfiguration#getPeerClassLoadingThreadPoolSize() */ public IgniteConfiguration setPeerClassLoadingThreadPoolSize(int poolSize) { p2pPoolSize = poolSize; @@ -847,8 +851,8 @@ public class IgniteConfiguration { * Set thread pool size that will be used to process outgoing IGFS messages. * * @param poolSize Executor service to use for outgoing IGFS messages. - * @see IgniteConfiguration#getIgfsThreadPoolSize() * @return {@code this} for chaining. + * @see IgniteConfiguration#getIgfsThreadPoolSize() */ public IgniteConfiguration setIgfsThreadPoolSize(int poolSize) { igfsPoolSize = poolSize; @@ -860,9 +864,9 @@ public class IgniteConfiguration { * Sets default thread pool size that will be used to process utility cache messages. * * @param poolSize Default executor service size to use for utility cache messages. + * @return {@code this} for chaining. * @see IgniteConfiguration#getUtilityCacheThreadPoolSize() * @see IgniteConfiguration#getUtilityCacheKeepAliveTime() - * @return {@code this} for chaining. */ public IgniteConfiguration setUtilityCachePoolSize(int poolSize) { utilityCachePoolSize = poolSize; @@ -874,9 +878,9 @@ public class IgniteConfiguration { * Sets keep alive time of thread pool size that will be used to process utility cache messages. * * @param keepAliveTime Keep alive time of executor service to use for utility cache messages. + * @return {@code this} for chaining. * @see IgniteConfiguration#getUtilityCacheThreadPoolSize() * @see IgniteConfiguration#getUtilityCacheKeepAliveTime() - * @return {@code this} for chaining. */ public IgniteConfiguration setUtilityCacheKeepAliveTime(long keepAliveTime) { utilityCacheKeepAliveTime = keepAliveTime; @@ -888,9 +892,9 @@ public class IgniteConfiguration { * Sets default thread pool size that will be used to process marshaller messages. * * @param poolSize Default executor service size to use for marshaller messages. + * @return {@code this} for chaining. * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize() * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime() - * @return {@code this} for chaining. * @deprecated Use {@link #setMarshallerCacheThreadPoolSize(int)} instead. */ @Deprecated @@ -902,9 +906,9 @@ public class IgniteConfiguration { * Sets default thread pool size that will be used to process marshaller messages. * * @param poolSize Default executor service size to use for marshaller messages. + * @return {@code this} for chaining. * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize() * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime() - * @return {@code this} for chaining. */ public IgniteConfiguration setMarshallerCacheThreadPoolSize(int poolSize) { marshCachePoolSize = poolSize; @@ -916,9 +920,9 @@ public class IgniteConfiguration { * Sets maximum thread pool size that will be used to process marshaller messages. * * @param keepAliveTime Keep alive time of executor service to use for marshaller messages. + * @return {@code this} for chaining. * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize() * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime() - * @return {@code this} for chaining. */ public IgniteConfiguration setMarshallerCacheKeepAliveTime(long keepAliveTime) { marshCacheKeepAliveTime = keepAliveTime; @@ -933,7 +937,7 @@ public class IgniteConfiguration { * will be thrown. * * @return Ignite installation home or {@code null} to make the system attempt to - * infer it automatically. + * infer it automatically. * @see IgniteSystemProperties#IGNITE_HOME */ public String getIgniteHome() { @@ -944,9 +948,9 @@ public class IgniteConfiguration { * Sets Ignite installation folder. * * @param ggHome {@code Ignition} installation folder. + * @return {@code this} for chaining. * @see IgniteConfiguration#getIgniteHome() * @see IgniteSystemProperties#IGNITE_HOME - * @return {@code this} for chaining. */ public IgniteConfiguration setIgniteHome(String ggHome) { this.ggHome = ggHome; @@ -973,8 +977,8 @@ public class IgniteConfiguration { * Sets Ignite work folder. * * @param ggWork {@code Ignite} work folder. - * @see IgniteConfiguration#getWorkDirectory() * @return {@code this} for chaining. + * @see IgniteConfiguration#getWorkDirectory() */ public IgniteConfiguration setWorkDirectory(String ggWork) { this.ggWork = ggWork; @@ -1019,8 +1023,8 @@ public class IgniteConfiguration { * Sets unique identifier for local node. * * @param nodeId Unique identifier for local node. - * @see IgniteConfiguration#getNodeId() * @return {@code this} for chaining. + * @see IgniteConfiguration#getNodeId() * @deprecated Use {@link #setConsistentId(Serializable)} instead. */ @Deprecated @@ -1045,8 +1049,8 @@ public class IgniteConfiguration { * Sets marshaller to use within grid. * * @param marsh Marshaller to use within grid. - * @see IgniteConfiguration#getMarshaller() * @return {@code this} for chaining. + * @see IgniteConfiguration#getMarshaller() */ public IgniteConfiguration setMarshaller(Marshaller marsh) { this.marsh = marsh; @@ -1054,7 +1058,6 @@ public class IgniteConfiguration { return this; } - /** * Returns {@code true} if peer class loading is enabled, {@code false} * otherwise. Default value is {@code false} specified by {@link #DFLT_P2P_ENABLED}. @@ -1067,7 +1070,7 @@ public class IgniteConfiguration { * See {@link ComputeTask} documentation for more information about task deployment. * * @return {@code true} if peer class loading is enabled, {@code false} - * otherwise. + * otherwise. */ public boolean isPeerClassLoadingEnabled() { return p2pEnabled; @@ -1101,7 +1104,7 @@ public class IgniteConfiguration { * Enables/disables peer class loading. * * @param p2pEnabled {@code true} if peer class loading is - * enabled, {@code false} otherwise. + * enabled, {@code false} otherwise. * @return {@code this} for chaining. */ public IgniteConfiguration setPeerClassLoadingEnabled(boolean p2pEnabled) { @@ -1128,7 +1131,7 @@ public class IgniteConfiguration { * loaded even if they exist locally. * * @param p2pLocClsPathExcl List of P2P loaded packages. Package - * name supports '*' at the end like in package import clause. + * name supports '*' at the end like in package import clause. * @return {@code this} for chaining. */ public IgniteConfiguration setPeerClassLoadingLocalClassPathExclude(String... p2pLocClsPathExcl) { @@ -1154,9 +1157,9 @@ public class IgniteConfiguration { * If not explicitly set, then default value is {@code 10,000}. * * @param metricsHistSize Number of metrics kept in history to use for - * metric totals and averages calculations. - * @see #DFLT_METRICS_HISTORY_SIZE + * metric totals and averages calculations. * @return {@code this} for chaining. + * @see #DFLT_METRICS_HISTORY_SIZE */ public IgniteConfiguration setMetricsHistorySize(int metricsHistSize) { this.metricsHistSize = metricsHistSize; @@ -1171,10 +1174,10 @@ public class IgniteConfiguration { * <p> * The following values are accepted: * <ul> - * <li>{@code -1} job metrics are never updated.</li> - * <li>{@code 0} job metrics are updated on each job start and finish.</li> - * <li>Positive value defines the actual update frequency. If not provided, then default value - * {@link #DFLT_METRICS_UPDATE_FREQ} is used.</li> + * <li>{@code -1} job metrics are never updated.</li> + * <li>{@code 0} job metrics are updated on each job start and finish.</li> + * <li>Positive value defines the actual update frequency. If not provided, then default value + * {@link #DFLT_METRICS_UPDATE_FREQ} is used.</li> * </ul> * If not provided, then default value {@link #DFLT_METRICS_UPDATE_FREQ} is used. * @@ -1220,8 +1223,8 @@ public class IgniteConfiguration { * If not set explicitly, then default value is {@code 600,000} milliseconds (10 minutes). * * @param metricsExpTime The metricsExpTime to set. - * @see #DFLT_METRICS_EXPIRE_TIME * @return {@code this} for chaining. + * @see #DFLT_METRICS_EXPIRE_TIME */ public IgniteConfiguration setMetricsExpireTime(long metricsExpTime) { this.metricsExpTime = metricsExpTime; @@ -1249,8 +1252,8 @@ public class IgniteConfiguration { * {@link #DFLT_NETWORK_TIMEOUT} is used. * * @param netTimeout Maximum timeout for network requests. - * @see #DFLT_NETWORK_TIMEOUT * @return {@code this} for chaining. + * @see #DFLT_NETWORK_TIMEOUT */ public IgniteConfiguration setNetworkTimeout(long netTimeout) { this.netTimeout = netTimeout; @@ -1365,6 +1368,7 @@ public class IgniteConfiguration { /** * Gets Max count of threads can be used at rebalancing. * Minimum is 1. + * * @return count. */ public int getRebalanceThreadPoolSize() { @@ -1377,7 +1381,7 @@ public class IgniteConfiguration { * Default is {@code 1} which has minimal impact on the operation of the grid. * * @param rebalanceThreadPoolSize Number of system threads that will be assigned for partition transfer during - * rebalancing. + * rebalancing. * @return {@code this} for chaining. */ public IgniteConfiguration setRebalanceThreadPoolSize(int rebalanceThreadPoolSize) { @@ -1407,8 +1411,8 @@ public class IgniteConfiguration { * routines. * * @param lifecycleBeans Collection of lifecycle beans. - * @see LifecycleEventType * @return {@code this} for chaining. + * @see LifecycleEventType */ public IgniteConfiguration setLifecycleBeans(LifecycleBean... lifecycleBeans) { this.lifecycleBeans = lifecycleBeans; @@ -1452,8 +1456,8 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link EventStorageSpi}. * * @param evtSpi Fully configured instance of {@link EventStorageSpi}. - * @see IgniteConfiguration#getEventStorageSpi() * @return {@code this} for chaining. + * @see IgniteConfiguration#getEventStorageSpi() */ public IgniteConfiguration setEventStorageSpi(EventStorageSpi evtSpi) { this.evtSpi = evtSpi; @@ -1475,8 +1479,8 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link DiscoverySpi}. * * @param discoSpi Fully configured instance of {@link DiscoverySpi}. - * @see IgniteConfiguration#getDiscoverySpi() * @return {@code this} for chaining. + * @see IgniteConfiguration#getDiscoverySpi() */ public IgniteConfiguration setDiscoverySpi(DiscoverySpi discoSpi) { this.discoSpi = discoSpi; @@ -1543,7 +1547,7 @@ public class IgniteConfiguration { * Default is {@link #DFLT_ALL_SEG_RESOLVERS_PASS_REQ}. * * @return {@code True} if all segmentation resolvers should succeed, - * {@code false} if only one is enough. + * {@code false} if only one is enough. */ public boolean isAllSegmentationResolversPassRequired() { return allResolversPassReq; @@ -1553,7 +1557,7 @@ public class IgniteConfiguration { * Sets all segmentation resolvers pass required flag. * * @param allResolversPassReq {@code True} if all segmentation resolvers should - * succeed for node to be in the correct segment. + * succeed for node to be in the correct segment. * @return {@code this} for chaining. */ public IgniteConfiguration setAllSegmentationResolversPassRequired(boolean allResolversPassReq) { @@ -1650,8 +1654,8 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link CommunicationSpi}. * * @param commSpi Fully configured instance of {@link CommunicationSpi}. - * @see IgniteConfiguration#getCommunicationSpi() * @return {@code this} for chaining. + * @see IgniteConfiguration#getCommunicationSpi() */ public IgniteConfiguration setCommunicationSpi(CommunicationSpi commSpi) { this.commSpi = commSpi; @@ -1677,9 +1681,9 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link CollisionSpi}. * * @param colSpi Fully configured instance of {@link CollisionSpi} or - * {@code null} if no SPI provided. - * @see IgniteConfiguration#getCollisionSpi() + * {@code null} if no SPI provided. * @return {@code this} for chaining. + * @see IgniteConfiguration#getCollisionSpi() */ public IgniteConfiguration setCollisionSpi(CollisionSpi colSpi) { this.colSpi = colSpi; @@ -1701,8 +1705,8 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link DeploymentSpi}. * * @param deploySpi Fully configured instance of {@link DeploymentSpi}. - * @see IgniteConfiguration#getDeploymentSpi() * @return {@code this} for chaining. + * @see IgniteConfiguration#getDeploymentSpi() */ public IgniteConfiguration setDeploymentSpi(DeploymentSpi deploySpi) { this.deploySpi = deploySpi; @@ -1724,8 +1728,8 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link CheckpointSpi}. * * @param cpSpi Fully configured instance of {@link CheckpointSpi}. - * @see IgniteConfiguration#getCheckpointSpi() * @return {@code this} for chaining. + * @see IgniteConfiguration#getCheckpointSpi() */ public IgniteConfiguration setCheckpointSpi(CheckpointSpi... cpSpi) { this.cpSpi = cpSpi; @@ -1747,9 +1751,9 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link FailoverSpi}. * * @param failSpi Fully configured instance of {@link FailoverSpi} or - * {@code null} if no SPI provided. - * @see IgniteConfiguration#getFailoverSpi() + * {@code null} if no SPI provided. * @return {@code this} for chaining. + * @see IgniteConfiguration#getFailoverSpi() */ public IgniteConfiguration setFailoverSpi(FailoverSpi... failSpi) { this.failSpi = failSpi; @@ -1762,8 +1766,8 @@ public class IgniteConfiguration { * <p> * Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}. * - * @see #setFailureDetectionTimeout(long) * @return Failure detection timeout in milliseconds. + * @see #setFailureDetectionTimeout(long) */ public Long getFailureDetectionTimeout() { return failureDetectionTimeout; @@ -1817,7 +1821,7 @@ public class IgniteConfiguration { * If not set explicitly, then default value is {@code 600,000} milliseconds (10 minutes). * * @param discoStartupDelay Time in milliseconds for when nodes - * can be out-of-sync during startup. + * can be out-of-sync during startup. * @return {@code this} for chaining. */ public IgniteConfiguration setDiscoveryStartupDelay(long discoStartupDelay) { @@ -1830,9 +1834,9 @@ public class IgniteConfiguration { * Sets fully configured instance of {@link LoadBalancingSpi}. * * @param loadBalancingSpi Fully configured instance of {@link LoadBalancingSpi} or - * {@code null} if no SPI provided. - * @see IgniteConfiguration#getLoadBalancingSpi() + * {@code null} if no SPI provided. * @return {@code this} for chaining. + * @see IgniteConfiguration#getLoadBalancingSpi() */ public IgniteConfiguration setLoadBalancingSpi(LoadBalancingSpi... loadBalancingSpi) { this.loadBalancingSpi = loadBalancingSpi; @@ -1844,9 +1848,9 @@ public class IgniteConfiguration { * Sets fully configured instances of {@link SwapSpaceSpi}. * * @param swapSpaceSpi Fully configured instances of {@link SwapSpaceSpi} or - * <tt>null</tt> if no SPI provided. - * @see IgniteConfiguration#getSwapSpaceSpi() + * <tt>null</tt> if no SPI provided. * @return {@code this} for chaining. + * @see IgniteConfiguration#getSwapSpaceSpi() */ public IgniteConfiguration setSwapSpaceSpi(SwapSpaceSpi swapSpaceSpi) { this.swapSpaceSpi = swapSpaceSpi; @@ -1871,8 +1875,8 @@ public class IgniteConfiguration { * Sets fully configured instances of {@link IndexingSpi}. * * @param indexingSpi Fully configured instance of {@link IndexingSpi}. - * @see IgniteConfiguration#getIndexingSpi() * @return {@code this} for chaining. + * @see IgniteConfiguration#getIndexingSpi() */ public IgniteConfiguration setIndexingSpi(IndexingSpi indexingSpi) { this.indexingSpi = indexingSpi; @@ -2046,12 +2050,12 @@ public class IgniteConfiguration { * Gets flag indicating whether cache sanity check is enabled. If enabled, then Ignite * will perform the following checks and throw an exception if check fails: * <ul> - * <li>Cache entry is not externally locked with {@code lock(...)} or {@code lockAsync(...)} - * methods when entry is enlisted to transaction.</li> - * <li>Each entry in affinity group-lock transaction has the same affinity key as was specified on - * affinity transaction start.</li> - * <li>Each entry in partition group-lock transaction belongs to the same partition as was specified - * on partition transaction start.</li> + * <li>Cache entry is not externally locked with {@code lock(...)} or {@code lockAsync(...)} + * methods when entry is enlisted to transaction.</li> + * <li>Each entry in affinity group-lock transaction has the same affinity key as was specified on + * affinity transaction start.</li> + * <li>Each entry in partition group-lock transaction belongs to the same partition as was specified + * on partition transaction start.</li> * </ul> * <p> * These checks are not required for cache operation, but help to find subtle bugs. Disabling of this checks @@ -2069,8 +2073,8 @@ public class IgniteConfiguration { * Sets cache sanity check flag. * * @param cacheSanityCheckEnabled {@code True} if cache sanity check is enabled. - * @see #isCacheSanityCheckEnabled() * @return {@code this} for chaining. + * @see #isCacheSanityCheckEnabled() */ public IgniteConfiguration setCacheSanityCheckEnabled(boolean cacheSanityCheckEnabled) { this.cacheSanityCheckEnabled = cacheSanityCheckEnabled; @@ -2298,6 +2302,23 @@ public class IgniteConfiguration { } /** + * @return Redis configuration. + */ + public RedisConfiguration getRedisConfiguration() { + return redisCfg; + } + + /** + * @param redisCfg Redis configuration. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setRedisConfiguration(RedisConfiguration redisCfg) { + this.redisCfg = redisCfg; + + return this; + } + + /** * Gets configurations for services to be deployed on the grid. * * @return Configurations for services to be deployed on the grid. @@ -2356,8 +2377,8 @@ public class IgniteConfiguration { * Sets warmup closure to execute before grid startup. * * @param warmupClos Warmup closure to execute. - * @see #getWarmupClosure() * @return {@code this} for chaining. + * @see #getWarmupClosure() */ public IgniteConfiguration setWarmupClosure(IgniteInClosure<IgniteConfiguration> warmupClos) { this.warmupClos = warmupClos; http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/configuration/RedisConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/RedisConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/RedisConfiguration.java new file mode 100644 index 0000000..2db0c4c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/RedisConfiguration.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +/** + * Redis protocol configuration. + */ +public class RedisConfiguration extends ConnectorConfiguration { + /** Default TCP server port. */ + public static final int DFLT_TCP_PORT = 6379; + + /** Host. */ + private String host; + + /** Port. */ + private int port = DFLT_TCP_PORT; + + /** + * Creates Redis connection configuration with all default values. + */ + public RedisConfiguration() { + // No-op. + } + + /** + * Creates Redis configuration by copying all properties from given configuration. + * + * @param cfg Client configuration. + */ + public RedisConfiguration(RedisConfiguration cfg) { + assert cfg != null; + + host = cfg.getHost(); + port = cfg.getPort(); + } + + /** + * Gets host for Redis protocol server. This can be either an + * IP address or a domain name. + * <p> + * If not defined, system-wide local address will be used + * (see {@link IgniteConfiguration#getLocalHost()}. + * <p> + * You can also use {@code 0.0.0.0} value to bind to all + * locally-available IP addresses. + * + * @return Redis host. + */ + @Override public String getHost() { + return host; + } + + /** + * Sets host for Redis protocol server. + * + * @param host Redis host. + */ + @Override public void setHost(String host) { + this.host = host; + } + + /** + * Gets port for Redis protocol server. + * <p> + * Default is {@link #DFLT_TCP_PORT}. + * + * @return TCP port. + */ + @Override public int getPort() { + return port; + } + + /** + * Sets port for Redis protocol server. + * + * @param port TCP port. + */ + @Override public void setPort(int port) { + this.port = port; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index c0b50a2..731c3a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.redis.GridRedisProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.processors.rest.GridRestProcessor; import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter; @@ -241,6 +242,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public GridRestProcessor rest(); /** + * Gets REdis Serialization Protocol (RESP) processor. + * + * @return RESP processor. + */ + public GridRedisProcessor redis(); + + /** * Gets segmentation processor. * * @return Segmentation processor. @@ -445,7 +453,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { * * @param ldr Class loader. * @return User version for given class loader or empty string if no version - * was explicitly specified. + * was explicitly specified. */ public String userVersion(ClassLoader ldr); @@ -466,7 +474,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { /** * @return Thread pool implementation to be used in grid to process job execution - * requests and user messages sent to the node. + * requests and user messages sent to the node. */ public ExecutorService getExecutorService(); @@ -482,13 +490,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { * {@link org.apache.ignite.compute.ComputeJob GridJobs}. * * @return Thread pool implementation to be used in grid for internal and Visor - * jobs processing. + * jobs processing. */ public ExecutorService getManagementExecutorService(); /** * @return Thread pool implementation to be used for peer class loading - * requests handling. + * requests handling. */ public ExecutorService getPeerClassLoadingExecutorService(); @@ -504,11 +512,19 @@ public interface GridKernalContext extends Iterable<GridComponent> { * processing of client messages (REST requests). * * @return Thread pool implementation to be used for processing of client - * messages. + * messages. */ public ExecutorService getRestExecutorService(); /** + * Should return an instance of fully configured thread pool to be used for + * processing of Redis client messages. + * + * @return Thread pool implementation to be used for processing of Redis client messages. + */ + public ExecutorService getRedisExecutorService(); + + /** * Gets exception registry. * * @return Exception registry. http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 22fd96c..abc9dd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.redis.GridRedisProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.processors.rest.GridRestProcessor; import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter; @@ -220,6 +221,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude + private GridRedisProcessor redisProc; + + /** */ + @GridToStringInclude private DataStreamProcessor dataLdrProc; /** */ @@ -300,6 +305,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected ExecutorService redisExecSvc; + + /** */ + @GridToStringExclude private Map<String, Object> attrs = new HashMap<>(); /** */ @@ -357,6 +366,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. * @param restExecSvc REST executor service. + * @param redisExecSvc Redis executor service. * @param plugins Plugin providers. * @throws IgniteCheckedException In case of error. */ @@ -374,6 +384,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + ExecutorService redisExecSvc, List<PluginProvider> plugins) throws IgniteCheckedException { assert grid != null; assert cfg != null; @@ -390,6 +401,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; this.restExecSvc = restExecSvc; + this.redisExecSvc = redisExecSvc; marshCtx = new MarshallerContextImpl(plugins); @@ -490,6 +502,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable affProc = (GridAffinityProcessor)comp; else if (comp instanceof GridRestProcessor) restProc = (GridRestProcessor)comp; + else if (comp instanceof GridRedisProcessor) + redisProc = (GridRedisProcessor)comp; else if (comp instanceof DataStreamProcessor) dataLdrProc = (DataStreamProcessor)comp; else if (comp instanceof IgfsProcessorAdapter) @@ -698,6 +712,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public GridRedisProcessor redis() { + return redisProc; + } + + /** {@inheritDoc} */ @Override public GridSegmentationProcessor segmentation() { return segProc; } @@ -904,6 +923,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService getRedisExecutorService() { + return redisExecSvc; + } + + /** {@inheritDoc} */ @Override public IgniteExceptionRegistry exceptionRegistry() { return IgniteExceptionRegistry.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8df89f3..8e5ce34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -124,6 +124,7 @@ import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.redis.GridRedisProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.processors.rest.GridRestProcessor; @@ -657,6 +658,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + ExecutorService redisExecSvc, GridAbsClosure errHnd) throws IgniteCheckedException { @@ -761,6 +763,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { mgmtExecSvc, igfsExecSvc, restExecSvc, + redisExecSvc, plugins); cfg.getMarshaller().setContext(ctx.marshallerContext()); @@ -855,6 +858,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(new GridTaskProcessor(ctx)); startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); startProcessor(new GridRestProcessor(ctx)); + startProcessor(new GridRedisProcessor(ctx)); startProcessor(new DataStreamProcessor(ctx)); startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration()))); startProcessor(new GridContinuousProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 533b6d8..a26361f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -60,6 +60,7 @@ import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.RedisConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.igfs.IgfsUtils; @@ -1429,6 +1430,9 @@ public class IgnitionEx { /** REST requests executor service. */ private ExecutorService restExecSvc; + /** Redis requests executor service. */ + private ExecutorService redisExecSvc; + /** Utility cache executor service. */ private ExecutorService utilityCacheExecSvc; @@ -1658,6 +1662,17 @@ public class IgnitionEx { ); } + if (myCfg.getRedisConfiguration() != null) { + redisExecSvc = new IgniteThreadPoolExecutor( + "redis", + myCfg.getGridName(), + myCfg.getRedisConfiguration().getThreadPoolSize(), + myCfg.getRedisConfiguration().getThreadPoolSize(), + RedisConfiguration.DFLT_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(RedisConfiguration.DFLT_THREADPOOL_QUEUE_CAP) + ); + } + utilityCacheExecSvc = new IgniteThreadPoolExecutor( "utility", cfg.getGridName(), @@ -1686,7 +1701,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, + igfsExecSvc, restExecSvc, redisExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2282,6 +2297,11 @@ public class IgnitionEx { restExecSvc = null; + if (redisExecSvc != null) + U.shutdownNow(getClass(), redisExecSvc, log); + + redisExecSvc = null; + U.shutdownNow(getClass(), utilityCacheExecSvc, log); utilityCacheExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisCommand.java new file mode 100644 index 0000000..c6bc478 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisCommand.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +/** + * Supported Redis commands. + */ +public enum GridRedisCommand { + // Connections. + /** Ping. */ + PING("PING"), + /** Connection close. */ + QUIT("QUIT"), + /** Echo. */ + ECHO("ECHO"), + + // Cache operations. + /** Get. */ + GET("GET"); + + /** String for command. */ + private final String cmd; + + GridRedisCommand(String cmd) { + this.cmd = cmd; + } + + public String cmd() { + return cmd; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisMessage.java new file mode 100644 index 0000000..bfb9543 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisMessage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Message to communicate with Redis client. Contains command, its attributes and response. + */ +public class GridRedisMessage implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + private static final int CMD_POS = 0; + + private static final int KEY_POS = 1; + + /** Request message parts. */ + private final List<String> msgParts; + + private ByteBuffer response; + + public GridRedisMessage(int msgLen) { + msgParts = new ArrayList<>(msgLen); + } + + public void append(String part) { + msgParts.add(part); + } + + public void setResponse(ByteBuffer response) { + this.response = response; + } + + public ByteBuffer getResponse() { + return response; + } + + /** + * @return {@link GridRedisCommand}. + */ + public GridRedisCommand command() { + return GridRedisCommand.valueOf(msgParts.get(CMD_POS).toUpperCase()); + } + + /** + * @return Key for the command. + */ + public String key() { + if (msgParts.size() <= KEY_POS) + return null; + + return msgParts.get(KEY_POS); + } + + @Override public String toString() { + return "GridRedisMessage [msg: " + msgParts + "]"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisNioListener.java new file mode 100644 index 0000000..34c5185 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisNioListener.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.nio.GridNioFuture; +import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.CIX1; +import org.jetbrains.annotations.Nullable; + +/** + * Listener for Redis protocol requests. + */ +public class GridRedisNioListener extends GridNioServerListenerAdapter<GridRedisMessage> { + /** Protocol handler. */ + private GridRedisProtocolHandler hnd; + + public GridRedisNioListener(GridRedisProtocolHandler hnd, GridKernalContext ctx) { + this.hnd = hnd; + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, GridRedisMessage msg) { + IgniteInternalFuture<GridRedisMessage> f = hnd.handleAsync(msg); + f.listen(new CIX1<IgniteInternalFuture<GridRedisMessage>>() { + @Override public void applyx(IgniteInternalFuture<GridRedisMessage> f) throws IgniteCheckedException { + GridRedisMessage restRes = f.get(); + sendResponse(ses, restRes); + } + }); + } + + /** + * Sends a response to be decoded and sent to the Redis client. + * + * @param ses NIO session. + * @param res Response. + * @return NIO send future. + */ + private GridNioFuture<?> sendResponse(GridNioSession ses, GridRedisMessage res) { + return ses.send(res); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProcessor.java new file mode 100644 index 0000000..40899b2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProcessor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.redis.handler.GridRedisConnectionCommandHandler; + +/** + * Redis processor implementation. + */ +public class GridRedisProcessor extends GridProcessorAdapter { + /** Server. */ + private GridRedisServer protoSrv; + + /** Protocol handler. */ + private GridRedisProtocolHandler hnd; + + /** + * @param ctx Kernal context. + */ + public GridRedisProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (!isRedisEnabled()) + return; + + hnd = new GridRedisProtocolHandler(ctx, log); + + hnd.addCommandHandler(new GridRedisConnectionCommandHandler()); + + protoSrv = new GridRedisServer(ctx); + + protoSrv.start(hnd); + + if (log.isDebugEnabled()) + log.debug("Enabled Redis protocol: " + protoSrv); + } + + /** + * @return Whether or not Redis protocol is enabled. + */ + private boolean isRedisEnabled() { + return !ctx.config().isDaemon() && ctx.config().getRedisConfiguration() != null; + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + if (!isRedisEnabled()) + return; + + hnd.start(); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (!isRedisEnabled()) + return; + + if (hnd != null) + hnd.stopGracefully(); + + if (protoSrv != null) + protoSrv.stop(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolHandler.java new file mode 100644 index 0000000..3dc00ef --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolHandler.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.redis.handler.GridRedisCommandHandler; +import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.GridWorkerFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.jsr166.LongAdder8; + +/** + * Command protocol handler. + */ +public class GridRedisProtocolHandler { + /** Worker name. */ + private final static String WORKER_NAME = "redis-proc-worker"; + + /** Logger. */ + private final IgniteLogger log; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Busy lock. */ + private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); + + /** Workers count. */ + private final LongAdder8 workersCnt = new LongAdder8(); + + /** OK-to-start latch. */ + private final CountDownLatch startLatch = new CountDownLatch(1); + + /** Command handlers. */ + protected final Map<GridRedisCommand, GridRedisCommandHandler> handlers = new EnumMap<>(GridRedisCommand.class); + + /** + * @param log + */ + public GridRedisProtocolHandler(GridKernalContext ctx, IgniteLogger log) { + this.ctx = ctx; + this.log = log; + } + + public void addCommandHandler(GridRedisCommandHandler hnd) { + assert !handlers.containsValue(hnd); + + if (log.isDebugEnabled()) + log.debug("Added Redis command handler: " + hnd); + + for (GridRedisCommand cmd : hnd.supportedCommands()) { + assert !handlers.containsKey(cmd) : cmd; + + handlers.put(cmd, hnd); + } + } + + /** + * @param msg Request message. + * @return Response. + * @throws IgniteCheckedException In case of error. + */ + public GridRedisMessage handle(GridRedisMessage msg) throws IgniteCheckedException { + return handleAsync(msg).get(); + } + + /** + * @param msg Request message. + * @return Future. + */ + public IgniteInternalFuture<GridRedisMessage> handleAsync(final GridRedisMessage msg) { + if (!busyLock.tryReadLock()) + return new GridFinishedFuture<>( + new IgniteCheckedException("Failed to handle request (received request while stopping grid).")); + + try { + final GridWorkerFuture<GridRedisMessage> fut = new GridWorkerFuture<>(); + + workersCnt.increment(); + + GridWorker w = new GridWorker(ctx.gridName(), WORKER_NAME, log) { + @Override protected void body() { + try { + IgniteInternalFuture<GridRedisMessage> res = handleRequest(msg); + + res.listen(new IgniteInClosure<IgniteInternalFuture<GridRedisMessage>>() { + @Override public void apply(IgniteInternalFuture<GridRedisMessage> f) { + try { + fut.onDone(f.get()); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + } + }); + } + catch (Throwable e) { + if (e instanceof Error) + U.error(log, "Client request execution failed with error.", e); + + fut.onDone(U.cast(e)); + + if (e instanceof Error) + throw e; + } + finally { + workersCnt.decrement(); + } + } + }; + + fut.setWorker(w); + + try { + ctx.getRedisExecutorService().execute(w); + } + catch (RejectedExecutionException e) { + U.error(log, "Failed to execute worker due to execution rejection " + + "(increase upper bound on Redis executor service). " + + "Will attempt to process request in the current thread instead.", e); + + w.run(); + } + + return fut; + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Handles with the assigned command handler. + * + * @param req Request. + * @return Future. + */ + private IgniteInternalFuture<GridRedisMessage> handleRequest(final GridRedisMessage req) { + if (startLatch.getCount() > 0) { + try { + startLatch.await(); + } + catch (InterruptedException e) { + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to handle request " + + "(protocol handler was interrupted when awaiting grid start).", e)); + } + } + + if (log.isDebugEnabled()) + log.debug("Request from Redis client: " + req); + + GridRedisCommandHandler hnd = handlers.get(req.command()); + + IgniteInternalFuture<GridRedisMessage> res = hnd == null ? null : hnd.handleAsync(req); + + if (res == null) + return new GridFinishedFuture<>( + new IgniteCheckedException("Failed to find registered handler for command: " + req.command())); + + return res; + } + + /** + * Starts request handling. + */ + void start() { + startLatch.countDown(); + } + + /** + * Ensures pending requests are processed and no more requests are taken. + */ + void stopGracefully() { + busyLock.writeLock(); + + boolean interrupted = Thread.interrupted(); + + while (workersCnt.sum() != 0) { + try { + Thread.sleep(200); + } + catch (InterruptedException ignored) { + interrupted = true; + } + } + + if (interrupted) + Thread.currentThread().interrupt(); + + startLatch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolParser.java new file mode 100644 index 0000000..416c20a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisProtocolParser.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.nio.GridNioParser; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.jetbrains.annotations.Nullable; + +/** + * Parser to decode/encode Redis protocol requests. + */ +public class GridRedisProtocolParser implements GridNioParser { + private final IgniteLogger log; + + /** + prefix. */ + private static final byte SIMPLE_STRING = 43; + + /** $ */ + private static final byte BULK_STRING = 36; + + /** : */ + private static final byte INTEGER = 58; + + /** * */ + private static final byte ARRAY = 42; + + /** - */ + private static final byte ERROR = 45; + + /** Carriage return code. */ + private static final byte CR = 13; + + /** Line feed code. */ + private static final byte LF = 10; + + /** CRLF. */ + private static final byte[] CRLF = new byte[] {13, 10}; + + public GridRedisProtocolParser(IgniteLogger log) { + this.log = log; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridRedisMessage decode(GridNioSession ses, ByteBuffer buf) + throws IOException, IgniteCheckedException { + return readArray(buf); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg0) throws IOException, IgniteCheckedException { + assert msg0 != null; + + GridRedisMessage msg = (GridRedisMessage)msg0; + + return msg.getResponse(); + } + + private GridRedisMessage readArray(ByteBuffer buf) throws IgniteCheckedException { + System.out.println(new String(buf.array())); + byte b = buf.get(); + + if (b != ARRAY) + throw new IgniteCheckedException("Invalid request byte! " + b); + + int arrLen = elCnt(buf); + + GridRedisMessage msg = new GridRedisMessage(arrLen); + + for (int i = 0; i < arrLen; i++) + msg.append(readBulkStr(buf)); + + return msg; + } + + private String readBulkStr(ByteBuffer buf) throws IgniteCheckedException { + byte b = buf.get(); + + if (b != BULK_STRING) + throw new IgniteCheckedException("Invalid bulk string prefix! " + b); + + int len = elCnt(buf); + byte[] bulkStr = new byte[len]; + + buf.get(bulkStr, 0, len); + + if (buf.get() != CR || buf.get() != LF) + throw new IgniteCheckedException("Invalid request syntax!"); + + return new String(bulkStr); + } + + /** + * @param buf + * @return Count of elements. + */ + private int elCnt(ByteBuffer buf) throws IgniteCheckedException { + byte[] arrLen = new byte[9]; + + int idx = 0; + byte b = buf.get(); + while (b != CR) { + arrLen[idx++] = b; + b = buf.get(); + } + + if (buf.get() != LF) + throw new IgniteCheckedException("Invalid request syntax!"); + + return Integer.parseInt(new String(arrLen, 0, idx)); + } + + /** + * Converts a simple string data to a {@link ByteBuffer}. + * + * @param str + * @return + */ + public static ByteBuffer toSimpleString(String str) { + byte[] b = str.getBytes(); + + ByteBuffer buf = ByteBuffer.allocate(b.length + 3); + buf.put(SIMPLE_STRING); + buf.put(b); + buf.put(CRLF); + + buf.flip(); + + return buf; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisServer.java new file mode 100644 index 0000000..3657ecc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/GridRedisServer.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteOrder; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.RedisConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.nio.GridNioCodecFilter; +import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioParser; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.nio.GridNioServerListener; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgnitePortProtocol; + +/** + * Redis protocol server. + */ +public class GridRedisServer { + /** Context. */ + protected final GridKernalContext ctx; + + /** Server. */ + private GridNioServer<GridRedisMessage> srv; + + /** NIO server listener. */ + private GridRedisNioListener lsnr; + + /** Host used by this protocol. */ + protected InetAddress host; + + /** Port used by this protocol. */ + protected int port; + + /** Logger. */ + protected final IgniteLogger log; + + public GridRedisServer(GridKernalContext ctx) { + assert ctx != null; + + this.ctx = ctx; + + log = ctx.log(getClass()); + } + + public void start(final GridRedisProtocolHandler hnd) throws IgniteCheckedException { + assert hnd != null; + + RedisConfiguration cfg = ctx.config().getRedisConfiguration(); + + assert cfg != null; + + lsnr = new GridRedisNioListener(hnd, ctx); + + try { + host = resolveRedisHost(ctx.config()); + + int lastPort = cfg.getPort() + cfg.getPortRange() - 1; + + for (int port0 = cfg.getPort(); port0 <= lastPort; port0++) { + if (startServer(host, port0, lsnr, cfg)) { + port = port0; + + if (log.isInfoEnabled()) + log.info(startInfo()); + + return; + } + } + + U.warn(log, "Failed to start " + name() + " server (possibly all ports in range are in use) " + + "[firstPort=" + cfg.getPort() + ", lastPort=" + lastPort + ", host=" + host + ']'); + } + catch (IOException e) { + U.warn(log, "Failed to start " + name() + " on port " + port + ": " + e.getMessage(), + "Failed to start " + name() + " on port " + port + ". " + + "Check redisHost configuration property."); + } + + } + + /** + * Stops the server. + */ + public void stop() { + if (srv != null) { + ctx.ports().deregisterPorts(getClass()); + + srv.stop(); + } + + if (log.isInfoEnabled()) + log.info(stopInfo()); + } + + /** + * Starts a server with given parameters. + * + * @param hostAddr Host on which server should be bound. + * @param port Port on which server should be bound. + * @param lsnr Server message listener. + * @param cfg Configuration for other parameters. + * @return {@code True} if server successfully started, {@code false} if port is used and + * server was unable to start. + */ + private boolean startServer(InetAddress hostAddr, int port, GridNioServerListener<GridRedisMessage> lsnr, + RedisConfiguration cfg) { + try { + GridNioParser parser = new GridRedisProtocolParser(log); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, false); + + GridNioFilter[] filters; + + filters = new GridNioFilter[] {codec}; + + srv = GridNioServer.<GridRedisMessage>builder() + .address(hostAddr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(cfg.getSelectorCount()) + .gridName(ctx.gridName()) + .tcpNoDelay(cfg.isNoDelay()) + .directBuffer(cfg.isDirectBuffer()) + .byteOrder(ByteOrder.nativeOrder()) + .socketSendBufferSize(cfg.getSendBufferSize()) + .socketReceiveBufferSize(cfg.getReceiveBufferSize()) + .sendQueueLimit(cfg.getSendQueueLimit()) + .filters(filters) + .directMode(false) + .build(); + + srv.idleTimeout(cfg.getIdleTimeout()); + + srv.start(); + + ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); + + return true; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to start " + name() + " on port " + port + ": " + e.getMessage()); + + return false; + } + } + + /** + * Resolves host for Redis server using grid configuration. + * + * @param cfg Grid configuration. + * @return Redis host. + * @throws IOException If failed to resolve REST host. + */ + private InetAddress resolveRedisHost(IgniteConfiguration cfg) throws IOException { + String host = cfg.getRedisConfiguration().getHost(); + + if (host == null) + host = cfg.getLocalHost(); + + return U.resolveLocalHost(host); + } + + /** + * @return Start information string. + */ + protected String startInfo() { + return "Command protocol successfully started [name=" + name() + ", host=" + host + ", port=" + port + ']'; + } + + /** + * @return Stop information string. + */ + protected String stopInfo() { + return "Command protocol successfully stopped: " + name(); + } + + /** + * @return Protocol name. + */ + public String name() { + return "Redis protocol"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/90fe51e3/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/handler/GridRedisCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/handler/GridRedisCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/handler/GridRedisCommandHandler.java new file mode 100644 index 0000000..ac61251 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/redis/handler/GridRedisCommandHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.redis.handler; + +import java.util.Collection; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.redis.GridRedisCommand; +import org.apache.ignite.internal.processors.redis.GridRedisMessage; +import org.apache.ignite.internal.processors.rest.GridRestResponse; + +/** + * Command handler. + */ +public interface GridRedisCommandHandler { + /** + * @return Collection of supported commands. + */ + Collection<GridRedisCommand> supportedCommands(); + + /** + * @param msg Request message. + * @return Future. + */ + IgniteInternalFuture<GridRedisMessage> handleAsync(GridRedisMessage msg); +}
