Repository: storm Updated Branches: refs/heads/0.10.x-branch 4dd2571a2 -> 6bef69cb6
STORM-843 Add Javadoc to storm-redis * leaves javadoc for Trident API : it should be treated from Trident API * some code cleanup Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b4844fd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b4844fd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b4844fd Branch: refs/heads/0.10.x-branch Commit: 0b4844fda8cd282e260597aa03685c2766ef7e76 Parents: 4dd2571 Author: Jungtaek Lim <kabh...@gmail.com> Authored: Mon Jun 15 08:14:11 2015 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Wed Jul 8 09:58:41 2015 +0900 ---------------------------------------------------------------------- .../storm/redis/bolt/AbstractRedisBolt.java | 42 +++++++ .../storm/redis/bolt/RedisLookupBolt.java | 23 +++- .../apache/storm/redis/bolt/RedisStoreBolt.java | 21 ++++ .../redis/common/config/JedisClusterConfig.java | 55 +++++++- .../redis/common/config/JedisPoolConfig.java | 66 ++++++++++ .../common/container/JedisClusterContainer.java | 18 +++ .../JedisCommandsContainerBuilder.java | 15 +++ .../JedisCommandsInstanceContainer.java | 12 ++ .../redis/common/container/JedisContainer.java | 16 +++ .../common/mapper/RedisDataTypeDescription.java | 20 +++ .../redis/common/mapper/RedisLookupMapper.java | 7 +- .../storm/redis/common/mapper/RedisMapper.java | 9 +- .../redis/common/mapper/RedisStoreMapper.java | 3 + .../storm/redis/common/mapper/TupleMapper.java | 18 ++- .../trident/state/AbstractRedisMapState.java | 41 ++++++ .../state/AbstractRedisStateQuerier.java | 28 ++++- .../state/AbstractRedisStateUpdater.java | 27 ++++ .../storm/redis/trident/state/KeyFactory.java | 22 ++++ .../storm/redis/trident/state/Options.java | 6 + .../trident/state/RedisClusterMapState.java | 124 +++++++++++++++++- .../redis/trident/state/RedisClusterState.java | 44 ++++++- .../trident/state/RedisClusterStateQuerier.java | 19 ++- .../trident/state/RedisClusterStateUpdater.java | 
19 +++ .../redis/trident/state/RedisMapState.java | 126 +++++++++++++++++-- .../storm/redis/trident/state/RedisState.java | 44 ++++++- .../redis/trident/state/RedisStateQuerier.java | 19 ++- .../redis/trident/state/RedisStateUpdater.java | 19 +++ 27 files changed, 822 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java index 158fcaa..dc2a2d3 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java @@ -28,6 +28,26 @@ import redis.clients.jedis.JedisCommands; import java.util.Map; +/** + * AbstractRedisBolt class is for users to implement custom bolts which makes interaction with Redis. + * <p/> + * Due to environment abstraction, AbstractRedisBolt provides JedisCommands which contains only single key operations. 
+ * <p/> + * Custom Bolts may want to follow this pattern: + * <p><blockquote><pre> + * JedisCommands jedisCommand = null; + * try { + * jedisCommand = getInstance(); + * // do some works + * } finally { + * if (jedisCommand != null) { + * returnInstance(jedisCommand); + * } + * } + * </pre></blockquote> + * + */ +// TODO: Separate Jedis / JedisCluster to provide full operations for each environment to users public abstract class AbstractRedisBolt extends BaseRichBolt { protected OutputCollector collector; @@ -36,16 +56,28 @@ public abstract class AbstractRedisBolt extends BaseRichBolt { private JedisPoolConfig jedisPoolConfig; private JedisClusterConfig jedisClusterConfig; + /** + * Constructor for single Redis environment (JedisPool) + * @param config configuration for initializing JedisPool + */ public AbstractRedisBolt(JedisPoolConfig config) { this.jedisPoolConfig = config; } + /** + * Constructor for Redis Cluster environment (JedisCluster) + * @param config configuration for initializing JedisCluster + */ public AbstractRedisBolt(JedisClusterConfig config) { this.jedisClusterConfig = config; } + /** + * {@inheritDoc} + */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { + // FIXME: stores map (stormConf), topologyContext and expose these to derived classes this.collector = collector; if (jedisPoolConfig != null) { @@ -57,10 +89,20 @@ public abstract class AbstractRedisBolt extends BaseRichBolt { } } + /** + * Borrow JedisCommands instance from container.<p/> + * JedisCommands is an interface which contains single key operations. + * @return implementation of JedisCommands + * @see JedisCommandsInstanceContainer#getInstance() + */ protected JedisCommands getInstance() { return this.container.getInstance(); } + /** + * Return borrowed instance to container. 
+ * @param instance borrowed object + */ protected void returnInstance(JedisCommands instance) { this.container.returnInstance(instance); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java index bbd7e6a..47c98cb 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java @@ -28,11 +28,21 @@ import redis.clients.jedis.JedisCommands; import java.util.List; +/** + * Basic bolt for querying from Redis and emits response as tuple. + * <p/> + * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG + */ public class RedisLookupBolt extends AbstractRedisBolt { private final RedisLookupMapper lookupMapper; private final RedisDataTypeDescription.RedisDataType dataType; private final String additionalKey; + /** + * Constructor for single Redis environment (JedisPool) + * @param config configuration for initializing JedisPool + * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses + */ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) { super(config); @@ -43,6 +53,11 @@ public class RedisLookupBolt extends AbstractRedisBolt { this.additionalKey = dataTypeDescription.getAdditionalKey(); } + /** + * Constructor for Redis Cluster environment (JedisCluster) + * @param config configuration for initializing JedisCluster + * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses + */ public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) { super(config); @@ 
-53,10 +68,13 @@ public class RedisLookupBolt extends AbstractRedisBolt { this.additionalKey = dataTypeDescription.getAdditionalKey(); } + /** + * {@inheritDoc} + */ @Override public void execute(Tuple input) { String key = lookupMapper.getKeyFromTuple(input); - Object lookupValue = null; + Object lookupValue; JedisCommands jedisCommand = null; try { @@ -105,6 +123,9 @@ public class RedisLookupBolt extends AbstractRedisBolt { } } + /** + * {@inheritDoc} + */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { lookupMapper.declareOutputFields(declarer); http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java index 761c5ed..be9a328 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -25,11 +25,21 @@ import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import redis.clients.jedis.JedisCommands; +/** + * Basic bolt for writing to Redis + * <p/> + * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG + */ public class RedisStoreBolt extends AbstractRedisBolt { private final RedisStoreMapper storeMapper; private final RedisDataTypeDescription.RedisDataType dataType; private final String additionalKey; + /** + * Constructor for single Redis environment (JedisPool) + * @param config configuration for initializing JedisPool + * @param storeMapper mapper containing which datatype, storing value's key that Bolt uses + */ public RedisStoreBolt(JedisPoolConfig config, 
RedisStoreMapper storeMapper) { super(config); this.storeMapper = storeMapper; @@ -39,6 +49,11 @@ public class RedisStoreBolt extends AbstractRedisBolt { this.additionalKey = dataTypeDescription.getAdditionalKey(); } + /** + * Constructor for Redis Cluster environment (JedisCluster) + * @param config configuration for initializing JedisCluster + * @param storeMapper mapper containing which datatype, storing value's key that Bolt uses + */ public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) { super(config); this.storeMapper = storeMapper; @@ -48,6 +63,9 @@ public class RedisStoreBolt extends AbstractRedisBolt { this.additionalKey = dataTypeDescription.getAdditionalKey(); } + /** + * {@inheritDoc} + */ @Override public void execute(Tuple input) { String key = storeMapper.getKeyFromTuple(input); @@ -94,6 +112,9 @@ public class RedisStoreBolt extends AbstractRedisBolt { } } + /** + * {@inheritDoc} + */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java index a13eced..d8696aa 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java @@ -26,56 +26,105 @@ import java.net.InetSocketAddress; import java.util.HashSet; import java.util.Set; +/** + * Configuration for JedisCluster. 
+ */ public class JedisClusterConfig implements Serializable { private Set<InetSocketAddress> nodes; private int timeout; private int maxRedirections; + /** + * Constructor + * <p/> + * You can use JedisClusterConfig.Builder() for leaving some fields to apply default value. + * <p/> + * Note that list of node is mandatory, and when you didn't set nodes, it throws NullPointerException. + * + * @param nodes list of node information for JedisCluster + * @param timeout socket / connection timeout + * @param maxRedirections limit of redirections - how much we'll follow MOVED or ASK + * @throws NullPointerException when you didn't set nodes + */ public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) { + Preconditions.checkNotNull(nodes, "Node information should be presented"); + this.nodes = nodes; this.timeout = timeout; this.maxRedirections = maxRedirections; } + /** + * Returns nodes. + * @return list of node information + */ public Set<HostAndPort> getNodes() { - Set<HostAndPort> ret = new HashSet<HostAndPort>(); + Set<HostAndPort> ret = new HashSet<>(); for (InetSocketAddress node : nodes) { ret.add(new HostAndPort(node.getHostName(), node.getPort())); } return ret; } + /** + * Returns socket / connection timeout. + * @return socket / connection timeout + */ public int getTimeout() { return timeout; } + /** + * Returns limit of redirection. + * @return limit of redirection + */ public int getMaxRedirections() { return maxRedirections; } + /** + * Builder for initializing JedisClusterConfig. + */ public static class Builder { private Set<InetSocketAddress> nodes; private int timeout = Protocol.DEFAULT_TIMEOUT; private int maxRedirections = 5; + /** + * Sets list of node. + * @param nodes list of node + * @return Builder itself + */ public Builder setNodes(Set<InetSocketAddress> nodes) { this.nodes = nodes; return this; } + /** + * Sets socket / connection timeout. 
+ * @param timeout socket / connection timeout + * @return Builder itself + */ public Builder setTimeout(int timeout) { this.timeout = timeout; return this; } + /** + * Sets limit of redirection. + * @param maxRedirections limit of redirection + * @return Builder itself + */ public Builder setMaxRedirections(int maxRedirections) { this.maxRedirections = maxRedirections; return this; } + /** + * Builds JedisClusterConfig. + * @return JedisClusterConfig + */ public JedisClusterConfig build() { - Preconditions.checkNotNull(this.nodes, "Node information should be presented"); - return new JedisClusterConfig(nodes, timeout, maxRedirections); } } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java index cc5f6e4..8b3a2e5 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java @@ -21,6 +21,9 @@ import redis.clients.jedis.Protocol; import java.io.Serializable; +/** + * Configuration for JedisPool. + */ public class JedisPoolConfig implements Serializable { public static final String DEFAULT_HOST = "127.0.0.1"; @@ -30,6 +33,17 @@ public class JedisPoolConfig implements Serializable { private int database; private String password; + /** + * Constructor + * <p/> + * You can use JedisPoolConfig.Builder() for leaving some fields to apply default value. 
+ * + * @param host hostname or IP + * @param port port + * @param timeout socket / connection timeout + * @param password password, if any + * @param database database index + */ public JedisPoolConfig(String host, int port, int timeout, String password, int database) { this.host = host; this.port = port; @@ -38,26 +52,49 @@ public class JedisPoolConfig implements Serializable { this.password = password; } + /** + * Returns host. + * @return hostname or IP + */ public String getHost() { return host; } + /** + * Returns port. + * @return port + */ public int getPort() { return port; } + /** + * Returns timeout. + * @return socket / connection timeout + */ public int getTimeout() { return timeout; } + /** + * Returns database index. + * @return database index + */ public int getDatabase() { return database; } + /** + * Returns password. + * @return password + */ public String getPassword() { return password; } + /** + * Builder for initializing JedisPoolConfig. + */ public static class Builder { private String host = DEFAULT_HOST; private int port = Protocol.DEFAULT_PORT; @@ -65,31 +102,60 @@ public class JedisPoolConfig implements Serializable { private int database = Protocol.DEFAULT_DATABASE; private String password; + /** + * Sets host. + * @param host host + * @return Builder itself + */ public Builder setHost(String host) { this.host = host; return this; } + /** + * Sets port. + * @param port port + * @return Builder itself + */ public Builder setPort(int port) { this.port = port; return this; } + /** + * Sets timeout. + * @param timeout timeout + * @return Builder itself + */ public Builder setTimeout(int timeout) { this.timeout = timeout; return this; } + /** + * Sets database index. + * @param database database index + * @return Builder itself + */ public Builder setDatabase(int database) { this.database = database; return this; } + /** + * Sets password. 
+ * @param password password, if any + * @return Builder itself + */ public Builder setPassword(String password) { this.password = password; return this; } + /** + * Builds JedisPoolConfig. + * @return JedisPoolConfig + */ public JedisPoolConfig build() { return new JedisPoolConfig(host, port, timeout, password, database); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java index a1ff19f..ea294a6 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java @@ -22,24 +22,42 @@ import redis.clients.jedis.JedisCommands; import java.io.Closeable; +/** + * Container for managing JedisCluster. + * <p/> + * Note that JedisCluster doesn't need to be pooled since it's thread-safe and it stores pools internally. 
+ */ public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable { private JedisCluster jedisCluster; + /** + * Constructor + * @param jedisCluster JedisCluster instance + */ public JedisClusterContainer(JedisCluster jedisCluster) { this.jedisCluster = jedisCluster; } + /** + * {@inheritDoc} + */ @Override public JedisCommands getInstance() { return this.jedisCluster; } + /** + * {@inheritDoc} + */ @Override public void returnInstance(JedisCommands jedisCommands) { // do nothing } + /** + * {@inheritDoc} + */ @Override public void close() { this.jedisCluster.close(); http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java index a2f8c2e..36a74d6 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java @@ -22,15 +22,30 @@ import org.apache.storm.redis.common.config.JedisPoolConfig; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; +/** + * Container Builder which helps abstraction of two env. - single instance or Redis Cluster. + */ public class JedisCommandsContainerBuilder { + // FIXME: We're using default config since it cannot be serialized + // We still needs to provide some options externally public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); + /** + * Builds container for single Redis environment. 
+ * @param config configuration for JedisPool + * @return container for single Redis environment + */ public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase()); return new JedisContainer(jedisPool); } + /** + * Builds container for Redis Cluster environment. + * @param config configuration for JedisCluster + * @return container for Redis Cluster environment + */ public static JedisCommandsInstanceContainer build(JedisClusterConfig config) { JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG); return new JedisClusterContainer(jedisCluster); http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java index 9ec32b9..743d7f8 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java @@ -19,7 +19,19 @@ package org.apache.storm.redis.common.container; import redis.clients.jedis.JedisCommands; +/** + * Interfaces for containers which stores instances implementing JedisCommands. + */ public interface JedisCommandsInstanceContainer { + /** + * Borrows instance from container. + * @return instance which implements JedisCommands + */ JedisCommands getInstance(); + + /** + * Returns instance to container. 
+ * @param jedisCommands borrowed instance + */ void returnInstance(JedisCommands jedisCommands); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java index 8e1fdf5..e704712 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java @@ -26,20 +26,33 @@ import redis.clients.jedis.JedisPool; import java.io.Closeable; import java.io.IOException; +/** + * Container for managing Jedis instances. + */ public class JedisContainer implements JedisCommandsInstanceContainer, Closeable { private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class); private JedisPool jedisPool; + /** + * Constructor + * @param jedisPool JedisPool which actually manages Jedis instances + */ public JedisContainer(JedisPool jedisPool) { this.jedisPool = jedisPool; } + /** + * {@inheritDoc} + */ @Override public JedisCommands getInstance() { return jedisPool.getResource(); } + /** + * {@inheritDoc} + */ @Override public void returnInstance(JedisCommands jedisCommands) { if (jedisCommands == null) { @@ -53,6 +66,9 @@ public class JedisContainer implements JedisCommandsInstanceContainer, Closeable } } + /** + * {@inheritDoc} + */ @Override public void close() { jedisPool.close(); http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java ---------------------------------------------------------------------- diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java index ba981d0..ede1868 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java @@ -19,16 +19,28 @@ package org.apache.storm.redis.common.mapper; import java.io.Serializable; +/** + * RedisDataTypeDescription defines data type and additional key if needed for lookup / store tuples. + */ public class RedisDataTypeDescription implements Serializable { public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG } private RedisDataType dataType; private String additionalKey; + /** + * Constructor + * @param dataType data type + */ public RedisDataTypeDescription(RedisDataType dataType) { this(dataType, null); } + /** + * Constructor + * @param dataType data type + * @param additionalKey additional key for hash and sorted set + */ public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) { this.dataType = dataType; this.additionalKey = additionalKey; @@ -40,10 +52,18 @@ public class RedisDataTypeDescription implements Serializable { } } + /** + * Returns defined data type. + * @return data type + */ public RedisDataType getDataType() { return dataType; } + /** + * Returns defined additional key. 
+ * @return additional key + */ public String getAdditionalKey() { return additionalKey; } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java index 880aea1..727e4ec 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java @@ -23,6 +23,9 @@ import backtype.storm.tuple.Values; import java.util.List; +/** + * RedisLookupMapper is for defining spec. which is used for querying value from Redis and converting response to tuple. + */ public interface RedisLookupMapper extends TupleMapper, RedisMapper { /** * Converts return value from Redis to a list of storm values that can be emitted. @@ -30,11 +33,11 @@ public interface RedisLookupMapper extends TupleMapper, RedisMapper { * @param value Redis query response value. Can be String, Boolean, Long regarding of data type. * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple. */ - public List<Values> toTuple(ITuple input, Object value); + List<Values> toTuple(ITuple input, Object value); /** * declare what are the fields that this code will output. 
- * @param declarer + * @param declarer OutputFieldsDeclarer */ void declareOutputFields(OutputFieldsDeclarer declarer); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java index 416ce5f..b6a9559 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java @@ -17,6 +17,13 @@ */ package org.apache.storm.redis.common.mapper; +/** + * RedisMapper is for defining data type for querying / storing from / to Redis. + */ public interface RedisMapper { - public RedisDataTypeDescription getDataTypeDescription(); + /** + * Returns descriptor which defines data type. + * @return data type descriptor + */ + RedisDataTypeDescription getDataTypeDescription(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java index b3d7adf..4ab8b1f 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java @@ -17,5 +17,8 @@ */ package org.apache.storm.redis.common.mapper; +/** + * RedisStoreMapper is for defining spec. which is used for storing value to Redis. 
+ */ public interface RedisStoreMapper extends TupleMapper, RedisMapper { } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java index 86664b8..bcc531e 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java @@ -21,7 +21,21 @@ import backtype.storm.tuple.ITuple; import java.io.Serializable; +/** + * TupleMapper defines how to extract key and value from tuple for Redis. + */ public interface TupleMapper extends Serializable { - public String getKeyFromTuple(ITuple tuple); - public String getValueFromTuple(ITuple tuple); + /** + * Extracts key from tuple. + * @param tuple source tuple + * @return key + */ + String getKeyFromTuple(ITuple tuple); + + /** + * Extracts value from tuple. 
+ * @param tuple source tuple + * @return value + */ + String getValueFromTuple(ITuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java index b6fc8d7..26056d2 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java @@ -33,6 +33,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * AbstractRedisMapState is base class of any RedisMapState, which implements IBackingMap. + * <p/> + * Derived classes should provide<br/> + * - which Serializer it uses<br/> + * - which KeyFactory it uses<br/> + * - how to retrieve values from Redis<br/> + * - how to store values to Redis<br/> + * and AbstractRedisMapState takes care of rest things. 
+ * + * @param <T> value's type class + */ public abstract class AbstractRedisMapState<T> implements IBackingMap<T> { public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of( StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(), @@ -40,6 +52,9 @@ public abstract class AbstractRedisMapState<T> implements IBackingMap<T> { StateType.OPAQUE, new JSONOpaqueSerializer() )); + /** + * {@inheritDoc} + */ @Override public List<T> multiGet(List<List<Object>> keys) { if (keys.size() == 0) { return Collections.emptyList(); @@ -51,6 +66,9 @@ public abstract class AbstractRedisMapState<T> implements IBackingMap<T> { return deserializeValues(keys, values); } + /** + * {@inheritDoc} + */ @Override public void multiPut(List<List<Object>> keys, List<T> vals) { if (keys.size() == 0) { @@ -89,8 +107,31 @@ public abstract class AbstractRedisMapState<T> implements IBackingMap<T> { return result; } + /** + * Returns Serializer which is used for serializing tuple value and deserializing Redis value. + * + * @return serializer + */ protected abstract Serializer getSerializer(); + + /** + * Returns KeyFactory which is used for converting state key -> Redis key. + * @return key factory + */ protected abstract KeyFactory getKeyFactory(); + + /** + * Retrieves values from Redis that each value is corresponding to each key. + * + * @param keys keys having state values + * @return values which are corresponding to keys + */ protected abstract List<String> retrieveValuesFromRedis(List<String> keys); + + /** + * Updates (key, value) pairs to Redis. 
+ * + * @param keyValues (key, value) pairs + */ protected abstract void updateStatesToRedis(Map<String, String> keyValues); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java index 24ecfc4..5c7335d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java @@ -28,11 +28,24 @@ import storm.trident.tuple.TridentTuple; import java.util.List; +/** + * AbstractRedisStateQuerier is the base class of any RedisStateQuerier, which extends BaseQueryFunction. + * <p/> + * Derived classes should provide how to retrieve values from Redis, + * and AbstractRedisStateQuerier takes care of the rest. 
+ * + * @param <T> type of State + */ public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQueryFunction<T, List<Values>> { private final RedisLookupMapper lookupMapper; protected final RedisDataTypeDescription.RedisDataType dataType; protected final String additionalKey; + /** + * Constructor + * + * @param lookupMapper mapper for querying + */ public AbstractRedisStateQuerier(RedisLookupMapper lookupMapper) { this.lookupMapper = lookupMapper; @@ -41,6 +54,9 @@ public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQue this.additionalKey = dataTypeDescription.getAdditionalKey(); } + /** + * {@inheritDoc} + */ @Override public List<List<Values>> batchRetrieve(T state, List<TridentTuple> inputs) { List<List<Values>> values = Lists.newArrayList(); @@ -58,6 +74,9 @@ public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQue return values; } + /** + * {@inheritDoc} + */ @Override public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) { for (Values value : values) { @@ -65,5 +84,12 @@ public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQue } } - protected abstract List<String> retrieveValuesFromRedis(T redisClusterState, List<String> keys); + /** + * Retrieves values from Redis that each value is corresponding to each key. 
+ * + * @param state State for handling query + * @param keys keys having state values + * @return values corresponding to keys + */ + protected abstract List<String> retrieveValuesFromRedis(T state, List<String> keys); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java index 87bb8fa..e9654c7 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java @@ -28,6 +28,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * AbstractRedisStateUpdater is the base class of any RedisStateUpdater, which extends BaseStateUpdater. + * <p/> + * Derived classes should provide how to update (key, value) pairs to Redis, + * and AbstractRedisStateUpdater takes care of the rest. 
+ * + * @param <T> type of State + */ public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> { private final RedisStoreMapper storeMapper; @@ -35,6 +43,11 @@ public abstract class AbstractRedisStateUpdater<T extends State> extends BaseSta protected final RedisDataTypeDescription.RedisDataType dataType; protected final String additionalKey; + /** + * Constructor + * + * @param storeMapper mapper for storing + */ public AbstractRedisStateUpdater(RedisStoreMapper storeMapper) { this.storeMapper = storeMapper; RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); @@ -42,6 +55,11 @@ public abstract class AbstractRedisStateUpdater<T extends State> extends BaseSta this.additionalKey = dataTypeDescription.getAdditionalKey(); } + /** + * Sets expire (time to live) if needed + * + * @param expireIntervalSec time to live in seconds + */ public void setExpireInterval(int expireIntervalSec) { if (expireIntervalSec > 0) { this.expireIntervalSec = expireIntervalSec; @@ -50,6 +68,9 @@ public abstract class AbstractRedisStateUpdater<T extends State> extends BaseSta } } + /** + * {@inheritDoc} + */ @Override public void updateState(T state, List<TridentTuple> inputs, TridentCollector collector) { @@ -65,5 +86,11 @@ public abstract class AbstractRedisStateUpdater<T extends State> extends BaseSta updateStatesToRedis(state, keyToValue); } + /** + * Updates (key, value) pairs to Redis. 
+ * + * @param state State for handling update + * @param keyToValue (key, value) pairs + */ protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java index 7acea10..4898dcc 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java @@ -20,10 +20,32 @@ package org.apache.storm.redis.trident.state; import java.io.Serializable; import java.util.List; +/** + * KeyFactory defines conversion of state key (which could be compound) -> Redis key. + */ public interface KeyFactory extends Serializable { + /** + * Converts state key to Redis key. + * @param key state key + * @return Redis key + */ String build(List<Object> key); + /** + * Default Key Factory + */ class DefaultKeyFactory implements KeyFactory { + /** + * {@inheritDoc} + * <p/> + * Currently DefaultKeyFactory returns just the first element of the list. 
+ * + * @param key state key + * @return Redis key + * @throws RuntimeException when key is compound key + * @see KeyFactory#build(List) + */ + @Override public String build(List<Object> key) { if (key.size() != 1) throw new RuntimeException("Default KeyFactory does not support compound keys"); http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java index e1bf5fb..f4dbfaa 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java @@ -22,6 +22,12 @@ import storm.trident.state.Serializer; import java.io.Serializable; +/** + * Options of State.<br/> + * It's a data structure (whole things are public) and you can access and modify all fields. 
+ * + * @param <T> value's type class + */ public class Options<T> implements Serializable { private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java index cd871eb..cbd37c5 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java @@ -39,76 +39,157 @@ import storm.trident.state.map.TransactionalMap; import java.util.List; import java.util.Map; +/** + * IBackingMap implementation for Redis Cluster environment. + * + * @param <T> value's type class + * @see AbstractRedisMapState + */ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { /** - * OpaqueTransactional for redis-cluster. - * */ + * Provides StateFactory for opaque transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return StateFactory + */ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig) { return opaque(jedisClusterConfig, new Options()); } + /** + * Provides StateFactory for opaque transactional. 
+ * + * @param jedisClusterConfig configuration for JedisCluster + * @param dataTypeDescription definition of data type + * @return StateFactory + */ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) { Options opts = new Options(); opts.dataTypeDescription = dataTypeDescription; return opaque(jedisClusterConfig, opts); } + /** + * Provides StateFactory for opaque transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @param factory key factory + * @return StateFactory + */ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, KeyFactory factory) { Options opts = new Options(); opts.keyFactory = factory; return opaque(jedisClusterConfig, opts); } + /** + * Provides StateFactory for opaque transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @param opts options of State + * @return StateFactory + */ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, Options<OpaqueValue> opts) { return new Factory(jedisClusterConfig, StateType.OPAQUE, opts); } /** - * Transactional for redis-cluster. - * */ + * Provides StateFactory for transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return StateFactory + */ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig) { return transactional(jedisClusterConfig, new Options()); } + /** + * Provides StateFactory for transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @param dataTypeDescription definition of data type + * @return StateFactory + */ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) { Options opts = new Options(); opts.dataTypeDescription = dataTypeDescription; return transactional(jedisClusterConfig, opts); } + /** + * Provides StateFactory for transactional. 
+ * + * @param jedisClusterConfig configuration for JedisCluster + * @param factory key factory + * @return StateFactory + */ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) { Options opts = new Options(); opts.keyFactory = factory; return transactional(jedisClusterConfig, opts); } + /** + * Provides StateFactory for transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @param opts options of State + * @return StateFactory + */ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, Options<TransactionalValue> opts) { return new Factory(jedisClusterConfig, StateType.TRANSACTIONAL, opts); } /** - * NonTransactional for redis-cluster. - * */ + * Provides StateFactory for non transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return StateFactory + */ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig) { return nonTransactional(jedisClusterConfig, new Options()); } + /** + * Provides StateFactory for non transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @param dataTypeDescription definition of data type + * @return StateFactory + */ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) { Options opts = new Options(); opts.dataTypeDescription = dataTypeDescription; return nonTransactional(jedisClusterConfig, opts); } + /** + * Provides StateFactory for non transactional. + * + * @param jedisClusterConfig configuration for JedisCluster + * @param factory key factory + * @return StateFactory + */ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) { Options opts = new Options(); opts.keyFactory = factory; return nonTransactional(jedisClusterConfig, opts); } + /** + * Provides StateFactory for non transactional. 
+ * + * @param jedisClusterConfig configuration for JedisCluster + * @param opts options of State + * @return StateFactory + */ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, Options<Object> opts) { return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts); } + /** + * RedisClusterMapState.Factory provides Redis Cluster environment version of StateFactory. + */ protected static class Factory implements StateFactory { public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); @@ -119,6 +200,13 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { KeyFactory keyFactory; Options options; + /** + * Constructor + * + * @param jedisClusterConfig configuration for JedisCluster + * @param type StateType + * @param options options of State + */ public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) { this.jedisClusterConfig = jedisClusterConfig; this.type = type; @@ -137,6 +225,10 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { } } + /** + * {@inheritDoc} + */ + @Override public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getTimeout(), @@ -169,6 +261,14 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { private Serializer serializer; private KeyFactory keyFactory; + /** + * Constructor + * + * @param jedisCluster JedisCluster + * @param options options of State + * @param serializer Serializer + * @param keyFactory KeyFactory + */ public RedisClusterMapState(JedisCluster jedisCluster, Options options, Serializer<T> serializer, KeyFactory keyFactory) { this.jedisCluster = jedisCluster; @@ -177,16 +277,25 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { 
this.keyFactory = keyFactory; } + /** + * {@inheritDoc} + */ @Override protected Serializer getSerializer() { return serializer; } + /** + * {@inheritDoc} + */ @Override protected KeyFactory getKeyFactory() { return keyFactory; } + /** + * {@inheritDoc} + */ @Override protected List<String> retrieveValuesFromRedis(List<String> keys) { String[] stringKeys = keys.toArray(new String[keys.size()]); @@ -210,6 +319,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { } } + /** + * {@inheritDoc} + */ @Override protected void updateStatesToRedis(Map<String, String> keyValues) { RedisDataTypeDescription description = this.options.dataTypeDescription; http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java index d74e838..764436d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java @@ -27,26 +27,47 @@ import storm.trident.state.StateFactory; import java.util.Map; +/** + * Implementation of State for Redis Cluster environment. + */ public class RedisClusterState implements State { - private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class); - + /** + * {@inheritDoc} + */ @Override public void beginCommit(Long aLong) { } + /** + * {@inheritDoc} + */ @Override public void commit(Long aLong) { } + /** + * RedisClusterState.Factory implements StateFactory for Redis Cluster environment. 
+ * + * @see StateFactory + */ public static class Factory implements StateFactory { public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); private JedisClusterConfig jedisClusterConfig; + /** + * Constructor + * + * @param config configuration of JedisCluster + */ public Factory(JedisClusterConfig config) { this.jedisClusterConfig = config; } + /** + * {@inheritDoc} + */ + @Override public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getTimeout(), @@ -59,20 +80,31 @@ public class RedisClusterState implements State { private JedisCluster jedisCluster; + /** + * Constructor + * + * @param jedisCluster JedisCluster + */ public RedisClusterState(JedisCluster jedisCluster) { this.jedisCluster = jedisCluster; } /** - * The state updater and querier can get a JedisCluster instance - * */ + * Borrows JedisCluster instance. + * <p/> + * Note that you should return borrowed instance when you finish using instance. + * + * @return JedisCluster instance + */ public JedisCluster getJedisCluster() { return this.jedisCluster; } /** - * The state updater and querier return the JedisCluster instance - * */ + * Returns JedisCluster instance to pool. 
+ * + * @param jedisCluster JedisCluster instance to return to pool + */ public void returnJedisCluster(JedisCluster jedisCluster) { //do nothing } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java index 66ff3f6..b692bd8 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java @@ -23,16 +23,29 @@ import redis.clients.jedis.JedisCluster; import java.util.ArrayList; import java.util.List; +/** + * BaseQueryFunction implementation for Redis Cluster environment. 
+ * + * @see AbstractRedisStateQuerier + */ public class RedisClusterStateQuerier extends AbstractRedisStateQuerier<RedisClusterState> { + /** + * Constructor + * + * @param lookupMapper mapper for querying + */ public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) { super(lookupMapper); } + /** + * {@inheritDoc} + */ @Override - protected List<String> retrieveValuesFromRedis(RedisClusterState redisClusterState, List<String> keys) { + protected List<String> retrieveValuesFromRedis(RedisClusterState state, List<String> keys) { JedisCluster jedisCluster = null; try { - jedisCluster = redisClusterState.getJedisCluster(); + jedisCluster = state.getJedisCluster(); List<String> redisVals = new ArrayList<String>(); for (String key : keys) { @@ -51,7 +64,7 @@ public class RedisClusterStateQuerier extends AbstractRedisStateQuerier<RedisClu return redisVals; } finally { if (jedisCluster != null) { - redisClusterState.returnJedisCluster(jedisCluster); + state.returnJedisCluster(jedisCluster); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java index 17c5bfc..1cc0725 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java @@ -23,16 +23,35 @@ import redis.clients.jedis.JedisCluster; import java.util.Map; +/** + * BaseStateUpdater implementation for Redis Cluster environment. 
+ * + * @see AbstractRedisStateUpdater + */ public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> { + /** + * Constructor + * + * @param storeMapper mapper for storing + */ public RedisClusterStateUpdater(RedisStoreMapper storeMapper) { super(storeMapper); } + /** + * Sets expire (time to live) if needed. + * + * @param expireIntervalSec time to live in seconds + * @return RedisClusterStateUpdater itself + */ public RedisClusterStateUpdater withExpire(int expireIntervalSec) { setExpireInterval(expireIntervalSec); return this; } + /** + * {@inheritDoc} + */ @Override protected void updateStatesToRedis(RedisClusterState redisClusterState, Map<String, String> keyToValue) { JedisCluster jedisCluster = null; http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java index f0ab941..25e9924 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java @@ -40,76 +40,157 @@ import storm.trident.state.map.TransactionalMap; import java.util.List; import java.util.Map; +/** + * IBackingMap implementation for single Redis environment. + * + * @param <T> value's type class + * @see AbstractRedisMapState + */ public class RedisMapState<T> extends AbstractRedisMapState<T> { /** - * OpaqueTransactional for redis. - * */ + * Provides StateFactory for opaque transactional. 
+ * + * @param jedisPoolConfig configuration for JedisPool + * @return StateFactory + */ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig) { return opaque(jedisPoolConfig, new Options()); } + /** + * Provides StateFactory for opaque transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param dataTypeDescription definition of data type + * @return StateFactory + */ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) { Options opts = new Options(); opts.dataTypeDescription = dataTypeDescription; return opaque(jedisPoolConfig, opts); } - public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, KeyFactory factory) { + /** + * Provides StateFactory for opaque transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param factory key factory + * @return StateFactory + */ + public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, KeyFactory factory) { Options opts = new Options(); opts.keyFactory = factory; return opaque(jedisPoolConfig, opts); } + /** + * Provides StateFactory for opaque transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param opts options of State + * @return StateFactory + */ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, Options<OpaqueValue> opts) { return new Factory(jedisPoolConfig, StateType.OPAQUE, opts); } /** - * Transactional for redis. - * */ + * Provides StateFactory for transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @return StateFactory + */ public static StateFactory transactional(JedisPoolConfig jedisPoolConfig) { return transactional(jedisPoolConfig, new Options()); } + /** + * Provides StateFactory for transactional. 
+ * + * @param jedisPoolConfig configuration for JedisPool + * @param dataTypeDescription definition of data type + * @return StateFactory + */ public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) { Options opts = new Options(); opts.dataTypeDescription = dataTypeDescription; return transactional(jedisPoolConfig, opts); } + /** + * Provides StateFactory for transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param factory key factory + * @return StateFactory + */ public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, KeyFactory factory) { Options opts = new Options(); opts.keyFactory = factory; return transactional(jedisPoolConfig, opts); } + /** + * Provides StateFactory for transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param opts options of State + * @return StateFactory + */ public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, Options<TransactionalValue> opts) { return new Factory(jedisPoolConfig, StateType.TRANSACTIONAL, opts); } /** - * NonTransactional for redis. - * */ + * Provides StateFactory for non transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @return StateFactory + */ public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig) { return nonTransactional(jedisPoolConfig, new Options()); } + /** + * Provides StateFactory for non transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param dataTypeDescription definition of data type + * @return StateFactory + */ public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) { Options opts = new Options(); opts.dataTypeDescription = dataTypeDescription; return nonTransactional(jedisPoolConfig, opts); } + /** + * Provides StateFactory for non transactional. 
+ * + * @param jedisPoolConfig configuration for JedisPool + * @param factory key factory + * @return StateFactory + */ public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, KeyFactory factory) { Options opts = new Options(); opts.keyFactory = factory; return nonTransactional(jedisPoolConfig, opts); } + /** + * Provides StateFactory for non transactional. + * + * @param jedisPoolConfig configuration for JedisPool + * @param opts options of State + * @return StateFactory + */ public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, Options<Object> opts) { return new Factory(jedisPoolConfig, StateType.NON_TRANSACTIONAL, opts); } + /** + * RedisMapState.Factory provides single Redis environment version of StateFactory. + */ protected static class Factory implements StateFactory { public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); @@ -120,6 +201,13 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { KeyFactory keyFactory; Options options; + /** + * Constructor + * + * @param jedisPoolConfig configuration for JedisPool + * @param type StateType + * @param options options of State + */ public Factory(JedisPoolConfig jedisPoolConfig, StateType type, Options options) { this.jedisPoolConfig = jedisPoolConfig; this.type = type; @@ -138,6 +226,10 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { } } + /** + * {@inheritDoc} + */ + @Override public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, jedisPoolConfig.getHost(), @@ -171,6 +263,14 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { private Serializer serializer; private KeyFactory keyFactory; + /** + * Constructor + * + * @param jedisPool JedisPool + * @param options options of State + * @param serializer Serializer + * @param 
keyFactory KeyFactory + */ public RedisMapState(JedisPool jedisPool, Options options, Serializer<T> serializer, KeyFactory keyFactory) { this.jedisPool = jedisPool; @@ -179,16 +279,25 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { this.keyFactory = keyFactory; } + /** + * {@inheritDoc} + */ @Override protected Serializer getSerializer() { return serializer; } + /** + * {@inheritDoc} + */ @Override protected KeyFactory getKeyFactory() { return keyFactory; } + /** + * {@inheritDoc} + */ @Override protected List<String> retrieveValuesFromRedis(List<String> keys) { String[] stringKeys = keys.toArray(new String[keys.size()]); @@ -216,6 +325,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { } } + /** + * {@inheritDoc} + */ @Override protected void updateStatesToRedis(Map<String, String> keyValues) { Jedis jedis = null; http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java index 3441936..85d0e1b 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java @@ -28,26 +28,47 @@ import storm.trident.state.StateFactory; import java.util.Map; +/** + * Implementation of State for single Redis environment. 
+ */ public class RedisState implements State { - private static final Logger logger = LoggerFactory.getLogger(RedisState.class); - + /** + * {@inheritDoc} + */ @Override public void beginCommit(Long aLong) { } + /** + * {@inheritDoc} + */ @Override public void commit(Long aLong) { } + /** + * RedisState.Factory implements StateFactory for single Redis environment. + * + * @see StateFactory + */ public static class Factory implements StateFactory { public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); private JedisPoolConfig jedisPoolConfig; + /** + * Constructor + * + * @param config configuration of JedisPool + */ public Factory(JedisPoolConfig config) { this.jedisPoolConfig = config; } + /** + * {@inheritDoc} + */ + @Override public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, jedisPoolConfig.getHost(), @@ -62,20 +83,31 @@ public class RedisState implements State { private JedisPool jedisPool; + /** + * Constructor + * + * @param jedisPool JedisPool + */ public RedisState(JedisPool jedisPool) { this.jedisPool = jedisPool; } /** - * The state updater and querier can get a Jedis instance - * */ + * Borrows Jedis instance from pool. + * <p/> + * Note that you should return borrowed instance to pool when you finish using instance. + * + * @return Jedis instance + */ public Jedis getJedis() { return this.jedisPool.getResource(); } /** - * The state updater and querier return the Jedis instance - * */ + * Returns Jedis instance to pool. 
+ * + * @param jedis Jedis instance to return to pool + */ public void returnJedis(Jedis jedis) { jedis.close(); } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java index ac102dd..f924e85 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java @@ -22,16 +22,29 @@ import redis.clients.jedis.Jedis; import java.util.List; +/** + * BaseQueryFunction implementation for single Redis environment. + * + * @see AbstractRedisStateQuerier + */ public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> { + /** + * Constructor + * + * @param lookupMapper mapper for querying + */ public RedisStateQuerier(RedisLookupMapper lookupMapper) { super(lookupMapper); } + /** + * {@inheritDoc} + */ @Override - protected List<String> retrieveValuesFromRedis(RedisState redisState, List<String> keys) { + protected List<String> retrieveValuesFromRedis(RedisState state, List<String> keys) { Jedis jedis = null; try { - jedis = redisState.getJedis(); + jedis = state.getJedis(); List<String> redisVals; String[] keysForRedis = keys.toArray(new String[keys.size()]); @@ -49,7 +62,7 @@ public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> { return redisVals; } finally { if (jedis != null) { - redisState.returnJedis(jedis); + state.returnJedis(jedis); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/0b4844fd/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java 
---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java index babcb1d..d0507cf 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java @@ -24,16 +24,35 @@ import redis.clients.jedis.Pipeline; import java.util.Map; +/** + * BaseStateUpdater implementation for single Redis environment. + * + * @see AbstractRedisStateUpdater + */ public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> { + /** + * Constructor + * + * @param storeMapper mapper for storing + */ public RedisStateUpdater(RedisStoreMapper storeMapper) { super(storeMapper); } + /** + * Sets expire (time to live) if needed. + * + * @param expireIntervalSec time to live in seconds + * @return RedisStateUpdater itself + */ public RedisStateUpdater withExpire(int expireIntervalSec) { setExpireInterval(expireIntervalSec); return this; } + /** + * {@inheritDoc} + */ @Override protected void updateStatesToRedis(RedisState redisState, Map<String, String> keyToValue) { Jedis jedis = null;