[FLINK-3034] Redis Sink Connector This closes #1813
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab9e36c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab9e36c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab9e36c Branch: refs/heads/master Commit: 3ab9e36c76395a9e900ca6b87746a013ee57da32 Parents: cb78245 Author: subhankar <subhankar.bis...@target.com> Authored: Thu Mar 17 17:40:07 2016 +0530 Committer: Robert Metzger <rmetz...@apache.org> Committed: Thu Jul 7 09:58:17 2016 +0200 ---------------------------------------------------------------------- docs/apis/streaming/connectors/index.md | 1 + docs/apis/streaming/connectors/redis.md | 177 +++++++++++++ docs/apis/streaming/fault_tolerance.md | 5 + .../flink-connector-redis/pom.xml | 79 ++++++ .../streaming/connectors/redis/RedisSink.java | 182 +++++++++++++ .../common/config/FlinkJedisClusterConfig.java | 187 +++++++++++++ .../common/config/FlinkJedisConfigBase.java | 90 +++++++ .../common/config/FlinkJedisPoolConfig.java | 224 ++++++++++++++++ .../common/config/FlinkJedisSentinelConfig.java | 259 +++++++++++++++++++ .../common/container/RedisClusterContainer.java | 161 ++++++++++++ .../container/RedisCommandsContainer.java | 108 ++++++++ .../RedisCommandsContainerBuilder.java | 116 +++++++++ .../redis/common/container/RedisContainer.java | 243 +++++++++++++++++ .../redis/common/mapper/RedisCommand.java | 86 ++++++ .../common/mapper/RedisCommandDescription.java | 94 +++++++ .../redis/common/mapper/RedisDataType.java | 66 +++++ .../redis/common/mapper/RedisMapper.java | 66 +++++ .../connectors/redis/RedisITCaseBase.java | 45 ++++ .../redis/RedisSentinelClusterTest.java | 100 +++++++ .../connectors/redis/RedisSinkITCase.java | 235 +++++++++++++++++ .../redis/RedisSinkPublishITCase.java | 137 ++++++++++ .../connectors/redis/RedisSinkTest.java | 65 +++++ .../common/config/FlinkJedisConfigBaseTest.java | 50 ++++ .../common/config/JedisClusterConfigTest.java | 49 ++++ 
.../common/config/JedisPoolConfigTest.java | 29 +++ .../common/config/JedisSentinelConfigTest.java | 49 ++++ .../mapper/RedisDataTypeDescriptionTest.java | 41 +++ flink-streaming-connectors/pom.xml | 1 + 28 files changed, 2945 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/docs/apis/streaming/connectors/index.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md index 87182b7..83ca514 100644 --- a/docs/apis/streaming/connectors/index.md +++ b/docs/apis/streaming/connectors/index.md @@ -39,6 +39,7 @@ Currently these systems are supported: * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source) * [Apache NiFi](https://nifi.apache.org) (sink/source) * [Apache Cassandra](https://cassandra.apache.org/) (sink) + * [Redis](http://redis.io/) (sink) To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/docs/apis/streaming/connectors/redis.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/connectors/redis.md b/docs/apis/streaming/connectors/redis.md new file mode 100644 index 0000000..dfa5296 --- /dev/null +++ b/docs/apis/streaming/connectors/redis.md @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version }}</version> +</dependency> +{% endhighlight %} +Version Compatibility: This module is compatible with Redis 2.8.5. + +Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +#### Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download). + +#### Redis Sink +A class providing an interface for sending data to Redis. +The sink can use three different methods for communicating with different type of Redis environments: +1. Single Redis Server +2. Redis Cluster +3. 
Redis Sentinel + +This code shows how to create a sink that communicates with a single Redis server: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{ + + @Override + public RedisCommandDescription getCommandDescription() { + return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); + } + + @Override + public String getKeyFromData(Tuple2<String, String> data) { + return data.f0; + } + + @Override + public String getValueFromData(Tuple2<String, String> data) { + return data.f1; + } +} +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream<Tuple2<String, String>> stream = ...; +stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ + override def getCommandDescription: RedisCommandDescription = { + new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME") + } + + override def getKeyFromData(data: (String, String)): String = data._1 + + override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} +</div> +</div> + +This example code does the same, but for Redis Cluster: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder() + .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); + +DataStream<Tuple2<String, String>> stream = ...; +stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% 
highlight scala %} +val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} +</div> +</div> + +This example shows how to configure the sink when the Redis environment uses Sentinels: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder() + .setMasterName("master").setSentinels(...).build(); + +DataStream<Tuple2<String, String>> stream = ...; +stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper())); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} +</div> +</div> + +This section describes all the available data types and the Redis command used for each. 
+ +<table class="table table-bordered" style="width: 75%"> + <thead> + <tr> + <th class="text-center" style="width: 20%">Data Type</th> + <th class="text-center" style="width: 25%">Redis Command [Sink]</th> + <th class="text-center" style="width: 25%">Redis Command [Source]</th> + </tr> + </thead> + <tbody> + <tr> + <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td><td>--NA--</td> + </tr> + <tr> + <td>LIST</td><td> + <a href="http://redis.io/commands/rpush">RPUSH</a>, + <a href="http://redis.io/commands/lpush">LPUSH</a> + </td><td>--NA--</td> + </tr> + <tr> + <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td><td>--NA--</td> + </tr> + <tr> + <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td> + </tr> + <tr> + <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td><td>--NA--</td> + </tr> + <tr> + <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td> + </tr> + <tr> + <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td> + </tr> + </tbody> +</table> +More about Redis can be found [here](http://redis.io/). 
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/docs/apis/streaming/fault_tolerance.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md index 0e98632..89c2557 100644 --- a/docs/apis/streaming/fault_tolerance.md +++ b/docs/apis/streaming/fault_tolerance.md @@ -206,6 +206,11 @@ state updates) of Flink coupled with bundled sinks: <td>at least once</td> <td></td> </tr> + <tr> + <td>Redis sink</td> + <td>at least once</td> + <td></td> + </tr> </tbody> </table> http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/pom.xml b/flink-streaming-connectors/flink-connector-redis/pom.xml new file mode 100644 index 0000000..6c8f609 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-connectors</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-redis_2.10</artifactId> + <name>flink-connector-redis</name> + + <packaging>jar</packaging> + + <properties> + <jedis.version>2.8.0</jedis.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <version>${jedis.version}</version> + </dependency> + + <dependency> + <groupId>com.github.kstyrc</groupId> + <artifactId>embedded-redis</artifactId> + <version>0.6</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java new file mode 100644 index 0000000..43518e8 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. 
+ * + * <p>Example: + * + * <pre> + *{@code + *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> { + * + * private RedisCommand redisCommand; + * + * public RedisExampleMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2<String, String> data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2<String, String> data) { + * return data.f1; + * } + *} + *FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder() + * .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink<Tuple2<String, String>>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH)); + *}</pre> + * + * @param <IN> Type of the elements consumed by this sink + */ +public class RedisSink<IN> extends RichSinkFunction<IN> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** + * This additional key is needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. + * Other {@link RedisDataType} values work with only two variables, i.e. the name of the list and the value to be added. + * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. + * <p>For {@link RedisDataType#HASH} we need the hash name, the hash key and the element. + * {@code additionalKey} is used as the hash name for {@link RedisDataType#HASH} + * <p>For {@link RedisDataType#SORTED_SET} we need the set name, the element and its score. 
+ * {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET} + */ + private String additionalKey; + private RedisMapper<IN> redisSinkMapper; + private RedisCommand redisCommand; + + private FlinkJedisConfigBase flinkJedisConfigBase; + private RedisCommandsContainer redisCommandsContainer; + + /** + * Creates a new {@link RedisSink} that connects to the Redis server. + * + * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase} + * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements. + */ + public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) { + Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null"); + Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null"); + Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null"); + + this.flinkJedisConfigBase = flinkJedisConfigBase; + + this.redisSinkMapper = redisSinkMapper; + RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription(); + this.redisCommand = redisCommandDescription.getCommand(); + this.additionalKey = redisCommandDescription.getAdditionalKey(); + } + + /** + * Called when new data arrives to the sink, and forwards it to Redis channel. + * Depending on the specified Redis data type (see {@link RedisDataType}), + * a different Redis command will be applied. + * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD. 
+ * + * @param input The incoming data + */ + @Override + public void invoke(IN input) throws Exception { + String key = redisSinkMapper.getKeyFromData(input); + String value = redisSinkMapper.getValueFromData(input); + + switch (redisCommand) { + case RPUSH: + this.redisCommandsContainer.rpush(key, value); + break; + case LPUSH: + this.redisCommandsContainer.lpush(key, value); + break; + case SADD: + this.redisCommandsContainer.sadd(key, value); + break; + case SET: + this.redisCommandsContainer.set(key, value); + break; + case PFADD: + this.redisCommandsContainer.pfadd(key, value); + break; + case PUBLISH: + this.redisCommandsContainer.publish(key, value); + break; + case ZADD: + this.redisCommandsContainer.zadd(this.additionalKey, value, key); + break; + case HSET: + this.redisCommandsContainer.hset(this.additionalKey, key, value); + break; + default: + throw new IllegalArgumentException("Cannot process such data type: " + redisCommand); + } + } + + /** + * Initializes the connection to Redis by either cluster or sentinels or single server. + * + * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null + */ + @Override + public void open(Configuration parameters) throws Exception { + this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + } + + /** + * Closes commands container. + * @throws IOException if command container is unable to close. 
+ */ + @Override + public void close() throws IOException { + if (redisCommandsContainer != null) { + redisCommandsContainer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java new file mode 100644 index 0000000..6e6cfe5 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private final Set<InetSocketAddress> nodes; + private final int maxRedirections; + + + /** + * Jedis cluster configuration. + * The list of node is mandatory, and when nodes is not set, it throws NullPointerException. + * + * @param nodes list of node information for JedisCluster + * @param connectionTimeout socket / connection timeout. The default is 2000 + * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK + * @param maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * @throws NullPointerException if parameter {@code nodes} is {@code null} + */ + private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Preconditions.checkNotNull(nodes, "Node information should be presented"); + Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty"); + this.nodes = new HashSet<>(nodes); + this.maxRedirections = maxRedirections; + } + + + + /** + * Returns nodes. 
+ * + * @return list of node information + */ + public Set<HostAndPort> getNodes() { + Set<HostAndPort> ret = new HashSet<>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + /** + * Returns limit of redirection. + * + * @return limit of redirection + */ + public int getMaxRedirections() { + return maxRedirections; + } + + + /** + * Builder for initializing {@link FlinkJedisClusterConfig}. + */ + public static class Builder { + private Set<InetSocketAddress> nodes; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int maxRedirections = 5; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets list of node. + * + * @param nodes list of node + * @return Builder itself + */ + public Builder setNodes(Set<InetSocketAddress> nodes) { + this.nodes = nodes; + return this; + } + + /** + * Sets socket / connection timeout. + * + * @param timeout socket / connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets limit of redirection. + * + * @param maxRedirections limit of redirection, default value is 5 + * @return Builder itself + */ + public Builder setMaxRedirections(int maxRedirections) { + this.maxRedirections = maxRedirections; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. 
+ * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Builds JedisClusterConfig. + * + * @return JedisClusterConfig + */ + public FlinkJedisClusterConfig build() { + return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisClusterConfig{" + + "nodes=" + nodes + + ", timeout=" + connectionTimeout + + ", maxRedirections=" + maxRedirections + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java new file mode 100644 index 0000000..a2489b8 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. 
+ */ +public abstract class FlinkJedisConfigBase implements Serializable { + private static final long serialVersionUID = 1L; + + protected final int maxTotal; + protected final int maxIdle; + protected final int minIdle; + protected final int connectionTimeout; + + protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){ + Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative"); + Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative"); + Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative"); + Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative"); + this.connectionTimeout = connectionTimeout; + this.maxTotal = maxTotal; + this.maxIdle = maxIdle; + this.minIdle = minIdle; + } + + /** + * Returns timeout. + * + * @return connection timeout + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Get the value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @return The current setting of {@code maxTotal} for this + * configuration instance + * @see GenericObjectPoolConfig#getMaxTotal() + */ + public int getMaxTotal() { + return maxTotal; + } + + /** + * Get the value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @return The current setting of {@code maxIdle} for this + * configuration instance + * @see GenericObjectPoolConfig#getMaxIdle() + */ + public int getMaxIdle() { + return maxIdle; + } + + /** + * Get the value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. 
+ * + * @return The current setting of {@code minIdle} for this + * configuration instance + * @see GenericObjectPoolConfig#getMinIdle() + */ + public int getMinIdle() { + return minIdle; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java new file mode 100644 index 0000000..d261a35 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.Protocol; + +/** + * Configuration for Jedis pool. + */ +public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { + + private static final long serialVersionUID = 1L; + + private final String host; + private final int port; + private final int database; + private final String password; + + + /** + * Jedis pool configuration. + * The host is mandatory, and when host is not set, it throws NullPointerException. + * + * @param host hostname or IP + * @param port port, default value is 6379 + * @param connectionTimeout socket / connection timeout, default value is 2000 milli second + * @param password password, if any + * @param database database index + * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @throws NullPointerException if parameter {@code host} is {@code null} + */ + private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Preconditions.checkNotNull(host, "Host information should be presented"); + this.host = host; + this.port = port; + this.database = database; + this.password = password; + } + + /** + * Returns host. + * + * @return hostname or IP + */ + public String getHost() { + return host; + } + + /** + * Returns port. + * + * @return port + */ + public int getPort() { + return port; + } + + + /** + * Returns database index. + * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** + * Returns password. 
+ * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Builder for initializing {@link FlinkJedisPoolConfig}. + */ + public static class Builder { + private String host; + private int port = Protocol.DEFAULT_PORT; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int database = Protocol.DEFAULT_DATABASE; + private String password; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Sets host. + * + * @param host host + * @return Builder itself + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Sets port. 
+ * + * @param port port, default value is 6379 + * @return Builder itself + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** + * Sets timeout. + * + * @param timeout timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets database index. + * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + + /** + * Builds JedisPoolConfig. + * + * @return JedisPoolConfig + */ + public FlinkJedisPoolConfig build() { + return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisPoolConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", timeout=" + connectionTimeout + + ", database=" + database + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java new file mode 100644 index 0000000..2cdb397 --- /dev/null +++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Sentinel pool. + */ +public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class); + + private final String masterName; + private final Set<String> sentinels; + private final int soTimeout; + private final String password; + private final int database; + + /** + * Jedis Sentinels config. + * The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException. 
+ * + * @param masterName master name of the replica set + * @param sentinels set of sentinel hosts + * @param connectionTimeout timeout connection timeout + * @param soTimeout timeout socket timeout + * @param password password, if any + * @param database database database index + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * + * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null} + * @throws IllegalArgumentException if {@code sentinels} are empty + */ + private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels, + int connectionTimeout, int soTimeout, + String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Preconditions.checkNotNull(masterName, "Master name should be presented"); + Preconditions.checkNotNull(sentinels, "Sentinels information should be presented"); + Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty"); + + this.masterName = masterName; + this.sentinels = new HashSet<>(sentinels); + this.soTimeout = soTimeout; + this.password = password; + this.database = database; + } + + /** + * Returns master name of the replica set. + * + * @return master name of the replica set. + */ + public String getMasterName() { + return masterName; + } + + /** + * Returns Sentinels host addresses. + * + * @return Set of Sentinels host addresses + */ + public Set<String> getSentinels() { + return sentinels; + } + + /** + * Returns socket timeout. + * + * @return socket timeout + */ + public int getSoTimeout() { + return soTimeout; + } + + /** + * Returns password. + * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Returns database index. 
+ * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** + * Builder for initializing {@link FlinkJedisSentinelConfig}. + */ + public static class Builder { + private String masterName; + private Set<String> sentinels; + private int connectionTimeout = Protocol.DEFAULT_TIMEOUT; + private int soTimeout = Protocol.DEFAULT_TIMEOUT; + private String password; + private int database = Protocol.DEFAULT_DATABASE; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets master name of the replica set. + * + * @param masterName master name of the replica set + * @return Builder itself + */ + public Builder setMasterName(String masterName) { + this.masterName = masterName; + return this; + } + + /** + * Sets sentinels address. + * + * @param sentinels host set of the sentinels + * @return Builder itself + */ + public Builder setSentinels(Set<String> sentinels) { + this.sentinels = sentinels; + return this; + } + + /** + * Sets connection timeout. + * + * @param connectionTimeout connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Sets socket timeout. + * + * @param soTimeout socket timeout, default value is 2000 + * @return Builder itself + */ + public Builder setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Sets database index. 
+ * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Builds JedisSentinelConfig. 
+ * + * @return JedisSentinelConfig + */ + public FlinkJedisSentinelConfig build(){ + return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout, + password, database, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisSentinelConfig{" + + "masterName='" + masterName + '\'' + + ", connectionTimeout=" + connectionTimeout + + ", soTimeout=" + soTimeout + + ", database=" + database + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java new file mode 100644 index 0000000..7551c9e --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private JedisCluster jedisCluster; + + /** + * Initialize Redis command container for Redis cluster. 
+ * + * @param jedisCluster JedisCluster instance + */ + public RedisClusterContainer(JedisCluster jedisCluster) { + Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null"); + + this.jedisCluster = jedisCluster; + } + + @Override + public void hset(final String key, final String hashField, final String value) { + try { + jedisCluster.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to hash {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } + } + + @Override + public void rpush(final String listName, final String value) { + try { + jedisCluster.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}", + listName, e.getMessage()); + } + throw e; + } + } + + @Override + public void lpush(String listName, String value) { + try { + jedisCluster.lpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}", + listName, e.getMessage()); + } + throw e; + } + } + + @Override + public void sadd(final String setName, final String value) { + try { + jedisCluster.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", + setName, e.getMessage()); + } + throw e; + } + } + + @Override + public void publish(final String channelName, final String message) { + try { + jedisCluster.publish(channelName, message); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}", + channelName, e.getMessage()); + } + throw e; + } + } + + @Override + public void set(final String key, final String value) { + try { + jedisCluster.set(key, value); + } catch 
(Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SET to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + @Override + public void pfadd(final String key, final String element) { + try { + jedisCluster.set(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PFADD to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + @Override + public void zadd(final String key, final String score, final String element) { + try { + jedisCluster.zadd(key, Double.valueOf(score), element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZADD to set {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + /** + * Closes the {@link JedisCluster}. + */ + @Override + public void close() throws IOException { + this.jedisCluster.close(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java new file mode 100644 index 0000000..1b92c2e --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The container for all available Redis commands. + */ +public interface RedisCommandsContainer extends Serializable { + + /** + * Sets field in the hash stored at key to value. + * If key does not exist, a new key holding a hash is created. + * If field already exists in the hash, it is overwritten. + * + * @param key Hash name + * @param hashField Hash field + * @param value Hash value + */ + void hset(String key, String hashField, String value); + + /** + * Insert the specified value at the tail of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + * + * @param listName Name of the List + * @param value Value to be added + */ + void rpush(String listName, String value); + + /** + * Insert the specified value at the head of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + * + * @param listName Name of the List + * @param value Value to be added + */ + void lpush(String listName, String value); + + /** + * Add the specified member to the set stored at key. 
+ * Specified members that are already a member of this set are ignored. + * If key does not exist, a new set is created before adding the specified members. + * + * @param setName Name of the Set + * @param value Value to be added + */ + void sadd(String setName, String value); + + /** + * Posts a message to the given channel. + * + * @param channelName Name of the channel to which data will be published + * @param message the message + */ + void publish(String channelName, String message); + + /** + * Set key to hold the string value. If key already holds a value, it is overwritten, + * regardless of its type. Any previous time to live associated with the key is + * discarded on successful SET operation. + * + * @param key the key name in which value to be set + * @param value the value + */ + void set(String key, String value); + + /** + * Adds all the element arguments to the HyperLogLog data structure + * stored at the variable name specified as first argument. + * + * @param key The name of the key + * @param element the element + */ + void pfadd(String key, String element); + + /** + * Adds the specified member with the specified scores to the sorted set stored at key. + * + * @param key The name of the Sorted Set + * @param score Score of the element + * @param element element to be added + */ + void zadd(String key, String score, String element); + + /** + * Close the Jedis container. 
+ * + * @throws IOException if the instance can not be closed properly + */ + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java new file mode 100644 index 0000000..dc5396a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +/** + * The builder for {@link RedisCommandsContainer}. + */ +public class RedisCommandsContainerBuilder { + + /** + * Initialize the {@link RedisCommandsContainer} based on the instance type. + * @param flinkJedisConfigBase configuration base + * @return @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null + */ + public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){ + if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){ + FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig); + } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) { + FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig); + } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) { + FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig); + } else { + throw new IllegalArgumentException("Jedis configuration not found"); + } + } + + /** + * Builds container for single Redis environment. 
+ * + * @param jedisPoolConfig configuration for JedisPool + * @return container for single Redis environment + * @throws NullPointerException if jedisPoolConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { + Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** + * Builds container for Redis Cluster environment. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return container for Redis Cluster environment + * @throws NullPointerException if jedisClusterConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) { + Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle()); + + JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(), + jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig); + return new RedisClusterContainer(jedisCluster); + } + + /** + * Builds container for Redis Sentinel environment. 
+ * + * @param jedisSentinelConfig configuration for JedisSentinel + * @return container for Redis sentinel environment + * @throws NullPointerException if jedisSentinelConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) { + Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle()); + + JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, + jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(), + jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase()); + return new RedisContainer(jedisSentinelPool); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java new file mode 100644 index 0000000..8684e9a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private final JedisPool jedisPool; + private final JedisSentinelPool jedisSentinelPool; + + + /** + * Use this constructor if to connect with single Redis server. 
+ * + * @param jedisPool JedisPool which actually manages Jedis instances + */ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + this.jedisSentinelPool = null; + } + + /** + * Use this constructor if Redis environment is clustered with sentinels. + * + * @param sentinelPool SentinelPool which actually manages Jedis instances + */ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisPool = null; + this.jedisSentinelPool = sentinelPool; + } + + /** + * Closes the Jedis instances. + */ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String key, final String hashField, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void rpush(final String listName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}", + listName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void lpush(String listName, String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.lpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with 
command LUSH to list {} error message {}", + listName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void sadd(final String setName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", + setName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void publish(final String channelName, final String message) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.publish(channelName, message); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}", + channelName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void set(final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.set(key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SET to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void pfadd(final String key, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.pfadd(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PFADD to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void zadd(final String key, final String score, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.zadd(key, Double.valueOf(score), element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + 
LOG.error("Cannot send Redis message with command ZADD to set {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + /** + * Returns Jedis instance from the pool. + * + * @return the Jedis instance + */ + private Jedis getInstance() { + if (jedisSentinelPool != null) { + return jedisSentinelPool.getResource(); + } else { + return jedisPool.getResource(); + } + } + + /** + * Closes the jedis instance after finishing the command. + * + * @param jedis The jedis instance + */ + private void releaseInstance(final Jedis jedis) { + if (jedis == null) { + return; + } + try { + jedis.close(); + } catch (Exception e) { + LOG.error("Failed to close (return) instance to pool", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java new file mode 100644 index 0000000..b0661c7 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available commands for Redis. Each command belongs to a {@link RedisDataType} group. + */ +public enum RedisCommand { + + /** + * Insert the specified value at the head of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operations. + */ + LPUSH(RedisDataType.LIST), + + /** + * Insert the specified value at the tail of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + */ + RPUSH(RedisDataType.LIST), + + /** + * Add the specified member to the set stored at key. + * Specified member that is already a member of this set is ignored. + */ + SADD(RedisDataType.SET), + + /** + * Set key to hold the string value. If key already holds a value, + * it is overwritten, regardless of its type. + */ + SET(RedisDataType.STRING), + + /** + * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument. + */ + PFADD(RedisDataType.HYPER_LOG_LOG), + + /** + * Posts a message to the given channel. + */ + PUBLISH(RedisDataType.PUBSUB), + + /** + * Adds the specified members with the specified score to the sorted set stored at key. + */ + ZADD(RedisDataType.SORTED_SET), + + /** + * Sets field in the hash stored at key to value. If key does not exist, + * a new key holding a hash is created. If field already exists in the hash, it is overwritten. 
+ */ + HSET(RedisDataType.HASH); + + /** + * The {@link RedisDataType} this command belongs to. + */ + private RedisDataType redisDataType; + + RedisCommand(RedisDataType redisDataType) { + this.redisDataType = redisDataType; + } + + + /** + * The {@link RedisDataType} this command belongs to. + * @return the {@link RedisDataType} + */ + public RedisDataType getRedisDataType(){ + return redisDataType; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java new file mode 100644 index 0000000..1eea48a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * The description of the command type. This must be passed while creating new {@link RedisMapper}. + * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}, + * you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}. + * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException} + * + * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} + * you can use second constructor {@link #RedisCommandDescription(RedisCommand)} + */ +public class RedisCommandDescription implements Serializable { + + private static final long serialVersionUID = 1L; + + private RedisCommand redisCommand; + + /** + * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. + * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. + * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. + * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. + * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH} + * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score. + * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET} + */ + private String additionalKey; + + /** + * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. + * If different data type is specified, {@code additionalKey} is ignored. 
+ * @param redisCommand the redis command type {@link RedisCommand} + * @param additionalKey additional key for Hash and Sorted set data type + */ + public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) { + Preconditions.checkNotNull(redisCommand, "Redis command type can not be null"); + this.redisCommand = redisCommand; + this.additionalKey = additionalKey; + + if (redisCommand.getRedisDataType() == RedisDataType.HASH || + redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + /** + * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. + * + * @param redisCommand the redis data type {@link RedisCommand} + */ + public RedisCommandDescription(RedisCommand redisCommand) { + this(redisCommand, null); + } + + /** + * Returns the {@link RedisCommand}. + * + * @return the command type of the mapping + */ + public RedisCommand getCommand() { + return redisCommand; + } + + /** + * Returns the additional key if data type is {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. 
+ * + * @return the additional key + */ + public String getAdditionalKey() { + return additionalKey; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java new file mode 100644 index 0000000..6e3997c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available data type for Redis. + */ +public enum RedisDataType { + + /** + * Strings are the most basic kind of Redis value. 
/**
 * The Redis data types (command groups) known to the connector.
 */
public enum RedisDataType {

    /**
     * The most basic kind of Redis value. Redis Strings are binary safe, i.e. a string
     * can hold any kind of data, for instance a JPEG image or a serialized Ruby object.
     * A String value can be at most 512 Megabytes in length.
     */
    STRING,

    /**
     * Maps between string fields and string values.
     */
    HASH,

    /**
     * Lists of strings, sorted by insertion order.
     */
    LIST,

    /**
     * Unordered collections of Strings.
     */
    SET,

    /**
     * Like Redis Sets, non repeating collections of Strings, but every member is
     * associated with a score that is used to keep the sorted set ordered, from the
     * smallest to the greatest score. While members are unique, scores may be repeated.
     */
    SORTED_SET,

    /**
     * A probabilistic data structure used to count unique things.
     */
    HYPER_LOG_LOG,

    /**
     * Redis implementation of the publish and subscribe paradigm. Published messages are
     * characterized into channels, without knowledge of what (if any) subscribers there may be.
     * Subscribers express interest in one or more channels and only receive messages
     * that are of interest, without knowledge of what (if any) publishers there are.
     */
    PUBSUB
}
+ *<p>Example: + *<pre>{@code + *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> { + * public RedisDataTypeDescription getCommandDescription() { + * return new RedisDataTypeDescription(RedisCommand.PUBLISH); + * } + * public String getKeyFromData(Tuple2<String, String> data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2<String, String> data) { + * return data.f1; + * } + *} + *}</pre> + * + * @param <T> The type of the element handled by this {@code RedisMapper} + */ +public interface RedisMapper<T> extends Function, Serializable { + + /** + * Returns descriptor which defines data type. + * + * @return data type descriptor + */ + RedisCommandDescription getCommandDescription(); + + /** + * Extracts key from data. + * + * @param data source data + * @return key + */ + String getKeyFromData(T data); + + /** + * Extracts value from data. + * + * @param data source data + * @return value + */ + String getValueFromData(T data); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java new file mode 100644 index 0000000..7d98f2d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import redis.embedded.RedisServer; + +import java.io.IOException; + +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase { + + public static final int REDIS_PORT = getAvailablePort(); + public static final String REDIS_HOST = "127.0.0.1"; + + private static RedisServer redisServer; + + @BeforeClass + public static void createRedisServer() throws IOException, InterruptedException { + redisServer = new RedisServer(REDIS_PORT); + redisServer.start(); + } + + @AfterClass + public static void stopRedisServer(){ + redisServer.stop(); + } +}