Repository: incubator-gearpump Updated Branches: refs/heads/master 9b879456d -> 1b9889121
GEARPUMP-215 Gearpump Redis Integration - RedisSink [Gearpump Redis Integration - RedisStorage](https://issues.apache.org/jira/browse/GEARPUMP-215) Author: darionyaphet <[email protected]> Closes #93 from darionyaphet/GEARPUMP-215. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1b988912 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1b988912 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1b988912 Branch: refs/heads/master Commit: 1b9889121c0d776c9423544b76e27292838d1300 Parents: 9b87945 Author: darionyaphet <[email protected]> Authored: Tue Oct 11 06:10:33 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Oct 11 06:10:33 2016 +0800 ---------------------------------------------------------------------- .../apache/gearpump/redis/RedisMessage.scala | 456 +++++++++++++++++++ .../org/apache/gearpump/redis/RedisSink.scala | 119 +++++ project/Build.scala | 12 + 3 files changed, 587 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala new file mode 100644 index 0000000..84dec70 --- /dev/null +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.redis + +import java.nio.charset.Charset + +object RedisMessage { + + private def toBytes(strings: List[String]): List[Array[Byte]] = + strings.map(string => string.getBytes(Charset.forName("UTF8"))) + + private def toBytes(string: String): Array[Byte] = + string.getBytes(Charset.forName("UTF8")) + + object Connection { + + /** + * Change the selected database for the current connection + * + * @param index + */ + case class SELECT(index: Int) + + } + + object Geo { + + /** + * Add one geospatial item in the geospatial index represented using a sorted set + * + * @param key + * @param longitude + * @param latitude + * @param member + */ + case class GEOADD(key: Array[Byte], longitude: Double, + latitude: Double, member: Array[Byte]) { + def this(key: String, longitude: Double, + latitude: Double, member: String) = + this(toBytes(key), longitude, latitude, toBytes(member)) + } + + } + + object Hashes { + + /** + * Delete a hash field + * + * @param key + * @param field + */ + case class HDEL(key: Array[Byte], field: Array[Byte]) { + def this(key: String, field: String) = this(toBytes(key), toBytes(field)) + } + + /** + * Increment the integer value of a hash field by the given number + * + * @param key + * @param field + * @param increment + */ + case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) { + def this(key: String, field: String, increment: Long) = + this(toBytes(key), toBytes(field), increment) + } + + /** + * Increment the float value of a hash field by the given amount + * + * @param key + * @param field + * @param increment + */ + case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) { + def this(key: String, field: String, increment: Float) = + this(toBytes(key), toBytes(field), increment) + } + + + /** + * Set the string value of a hash field + * + * @param key + * @param field + * @param value + */ + case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { + def this(key: String, field: String, value: String) = + this(toBytes(key), toBytes(field), toBytes(value)) + } + + /** + * Set the value of a hash field, only if the field does not exist + * + * @param key + * @param field + * @param value + */ + case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { + def this(key: String, field: String, value: String) = + this(toBytes(key), toBytes(field), toBytes(value)) + } + + } + + object HyperLogLog { + + /** + * Adds the specified elements to the specified HyperLogLog + * + * @param key + * @param element + */ + case class PFADD(key: String, element: String) + + } + + object Lists { + + + /** + * Prepend one or multiple values to a list + * + * @param key + * @param value + */ + case class LPUSH(key: Array[Byte], value: Array[Byte]) { + + def this(key: String, value: String) = this(key, toBytes(value)) + } + + /** + * Prepend a value to a list, only if the list exists + * + * @param key + * @param value + */ + case class LPUSHX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Set the value of an element in a list by its index + * + * @param key + * @param index + * @param value + */ + case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) { + def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value)) + } + + /** + * Append one or multiple values to a list + * + * @param key + * @param value + */ + case class RPUSH(key: Array[Byte], value: Array[Byte]) { + + def this(key: String, value: String) = this(key, toBytes(value)) + } + + /** + * Append a value to a list, only if the list exists + * + * @param key + * @param value + */ + case class RPUSHX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + } + + object Keys { + + /** + * Delete a key + * + * @param message + */ + case class DEL(message: Array[Byte]) { + + def this(message: String) = this(toBytes(message)) + } + + /** + * Set a key's time to live in seconds + * + * @param key + */ + case class EXPIRE(key: Array[Byte], seconds: Int) { + def this(key: String, seconds: Int) = this(toBytes(key), seconds) + } + + /** + * Set the expiration for a key as a UNIX timestamp + * + * @param key + * @param timestamp + */ + case class EXPIREAT(key: Array[Byte], timestamp: Long) { + def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + } + + /** + * Atomically transfer a key from a Redis instance to another one. + * + * @param host + * @param port + * @param key + * @param database + * @param timeout + */ + case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) { + def this(host: String, port: Int, key: String, database: Int, timeout: Int) = + this(toBytes(host), port, toBytes(key), database, timeout) + } + + /** + * Move a key to another database + * + * @param key + * @param db + */ + case class MOVE(key: Array[Byte], db: Int) { + def this(key: String, db: Int) = this(toBytes(key), db) + } + + /** + * Remove the expiration from a key + * + * @param key + */ + case class PERSIST(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Set a key's time to live in milliseconds + * + * @param key + * @param milliseconds + */ + case class PEXPIRE(key: Array[Byte], milliseconds: Long) { + def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + } + + /** + * Set the expiration for a key as a UNIX timestamp specified in milliseconds + * + * @param key + * @param timestamp + */ + case class PEXPIREAT(key: Array[Byte], timestamp: Long) { + def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + } + + /** + * Rename a key + * + * @param key + * @param newKey + */ + case class RENAME(key: Array[Byte], newKey: Array[Byte]) { + def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + } + + /** + * Rename a key, only if the new key does not exist + * + * @param key + * @param newKey + */ + case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) { + def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + } + + } + + + object Sets { + + /** + * Add one or more members to a set + * + * @param key + * @param members + */ + case class SADD(key: Array[Byte], members: Array[Byte]) { + + def this(key: String, members: String) = this(key, toBytes(members)) + } + + + /** + * Move a member from one set to another + * + * @param source + * @param destination + * @param member + */ + case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) { + def this(source: String, destination: String, member: String) = + this(toBytes(source), toBytes(destination), toBytes(member)) + } + + + /** + * Remove one or more members from a set + * + * @param key + * @param member + */ + case class SREM(key: Array[Byte], member: Array[Byte]) { + + def this(key: String, member: String) = this(key, toBytes(member)) + } + + } + + object String { + + /** + * Append a value to a key + * + * @param key + * @param value + */ + case class APPEND(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Decrement the integer value of a key by one + * + * @param key + */ + case class DECR(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Decrement the integer value of a key by the given number + * + * @param key + * @param decrement + */ + case class DECRBY(key: Array[Byte], decrement: Int) { + def this(key: String, decrement: Int) = this(toBytes(key), decrement) + } + + /** + * Increment the integer value of a key by one + * + * @param key + */ + case class INCR(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Increment the integer value of a key by the given amount + * + * @param key + * @param increment + */ + case class INCRBY(key: Array[Byte], increment: Int) { + def this(key: String, increment: Int) = this(toBytes(key), increment) + } + + /** + * Increment the float value of a key by the given amount + * + * @param key + * @param increment + */ + case class INCRBYFLOAT(key: Array[Byte], increment: Double) { + def this(key: String, increment: Number) = this(toBytes(key), increment) + } + + + /** + * Set the string value of a key + * + * @param key + * @param value + */ + case class SET(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Sets or clears the bit at offset in the string value stored at key + * + * @param key + * @param offset + * @param value + */ + case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) { + def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value)) + } + + /** + * Set the value and expiration of a key + * + * @param key + * @param seconds + * @param value + */ + case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) { + def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value)) + } + + /** + * Set the value of a key, only if the key does not exist + * + * @param key + * @param value + */ + case class SETNX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Overwrite part of a string at key starting at the specified offset + * + * @param key + * @param offset + * @param value + */ + case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) { + def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value)) + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala new file mode 100644 index 0000000..3f75949 --- /dev/null +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.redis + +import org.apache.gearpump.Message +import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD +import org.apache.gearpump.redis.RedisMessage.Hashes._ +import org.apache.gearpump.redis.RedisMessage.HyperLogLog._ +import org.apache.gearpump.redis.RedisMessage.Keys._ +import org.apache.gearpump.redis.RedisMessage.Lists._ +import org.apache.gearpump.redis.RedisMessage.Sets._ +import org.apache.gearpump.redis.RedisMessage.String._ +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.LogUtil +import redis.clients.jedis.Jedis +import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT} + +/** + * Save message in Redis Instance + * + * @param host + * @param port + * @param timeout + * @param database + * @param password + */ +class RedisSink( + host: String = DEFAULT_HOST, + port: Int = DEFAULT_PORT, + timeout: Int = DEFAULT_TIMEOUT, + database: Int = DEFAULT_DATABASE, + password: String = "") extends DataSink { + + private val LOG = LogUtil.getLogger(getClass) + @transient private lazy val client = new Jedis(host, port, timeout) + + override def open(context: TaskContext): Unit = { + client.select(database) + + if (password != null && password.length != 0) { + client.auth(password) + } + } + + override def write(message: Message): Unit = { + + message.msg match { + // GEO + case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) + + // Hashes + case msg: HDEL => client.hdel(msg.key, msg.field) + case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment) + case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment) + case msg: HSET => client.hset(msg.key, msg.field, msg.value) + case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value) + + // HyperLogLog + case msg: PFADD => client.pfadd(msg.key, msg.element) + + // Lists + case msg: LPUSH => client.lpush(msg.key, msg.value) + case msg: LPUSHX => client.lpushx(msg.key, msg.value) + case msg: LSET => client.lset(msg.key, msg.index, msg.value) + case msg: RPUSH => client.rpush(msg.key, msg.value) + case msg: RPUSHX => client.rpushx(msg.key, msg.value) + + // Keys + case msg: DEL => client.del(msg.message) + case msg: EXPIRE => client.expire(msg.key, msg.seconds) + case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp) + case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout) + case msg: MOVE => client.move(msg.key, msg.db) + case msg: PERSIST => client.persist(msg.key) + case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds) + case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp) + case msg: RENAME => client.rename(msg.key, msg.newKey) + case msg: RENAMENX => client.renamenx(msg.key, msg.newKey) + + // Sets + case msg: SADD => client.sadd(msg.key, msg.members) + case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member) + case msg: SREM => client.srem(msg.key, msg.member) + + // String + case msg: APPEND => client.append(msg.key, msg.value) + case msg: DECR => client.decr(msg.key) + case msg: DECRBY => client.decrBy(msg.key, msg.decrement) + case msg: INCR => client.incr(msg.key) + case msg: INCRBY => client.incrBy(msg.key, msg.increment) + case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment) + case msg: SET => client.set(msg.key, msg.value) + case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value) + case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value) + case msg: SETNX => client.setnx(msg.key, msg.value) + case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value) + } + } + + override def close(): Unit = { + client.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index fe8ec61..34f0ae2 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -415,6 +415,18 @@ object Build extends sbt.Build { )) .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + lazy val redis = Project( + id = "gearpump-experiments-redis", + base = file("experiments/redis"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + libraryDependencies ++= Seq( + "redis.clients" % "jedis" % "2.9.0" + ), + mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test") + )) + .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + lazy val storm = Project( id = "gearpump-experiments-storm", base = file("experiments/storm"),
