Merge branch 'master' into akka-streams Author: manuzhang <[email protected]> Author: darionyaphet <[email protected]>
Closes #95 from manuzhang/akka-streams. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/bc394035 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/bc394035 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/bc394035 Branch: refs/heads/akka-streams Commit: bc39403525b4065360acf82386fac21a588b59e6 Parents: 4fe5458 Author: manuzhang <[email protected]> Authored: Tue Oct 11 11:57:48 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Oct 11 11:57:48 2016 +0800 ---------------------------------------------------------------------- .../wordcount/dsl/WindowedWordCount.scala | 87 ++++ .../apache/gearpump/redis/RedisMessage.scala | 456 +++++++++++++++++++ .../org/apache/gearpump/redis/RedisSink.scala | 119 +++++ project/Build.scala | 62 ++- project/BuildShaded.scala | 127 +++--- .../apache/gearpump/streaming/Constants.scala | 1 + .../gearpump/streaming/StreamApplication.scala | 2 +- .../apache/gearpump/streaming/dsl/Stream.scala | 106 +++-- .../gearpump/streaming/dsl/StreamApp.scala | 34 +- .../streaming/dsl/javaapi/JavaStream.scala | 22 +- .../apache/gearpump/streaming/dsl/op/OP.scala | 109 ----- .../dsl/partitioner/GroupByPartitioner.scala | 49 ++ .../dsl/partitioner/GroupbyPartitioner.scala | 46 -- .../apache/gearpump/streaming/dsl/plan/OP.scala | 214 +++++++++ .../streaming/dsl/plan/OpTranslator.scala | 222 --------- .../gearpump/streaming/dsl/plan/Planner.scala | 65 ++- .../plan/functions/SingleInputFunction.scala | 107 +++++ .../streaming/dsl/task/CountTriggerTask.scala | 63 +++ .../dsl/task/EventTimeTriggerTask.scala | 59 +++ .../dsl/task/ProcessingTimeTriggerTask.scala | 82 ++++ .../streaming/dsl/task/TransformTask.scala | 47 ++ .../dsl/window/api/AccumulationMode.scala | 24 + .../streaming/dsl/window/api/GroupByFn.scala | 47 ++ .../streaming/dsl/window/api/Trigger.scala | 27 ++ .../streaming/dsl/window/api/Window.scala | 77 ++++ .../streaming/dsl/window/api/WindowFn.scala | 63 +++ .../dsl/window/impl/ReduceFnRunner.scala | 29 ++ .../streaming/dsl/window/impl/Window.scala | 75 +++ .../dsl/window/impl/WindowRunner.scala | 114 +++++ .../streaming/source/DataSourceTask.scala | 15 +- .../gearpump/streaming/task/TaskActor.scala | 4 +- .../gearpump/streaming/dsl/StreamAppSpec.scala | 67 +-- .../gearpump/streaming/dsl/StreamSpec.scala | 24 +- .../partitioner/GroupByPartitionerSpec.scala | 23 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 244 ++++++++++ .../streaming/dsl/plan/OpTranslatorSpec.scala | 148 ------ .../streaming/dsl/plan/PlannerSpec.scala | 132 ++++++ .../functions/SingleInputFunctionSpec.scala | 333 ++++++++++++++ .../dsl/task/CountTriggerTaskSpec.scala | 61 +++ .../dsl/task/EventTimeTriggerTaskSpec.scala | 66 +++ .../task/ProcessingTimeTriggerTaskSpec.scala | 69 +++ 41 files changed, 2937 insertions(+), 784 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala new file mode 100644 index 
0000000..4f43fd4 --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.wordcount.dsl + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp} +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow} +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.AkkaApp + +object WindowedWordCount extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val context = ClientContext(akkaConf) + val app = StreamApp("dsl", context) + app.source[String](new TimedDataSource). + // word => (word, count) + flatMap(line => line.split("[\\s]+")).map((_, 1)). + // fix window + window(FixedWindow.apply(Duration.ofMillis(5L)) + .triggering(EventTimeTrigger)). + // (word, count1), (word, count2) => (word, count1 + count2) + groupBy(_._1). 
+ sum.sink(new LoggerSink) + + context.submit(app) + context.close() + } + + private class TimedDataSource extends DataSource { + + private var data = List( + Message("foo", 1L), + Message("bar", 2L), + Message("foo", 3L), + Message("foo", 5L), + Message("bar", 7L), + Message("bar", 8L) + ) + + private var watermark: Instant = Instant.ofEpochMilli(0) + + override def read(): Message = { + if (data.nonEmpty) { + val msg = data.head + data = data.tail + watermark = Instant.ofEpochMilli(msg.timestamp) + msg + } else { + null + } + } + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def close(): Unit = {} + + override def getWatermark: Instant = { + if (data.isEmpty) { + watermark = watermark.plusMillis(1) + } + watermark + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala new file mode 100644 index 0000000..84dec70 --- /dev/null +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.redis + +import java.nio.charset.Charset + +object RedisMessage { + + private def toBytes(strings: List[String]): List[Array[Byte]] = + strings.map(string => string.getBytes(Charset.forName("UTF8"))) + + private def toBytes(string: String): Array[Byte] = + string.getBytes(Charset.forName("UTF8")) + + object Connection { + + /** + * Change the selected database for the current connection + * + * @param index + */ + case class SELECT(index: Int) + + } + + object Geo { + + /** + * Add one geospatial item in the geospatial index represented using a sorted set + * + * @param key + * @param longitude + * @param latitude + * @param member + */ + case class GEOADD(key: Array[Byte], longitude: Double, + latitude: Double, member: Array[Byte]) { + def this(key: String, longitude: Double, + latitude: Double, member: String) = + this(toBytes(key), longitude, latitude, toBytes(member)) + } + + } + + object Hashes { + + /** + * Delete a hash field + * + * @param key + * @param field + */ + case class HDEL(key: Array[Byte], field: Array[Byte]) { + def this(key: String, field: String) = this(toBytes(key), toBytes(field)) + } + + /** + * Increment the integer value of a hash field by the given number + * + * @param key + * @param field + * @param increment + */ + case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) { + def this(key: String, field: String, increment: Long) = + this(toBytes(key), toBytes(field), increment) + } + + /** + * Increment the float value of a hash field by the given amount + * + * @param key + * @param field + * @param increment + */ + case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) { + def this(key: String, field: String, increment: Float) = + this(toBytes(key), toBytes(field), increment) + } + + + /** + * Set the string value of a hash field + * + * @param key + * @param field + * @param value + */ + case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { + def this(key: String, field: String, value: String) = + this(toBytes(key), toBytes(field), toBytes(value)) + } + + /** + * Set the value of a hash field, only if the field does not exist + * + * @param key + * @param field + * @param value + */ + case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { + def this(key: String, field: String, value: String) = + this(toBytes(key), toBytes(field), toBytes(value)) + } + + } + + object HyperLogLog { + + /** + * Adds the specified elements to the specified HyperLogLog + * + * @param key + * @param element + */ + case class PFADD(key: String, element: String) + + } + + object Lists { + + + /** + * Prepend one or multiple values to a list + * + * @param key + * @param value + */ + case class LPUSH(key: Array[Byte], value: Array[Byte]) { + + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Prepend a value to a list, only if the list exists + * + * @param key + * @param value + */ + case class LPUSHX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Set the value of an element in a list by its index + * + * @param key + * @param index + * @param value + */ + case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) { + def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value)) + } + + /** + * Append one or multiple values to a list + * + * @param key + * @param value + */ + case class RPUSH(key: Array[Byte],
value: Array[Byte]) { + + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Append a value to a list, only if the list exists + * + * @param key + * @param value + */ + case class RPUSHX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + } + + object Keys { + + /** + * Delete a key + * + * @param message + */ + case class DEL(message: Array[Byte]) { + + def this(message: String) = this(toBytes(message)) + } + + /** + * Set a key's time to live in seconds + * + * @param key + */ + case class EXPIRE(key: Array[Byte], seconds: Int) { + def this(key: String, seconds: Int) = this(toBytes(key), seconds) + } + + /** + * Set the expiration for a key as a UNIX timestamp + * + * @param key + * @param timestamp + */ + case class EXPIREAT(key: Array[Byte], timestamp: Long) { + def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + } + + /** + * Atomically transfer a key from a Redis instance to another one. + * + * @param host + * @param port + * @param key + * @param database + * @param timeout + */ + case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) { + def this(host: String, port: Int, key: String, database: Int, timeout: Int) = + this(toBytes(host), port, toBytes(key), database, timeout) + } + + /** + * Move a key to another database + * + * @param key + * @param db + */ + case class MOVE(key: Array[Byte], db: Int) { + def this(key: String, db: Int) = this(toBytes(key), db) + } + + /** + * Remove the expiration from a key + * + * @param key + */ + case class PERSIST(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Set a key's time to live in milliseconds + * + * @param key + * @param milliseconds + */ + case class PEXPIRE(key: Array[Byte], milliseconds: Long) { + def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + } + + /** + * Set the expiration for a key as a UNIX timestamp specified in milliseconds + * + * @param key + * @param timestamp + */ + case class PEXPIREAT(key: Array[Byte], timestamp: Long) { + def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + } + + /** + * Rename a key + * + * @param key + * @param newKey + */ + case class RENAME(key: Array[Byte], newKey: Array[Byte]) { + def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + } + + /** + * Rename a key, only if the new key does not exist + * + * @param key + * @param newKey + */ + case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) { + def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + } + + } + + + object Sets { + + /** + * Add one or more members to a set + * + * @param key + * @param members + */ + case class SADD(key: Array[Byte], members: Array[Byte]) { + + def this(key: String, members: String) = this(toBytes(key), toBytes(members)) + } + + + /** + * Move a member from one set to another + * + * @param source + * @param destination + * @param member + */ + case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) { + def this(source: String, destination: String, member: String) = + this(toBytes(source), toBytes(destination), toBytes(member)) + } + + + /** + * Remove one or more members from a set + * + * @param key + * @param member + */ + case class SREM(key: Array[Byte], member: Array[Byte]) { + + def this(key: String, member: String) = this(toBytes(key), toBytes(member)) + } + + } + + object String { + + /**
* Append a value to a key + * + * @param key + * @param value + */ + case class APPEND(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Decrement the integer value of a key by one + * + * @param key + */ + case class DECR(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Decrement the integer value of a key by the given number + * + * @param key + * @param decrement + */ + case class DECRBY(key: Array[Byte], decrement: Int) { + def this(key: String, decrement: Int) = this(toBytes(key), decrement) + } + + /** + * Increment the integer value of a key by one + * + * @param key + */ + case class INCR(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Increment the integer value of a key by the given amount + * + * @param key + * @param increment + */ + case class INCRBY(key: Array[Byte], increment: Int) { + def this(key: String, increment: Int) = this(toBytes(key), increment) + } + + /** + * Increment the float value of a key by the given amount + * + * @param key + * @param increment + */ + case class INCRBYFLOAT(key: Array[Byte], increment: Double) { + def this(key: String, increment: Double) = this(toBytes(key), increment) + } + + + /** + * Set the string value of a key + * + * @param key + * @param value + */ + case class SET(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Sets or clears the bit at offset in the string value stored at key + * + * @param key + * @param offset + * @param value + */ + case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) { + def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value)) + } + + /** + * Set the value and expiration of a key + * + * @param key + * @param seconds + * @param value + */ + case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) { + def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value)) + } + + /** + * Set the value of a key, only if the key does not exist + * + * @param key + * @param value + */ + case class SETNX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Overwrite part of a string at key starting at the specified offset + * + * @param key + * @param offset + * @param value + */ + case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) { + def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value)) + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala new file mode 100644 index 0000000..3f75949 --- /dev/null +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.redis + +import org.apache.gearpump.Message +import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD +import org.apache.gearpump.redis.RedisMessage.Hashes._ +import org.apache.gearpump.redis.RedisMessage.HyperLogLog._ +import org.apache.gearpump.redis.RedisMessage.Keys._ +import org.apache.gearpump.redis.RedisMessage.Lists._ +import org.apache.gearpump.redis.RedisMessage.Sets._ +import org.apache.gearpump.redis.RedisMessage.String._ +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.LogUtil +import redis.clients.jedis.Jedis +import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT} + +/** + * Save message in Redis Instance + * + * @param host + * @param port + * @param timeout + * @param database + * @param password + */ +class RedisSink( + host: String = DEFAULT_HOST, + port: Int = DEFAULT_PORT, + timeout: Int = DEFAULT_TIMEOUT, + database: Int = DEFAULT_DATABASE, + password: String = "") extends DataSink { + + private val LOG = LogUtil.getLogger(getClass) + @transient private lazy val client = new Jedis(host, port, timeout) + + override def open(context: TaskContext): Unit = { + client.select(database) + + if (password != null && password.length != 0) { + client.auth(password) + } + } + + override def write(message: Message): Unit = { + + message.msg match { + // GEO + case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) + + // Hashes + case msg: HDEL => client.hdel(msg.key, msg.field) + case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment) + case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment) + case msg: HSET => client.hset(msg.key, msg.field, msg.value) + case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value) + + // HyperLogLog + case msg: PFADD => client.pfadd(msg.key, msg.element) + + // Lists + case msg: LPUSH => client.lpush(msg.key, msg.value) + case msg: LPUSHX => client.lpushx(msg.key, msg.value) + case msg: LSET => client.lset(msg.key, msg.index, msg.value) + case msg: RPUSH => client.rpush(msg.key, msg.value) + case msg: RPUSHX => client.rpushx(msg.key, msg.value) + + // Keys + case msg: DEL => client.del(msg.message) + case msg: EXPIRE => client.expire(msg.key, msg.seconds) + case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp) + case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout) + case msg: MOVE => client.move(msg.key, msg.db) + case msg: PERSIST => client.persist(msg.key) + case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds) + case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp) + case msg: RENAME => client.rename(msg.key, msg.newKey) + case msg: RENAMENX => client.renamenx(msg.key, msg.newKey) + + // Sets + case msg: SADD => client.sadd(msg.key, msg.members) + case msg: SMOVE => 
client.smove(msg.source, msg.destination, msg.member) + case msg: SREM => client.srem(msg.key, msg.member) + + // String + case msg: APPEND => client.append(msg.key, msg.value) + case msg: DECR => client.decr(msg.key) + case msg: DECRBY => client.decrBy(msg.key, msg.decrement) + case msg: INCR => client.incr(msg.key) + case msg: INCRBY => client.incrBy(msg.key, msg.increment) + case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment) + case msg: SET => client.set(msg.key, msg.value) + case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value) + case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value) + case msg: SETNX => client.setnx(msg.key, msg.value) + case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value) + } + } + + override def close(): Unit = { + client.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 0b1628e..f1e0443 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -154,12 +154,6 @@ object Build extends sbt.Build { dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion ) - val streamingDependencies = Seq( - unmanagedJars in Compile ++= Seq( - getShadedJarFile("gs-collections", version.value) - ) - ) - val coreDependencies = Seq( libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % slf4jVersion, @@ -199,9 +193,9 @@ object Build extends sbt.Build { ), unmanagedJars in Compile ++= Seq( - getShadedJarFile("metrics-graphite", version.value), - getShadedJarFile("guava", version.value), - getShadedJarFile("akka-kryo", version.value) + getShadedJarFile(shaded_metrics_graphite.id, version.value), + getShadedJarFile(shaded_guava.id, version.value), + getShadedJarFile(shaded_akka_kryo.id, version.value) ) ) @@ -250,6 +244,20 @@ object Build extends sbt.Build { .map(_.filterNot(_.getCanonicalPath.contains("akka"))) } + private def addShadedDeps(deps: Seq[xml.Node], node: xml.Node): xml.Node = { + node match { + case elem: xml.Elem => + val child = if (elem.label == "dependencies") { + elem.child ++ deps + } else { + elem.child.map(addShadedDeps(deps, _)) + } + xml.Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, child: _*) + case _ => + node + } + } + lazy val root = Project( id = "gearpump", base = file("."), @@ -262,7 +270,14 @@ object Build extends sbt.Build { lazy val core = Project( id = "gearpump-core", base = file("core"), - settings = commonSettings ++ javadocSettings ++ coreDependencies) + settings = commonSettings ++ javadocSettings ++ coreDependencies ++ Seq( + pomPostProcess := { + (node: xml.Node) => addShadedDeps(List( + getShadedDepXML(organization.value, shaded_akka_kryo.id, version.value), + getShadedDepXML(organization.value, shaded_guava.id, version.value), + getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node) + } + )) .disablePlugins(sbtassembly.AssemblyPlugin) lazy val daemon = Project( @@ -282,9 +297,18 @@ object Build extends sbt.Build { lazy val streaming = Project( id = "gearpump-streaming", base = file("streaming"), - settings = commonSettings ++ javadocSettings ++ streamingDependencies) - .dependsOn(core % "test->test; compile->compile", daemon % "test->test") - .disablePlugins(sbtassembly.AssemblyPlugin) + settings = commonSettings ++ javadocSettings ++ Seq( + unmanagedJars in Compile ++= Seq( + getShadedJarFile(shaded_gs_collections.id, 
version.value) + ), + + pomPostProcess := { + (node: xml.Node) => addShadedDeps(List( + getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node) + } + )) + .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test") + .disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_kafka = Project( id = "gearpump-external-kafka", @@ -402,6 +426,18 @@ object Build extends sbt.Build { .dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile") .disablePlugins(sbtassembly.AssemblyPlugin) + lazy val redis = Project( + id = "gearpump-experiments-redis", + base = file("experiments/redis"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + libraryDependencies ++= Seq( + "redis.clients" % "jedis" % "2.9.0" + ), + mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test") + )) + .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + lazy val storm = Project( id = "gearpump-experiments-storm", base = file("experiments/storm"), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/BuildShaded.scala ---------------------------------------------------------------------- diff --git a/project/BuildShaded.scala b/project/BuildShaded.scala index 1f59bfd..a43587c 100644 --- a/project/BuildShaded.scala +++ b/project/BuildShaded.scala @@ -35,7 +35,7 @@ object BuildShaded extends sbt.Build { _.copy(includeScala = false) }, assemblyJarName in assembly := { - s"${name.value}-$scalaVersionMajor-${version.value}-assembly.jar" + s"${name.value}_$scalaVersionMajor-${version.value}.jar" }, target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor ) @@ -44,92 +44,99 @@ object BuildShaded extends sbt.Build { id = "gearpump-shaded", base = file("shaded") ).aggregate(shaded_akka_kryo, shaded_gs_collections, shaded_guava, shaded_metrics_graphite) - .disablePlugins(sbtassembly.AssemblyPlugin) - + .disablePlugins(sbtassembly.AssemblyPlugin) lazy val shaded_akka_kryo = Project( id = "gearpump-shaded-akka-kryo", base = file("shaded/akka-kryo"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.zap("com.google.protobuf.**").inAll, - ShadeRule.zap("com.typesafe.config.**").inAll, - ShadeRule.zap("akka.**").inAll, - ShadeRule.zap("org.jboss.netty.**").inAll, - ShadeRule.zap("net.jpountz.lz4.**").inAll, - ShadeRule.zap("org.uncommons.maths.**").inAll, - ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll, - ShadeRule.rename("com.esotericsoftware.**" -> - "org.apache.gearpump.esotericsoftware.@1").inAll, - ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.zap("com.google.protobuf.**").inAll, + ShadeRule.zap("com.typesafe.config.**").inAll, + ShadeRule.zap("akka.**").inAll, + ShadeRule.zap("org.jboss.netty.**").inAll, + ShadeRule.zap("net.jpountz.lz4.**").inAll, + ShadeRule.zap("org.uncommons.maths.**").inAll, + ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll, + 
ShadeRule.rename("com.esotericsoftware.**" -> + "org.apache.gearpump.esotericsoftware.@1").inAll, + ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll + ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion ) + ) ) lazy val shaded_gs_collections = Project( id = "gearpump-shaded-gs-collections", base = file("shaded/gs-collections"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.gs.collections.**" -> - "org.apache.gearpump.gs.collections.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.goldmansachs" % "gs-collections" % gsCollectionsVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.gs.collections.**" -> + "org.apache.gearpump.gs.collections.@1").inAll ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.goldmansachs" % "gs-collections" % gsCollectionsVersion + ) + ) ) lazy val shaded_guava = Project( id = "gearpump-shaded-guava", base = file("shaded/guava"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.google.guava" % "guava" % guavaVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll + ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.google.guava" % "guava" % guavaVersion ) + ) ) lazy val shaded_metrics_graphite = Project( id = "gearpump-shaded-metrics-graphite", base = file("shaded/metrics-graphite"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.codahale.metrics.**" -> - "org.apache.gearpump.codahale.metrics.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.codahale.metrics" % "metrics-graphite" % codahaleVersion, - "com.codahale.metrics" % "metrics-jvm" % codahaleVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.codahale.metrics.**" -> + "org.apache.gearpump.codahale.metrics.@1").inAll ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.codahale.metrics" % "metrics-graphite" % codahaleVersion, + "com.codahale.metrics" % "metrics-jvm" % codahaleVersion + ) + ) ) def getShadedJarFile(name: String, gearpumpVersion: String): File = { shaded.base / "target" / scalaVersionMajor / - s"gearpump-shaded-$name-$scalaVersionMajor-$gearpumpVersion-assembly.jar" + s"${name}_$scalaVersionMajor-$gearpumpVersion.jar" + } + + def getShadedDepXML(groupId: String, artifactId: String, version: String): scala.xml.Node = { + <dependency> + <groupId>{groupId}</groupId> + <artifactId>{artifactId}</artifactId> + 
<version>{version}</version> + </dependency> } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index cd33b50..f99a436 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -22,6 +22,7 @@ object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source" val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" + val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function" val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index 66ec873..a6588a1 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -123,7 +123,7 @@ object LifeTime { */ class StreamApplication( override val name: String, val inputUserConfig: UserConfig, - val dag: Graph[ProcessorDescription, PartitionerDescription]) + dag: Graph[ProcessorDescription, PartitionerDescription]) extends Application { require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala index 786d496..440a45e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala @@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.op._ +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.window.api._ +import org.apache.gearpump.streaming.dsl.window.impl._ import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -35,12 +38,12 @@ class Stream[T]( /** * converts a value[T] to a list of value[R] * - * @param fun FlatMap function + * @param fn FlatMap function * @param description The description message for this operation * @return A new stream with type [R] */ - def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = { - val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap")) + def 
flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { + val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description)) graph.addVertex(flatMapOp) graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) new Stream[R](graph, flatMapOp) @@ -49,36 +52,36 @@ class Stream[T]( /** * Maps message of type T to message of type R * - * @param fun Function + * @param fn Function * @return A new stream with type [R] */ - def map[R](fun: T => R, description: String = null): Stream[R] = { + def map[R](fn: T => R, description: String = "map"): Stream[R] = { this.flatMap({ data => - Option(fun(data)) - }, Option(description).getOrElse("map")) + Option(fn(data)) + }, description) } /** * Keeps records when fn(T) == true * - * @param fun the filter + * @param fn the filter * @return a new stream after filter */ - def filter(fun: T => Boolean, description: String = null): Stream[T] = { + def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { this.flatMap({ data => - if (fun(data)) Option(data) else None - }, Option(description).getOrElse("filter")) + if (fn(data)) Option(data) else None + }, description) } /** * Reduces operations. * - * @param fun reduction function + * @param fn reduction function * @param description description message for this operator * @return a new stream after reduction */ - def reduce(fun: (T, T) => T, description: String = null): Stream[T] = { + def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { val reduceOp = ChainableOp(new ReduceFunction(fn, description)) graph.addVertex(reduceOp) graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp) new Stream(graph, reduceOp) @@ -88,7 +91,10 @@ class Stream[T]( * Log to task log file */ def log(): Unit = { - this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log") + this.map(msg => { + LoggerFactory.getLogger("dsl").info(msg.toString) + msg + }, "log") } /** @@ -97,8 +103,8 @@ class Stream[T]( * @param other the other stream * @return the merged stream */ - def merge(other: Stream[T], description: String = null): Stream[T] = { - val mergeOp = MergeOp(Option(description).getOrElse("merge")) + def merge(other: Stream[T], description: String = "merge"): Stream[T] = { + val mergeOp = MergeOp(description, UserConfig.empty) graph.addVertex(mergeOp) graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) @@ -115,20 +121,29 @@ * * For example, * {{{ - * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..) + * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
* }}} * - * @param fun Group by function + * @param fn Group by function * @param parallelism Parallelism level * @param description The description * @return the grouped stream */ - def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null) - : Stream[T] = { - val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy")) - graph.addVertex(groupOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) - new Stream[T](graph, groupOp) + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + window(CountWindow.apply(1).accumulating) + .groupBy[GROUP](fn, parallelism, description) + } + + /** + * Window function + * + * @param win window definition + * @param description window description + * @return [[WindowStream]] where groupBy could be applied + */ + def window(win: Window, description: String = "window"): WindowStream[T] = { + new WindowStream[T](graph, edge, thisNode, win, description) } /** @@ -140,15 +155,28 @@ class Stream[T]( */ def process[R]( processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, - description: String = null): Stream[R] = { - val processorOp = ProcessorOp(processor, parallelism, conf, - Option(description).getOrElse("process")) + description: String = "process"): Stream[R] = { + val processorOp = ProcessorOp(processor, parallelism, conf, description) graph.addVertex(processorOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) new Stream[R](graph, processorOp, Some(Shuffle)) } } +class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, + window: Window, winDesc: String) { + + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, + s"$winDesc.$description") + graph.addVertex(groupOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) + new Stream[T](graph, groupOp) + } +} + class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { /** * GroupBy key @@ -192,30 +220,18 @@ object Stream { } implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { - def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String) - : Stream[T] = { - implicit val sink = DataSinkOp[T](dataSink, parallism, conf, - Some(description).getOrElse("traversable")) + def sink(dataSink: DataSink, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { + implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) stream.graph.addVertex(sink) stream.graph.addEdge(stream.thisNode, Shuffle, sink) new Stream[T](stream.graph, sink) } - - def sink[T]( - sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty, - description: String = null): Stream[T] = { - val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source")) - stream.graph.addVertex(sinkOp) - stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp) - new Stream[T](stream.graph, sinkOp) - } } } class LoggerSink[T] extends DataSink { - var logger: Logger = null - - private var context: TaskContext = null + var logger: Logger = _ override def open(context: TaskContext): Unit = { this.logger = context.logger 
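For reference, a short sketch of how the reworked DSL above composes end to end: window() now returns a WindowStream, groupBy() on that WindowStream yields a windowed, keyed Stream, and chainable operators such as reduce() follow. This is illustrative only and not part of the diff; it assumes an app: StreamApp and reuses the CollectionDataSource and window API types shown elsewhere in this commit.

// Sketch (not part of this commit): count words per 5 ms event-time window
// using the new window/groupBy/reduce combinators.
import java.time.Duration
import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}

val counts = app.source[String](new CollectionDataSource(Seq("foo bar", "foo")))
  .flatMap(line => line.split("[\\s]+"))          // line => words
  .map((_, 1))                                    // word => (word, 1)
  .window(FixedWindow.apply(Duration.ofMillis(5L))
    .triggering(EventTimeTrigger))                // fixed 5 ms windows, fired on event time
  .groupBy(_._1)                                  // key by word within each window
  .reduce((a, b) => (a._1, a._2 + b._2), "sum")   // sum counts per word per window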
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala index d45737b..8116146 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala @@ -24,10 +24,9 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp} -import org.apache.gearpump.streaming.dsl.plan.Planner +import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.Graph import org.apache.gearpump.Message @@ -50,7 +49,8 @@ import scala.language.implicitConversions * @param name name of app */ class StreamApp( - val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) { + name: String, system: ActorSystem, userConfig: UserConfig, + private val graph: Graph[Op, OpEdge]) { def this(name: String, system: ActorSystem, userConfig: UserConfig) = { this(name, system, userConfig, Graph.empty[Op, OpEdge]) @@ -76,34 +76,16 @@ object StreamApp { implicit class Source(app: StreamApp) extends java.io.Serializable { - def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = { - source(dataSource, parallelism, UserConfig.empty) - } - - def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = { - source(dataSource, parallelism, UserConfig.empty, description) - } - - def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = { - source(dataSource, parallelism, conf, description = null) - } - - def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - : Stream[T] = { + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) } + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) } - - def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) - : Stream[T] = { - val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source")) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } } } @@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { override def read(): Message = { if (iterator.hasNext) { - Message(iterator.next()) + Message(iterator.next(), Instant.now().toEpochMilli) } else { null } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- 
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 6eff20c..3003b98 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.dsl.javaapi import scala.collection.JavaConverters._ - import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.Stream +import org.apache.gearpump.streaming.dsl.window.api.Window +import org.apache.gearpump.streaming.dsl.{Stream, WindowStream} import org.apache.gearpump.streaming.javaapi.dsl.functions._ import org.apache.gearpump.streaming.task.Task @@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) { /** * Groups a stream into a list of sub-streams. Operations chained after * groupBy apply to sub-streams. */ - def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String) : JavaStream[T] = { - new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description)) + def groupBy[GROUP](fn: GroupByFunction[T, GROUP], + parallelism: Int, description: String): JavaStream[T] = { + new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description)) + } + + def window(win: Window, description: String): JavaWindowStream[T] = { + new JavaWindowStream[T](stream.window(win, description)) } /** Add a low level Processor to process messages */ @@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) { new JavaStream[R](stream.process(processor, parallelism, conf, description)) } } + +class JavaWindowStream[T](stream: WindowStream[T]) { + + def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, + description: String): JavaStream[T] = { + new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala deleted file mode 100644 index 49d9dec..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.gearpump.streaming.dsl.op - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.Task - -/** - * Operators for the DSL - */ -sealed trait Op { - def description: String - def conf: UserConfig -} - -/** - * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP - * "Attach" means running in same Actor. - */ -trait SlaveOp[T] extends Op - -case class FlatMapOp[T, R]( - fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -trait MasterOp extends Op - -trait ParameterizedOp[T] extends MasterOp - -case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) - extends MasterOp - -case class GroupByOp[T, R]( - fun: T => R, parallelism: Int, description: String, - override val conf: UserConfig = UserConfig.empty) - extends ParameterizedOp[T] - -case class ProcessorOp[T <: Task]( - processor: Class[T], parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSourceOp[T]( - dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSinkOp[T]( - dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -/** - * Contains operators which can be chained to single one. - * - * For example, flatmap().map().reduce() can be chained to single operator as - * no data shuffling is required. - * @param ops list of operations - */ -case class OpChain(ops: List[Op]) extends Op { - def head: Op = ops.head - def last: Op = ops.last - - def description: String = null - - override def conf: UserConfig = { - // The head's conf has priority - ops.reverse.foldLeft(UserConfig.empty) { (conf, op) => - conf.withConfig(op.conf) - } - } -} - -trait OpEdge - -/** - * The upstream OP and downstream OP doesn't require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Direct extends OpEdge - -/** - * The upstream OP and downstream OP DOES require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Shuffle extends OpEdge - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala new file mode 100644 index 0000000..2ec881b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.partitioner + +import org.apache.gearpump.Message +import org.apache.gearpump.partitioner.UnicastPartitioner +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn + +/** + * Partition messages by applying group by function first. + * + * For example: + * {{{ + * case class People(name: String, gender: String) + * + * object Test{ + * + * val groupBy: (People => String) = people => people.gender + * val partitioner = GroupByPartitioner(groupBy) + * } + * }}} + * + * @param fn First apply message with groupBy function, then pick the hashCode of the output + * to do the partitioning. You must define hashCode() for output type of groupBy function. + */ +class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group]) + extends UnicastPartitioner { + override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { + val hashCode = fn.groupBy(message).hashCode() + (hashCode & Integer.MAX_VALUE) % partitionNum + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala deleted file mode 100644 index b2e2932..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.partitioner - -import org.apache.gearpump.Message -import org.apache.gearpump.partitioner.UnicastPartitioner - -/** - * Partition messages by applying group by function first. 
- * - * For example: - * {{{ - * case class People(name: String, gender: String) - * - * object Test{ - * - * val groupBy: (People => String) = people => people.gender - * val partitioner = GroupByPartitioner(groupBy) - * } - * }}} - * - * @param groupBy First apply message with groupBy function, then pick the hashCode of the output - * to do the partitioning. You must define hashCode() for output type of groupBy function. - */ -class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode() - (hashCode & Integer.MAX_VALUE) % partitionNum - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala new file mode 100644 index 0000000..744976b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.plan + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.Processor.DefaultProcessor +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.{Constants, Processor} +import org.apache.gearpump.streaming.dsl.task.TransformTask +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} +import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} +import org.apache.gearpump.streaming.task.Task + +import scala.reflect.ClassTag + +/** + * This is a vertex on the logical plan. + */ +sealed trait Op { + + def description: String + + def userConfig: UserConfig + + def chain(op: Op)(implicit system: ActorSystem): Op + + def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] +} + +/** + * This represents a low level Processor. 
+ */ +case class ProcessorOp[T <: Task]( + processor: Class[T], + parallelism: Int, + userConfig: UserConfig, + description: String) + extends Op { + + def this( + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "processor")(implicit classTag: ClassTag[T]) = { + this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description) + } + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + throw new OpChainException(this, other) + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + DefaultProcessor(parallelism, description, userConfig, processor) + } +} + +/** + * This represents a DataSource. + */ +case class DataSourceOp( + dataSource: DataSource, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "source") + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + DataSourceOp(dataSource, parallelism, + userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn), + description) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[DataSourceTask[Any, Any]](parallelism, description, + userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) + } +} + +/** + * This represents a DataSink. + */ +case class DataSinkOp( + dataSink: DataSink, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "sink") + extends Op { + + override def chain(op: Op)(implicit system: ActorSystem): Op = { + throw new OpChainException(this, op) + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + DataSinkProcessor(dataSink, parallelism, description) + } +} + +/** + * This represents operations that can be chained together + * (e.g. 
flatMap, map, filter, reduce) and can then be chained
+ * onto an upstream Op (e.g. a DataSourceOp or GroupByOp) at planning time.
+ */
+case class ChainableOp[IN, OUT](
+    fn: SingleInputFunction[IN, OUT]) extends Op {
+
+  override def description: String = fn.description
+
+  override def userConfig: UserConfig = UserConfig.empty
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[OUT, _] =>
+        // TODO: preserve type info
+        ChainableOp(fn.andThen(op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    throw new UnsupportedOperationException("ChainableOp cannot be translated to a Processor")
+  }
+}
+
+/**
+ * This represents a Processor that performs windowed aggregation.
+ */
+case class GroupByOp[IN, GROUP](
+    groupByFn: GroupByFn[IN, GROUP],
+    parallelism: Int = 1,
+    description: String = "groupBy",
+    override val userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        GroupByOp(groupByFn, parallelism, description,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    groupByFn.getProcessor(parallelism, description, userConfig)
+  }
+}
+
+/**
+ * This represents a Processor that transforms merged streams.
+ */
+case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Processor[TransformTask[Any, Any]](1, description, userConfig)
+  }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream Op and downstream Op don't require a network data shuffle,
+ * e.g. ChainableOp.
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream Op and downstream Op do require a network data shuffle,
+ * e.g. GroupByOp.
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown when two Ops cannot be chained.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index b09d9b9..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.plan - -import scala.collection.TraversableOnce -import akka.actor.ActorSystem -import org.slf4j.Logger -import org.apache.gearpump._ -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.op._ -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.sink.DataSinkProcessor -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.LogUtil - -/** - * Translates a OP to a TaskDescription - */ -class OpTranslator extends java.io.Serializable { - val LOG: Logger = LogUtil.getLogger(getClass) - - def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = { - - val baseConfig = ops.conf - - ops.ops.head match { - case op: MasterOp => - val tail = ops.ops.tail - val func = toFunction(tail) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - op match { - case DataSourceOp(dataSource, parallelism, conf, description) => - Processor[DataSourceTask[Any, Any]](parallelism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) - case groupby@GroupByOp(_, parallelism, description, _) => - Processor[GroupByTask[Object, Object, Object]](parallelism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby)) - case merge: MergeOp => - Processor[TransformTask[Object, Object]](1, - description = op.description + "." 
+ func.description, - userConfig) - case ProcessorOp(processor, parallelism, conf, description) => - DefaultProcessor(parallelism, - description = description + " " + func.description, - userConfig, processor) - case DataSinkOp(dataSink, parallelism, conf, description) => - DataSinkProcessor(dataSink, parallelism, description + func.description) - } - case op: SlaveOp[_] => - val func = toFunction(ops.ops) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - Processor[TransformTask[Object, Object]](1, - description = func.description, - taskConf = userConfig) - case chain: OpChain => - throw new RuntimeException("Not supposed to be called!") - } - } - - private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = { - val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]() - val totalFunction = ops.foldLeft(func) { (fun, op) => - - val opFunction = op match { - case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] => - new FlatMapFunction(flatmap.fun, flatmap.description) - case reduce: ReduceOp[Object @unchecked] => - new ReduceFunction(reduce.fun, reduce.description) - case _ => - throw new RuntimeException("Not supposed to be called!") - } - fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]]) - } - totalFunction.asInstanceOf[SingleInputFunction[Object, Object]] - } -} - -object OpTranslator { - - trait SingleInputFunction[IN, OUT] extends Serializable { - def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - new AndThen(this, other) - } - - def description: String - } - - class DummyInputFunction[T] extends SingleInputFunction[T, T] { - override def andThen[OUTER](other: SingleInputFunction[T, OUTER]) - : SingleInputFunction[T, OUTER] = { - other - } - - // Should never be called - override def process(value: T): TraversableOnce[T] = None - - override def description: String = "" - } - - class AndThen[IN, MIDDLE, OUT]( - first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - first.process(value).flatMap(second.process) - } - - override def description: String = { - Option(first.description).flatMap { description => - Option(second.description).map(description + "." 
+ _) - }.orNull - } - } - - class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - fun(value) - } - - override def description: String = { - this.descriptionMessage - } - } - - class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String) - extends SingleInputFunction[T, T] { - - private var state: Any = _ - - override def process(value: T): TraversableOnce[T] = { - if (state == null) { - state = value - } else { - state = fun(state.asInstanceOf[T], value) - } - Some(state.asInstanceOf[T]) - } - - override def description: String = descriptionMessage - } - - class GroupByTask[IN, GROUP, OUT]( - groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[GroupByOp[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun, - taskContext, userConf) - } - - private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]] - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - val group = groupBy(msg.msg.asInstanceOf[IN]) - if (!groups.contains(group)) { - val operator = - userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - groups += group -> operator - } - - val operator = groups(group) - - operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - } - } - - class TransformTask[IN, OUT]( - operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: UserConfig) extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[SingleInputFunction[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) - } - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - operator match { - case Some(op) => - op.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - case None => - taskContext.output(new Message(msg.msg, time)) - } - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index f5bbd65..16d5c06 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -22,7 +22,6 @@ import akka.actor.ActorSystem import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.dsl.op._ import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph @@ -33,64 +32,60 @@ class Planner { * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low * level Graph API. 
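 * For example (an illustrative sketch; opGraph is a hypothetical
 * Graph[Op, OpEdge] and an implicit ActorSystem is assumed in scope):
 * {{{
 * val planner = new Planner
 * val processorGraph = planner.plan(opGraph)
 * }}}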
*/ - def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem) - : Graph[Processor[_ <: Task], _ <: Partitioner] = { + def plan(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { - val opTranslator = new OpTranslator() - - val newDag = optimize(dag) - newDag.mapEdge { (node1, edge, node2) => + val graph = optimize(dag) + graph.mapEdge { (node1, edge, node2) => edge match { case Shuffle => - node2.head match { - case groupBy: GroupByOp[Any @unchecked, Any @unchecked] => - new GroupByPartitioner(groupBy.fun) + node2 match { + case groupBy: GroupByOp[_, _] => + new GroupByPartitioner(groupBy.groupByFn) case _ => new HashPartitioner } case Direct => new CoLocationPartitioner } - }.mapVertex { opChain => - opTranslator.translate(opChain) - } + }.mapVertex(_.getProcessor) } - private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = { - val newGraph = dag.mapVertex(op => OpChain(List(op))) - - val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse + private def optimize(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Op, OpEdge] = { + val graph = dag.copy + val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse for (node <- nodes) { - val outGoingEdges = newGraph.outgoingEdgesOf(node) + val outGoingEdges = graph.outgoingEdgesOf(node) for (edge <- outGoingEdges) { - merge(newGraph, edge._1, edge._3) + merge(graph, edge._1, edge._3) } } - newGraph + graph } - private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain) - : Graph[OpChain, OpEdge] = { - if (dag.outDegreeOf(node1) == 1 && - dag.inDegreeOf(node2) == 1 && + private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op) + (implicit system: ActorSystem): Unit = { + if (graph.outDegreeOf(node1) == 1 && + graph.inDegreeOf(node2) == 1 && // For processor node, we don't allow it to merge with downstream operators - !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) { - val (_, edge, _) = dag.outgoingEdgesOf(node1).head + !node1.isInstanceOf[ProcessorOp[_ <: Task]] && + !node2.isInstanceOf[ProcessorOp[_ <: Task]]) { + val (_, edge, _) = graph.outgoingEdgesOf(node1).head if (edge == Direct) { - val opList = OpChain(node1.ops ++ node2.ops) - dag.addVertex(opList) - for (incomingEdge <- dag.incomingEdgesOf(node1)) { - dag.addEdge(incomingEdge._1, incomingEdge._2, opList) + val chainedOp = node1.chain(node2) + graph.addVertex(chainedOp) + for (incomingEdge <- graph.incomingEdgesOf(node1)) { + graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp) } - for (outgoingEdge <- dag.outgoingEdgesOf(node2)) { - dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3) + for (outgoingEdge <- graph.outgoingEdgesOf(node2)) { + graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3) } // Remove the old vertex - dag.removeVertex(node1) - dag.removeVertex(node2) + graph.removeVertex(node1) + graph.removeVertex(node2) } } - dag } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala new file mode 100644 index 0000000..609fbb0 --- /dev/null +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.plan.functions + +trait SingleInputFunction[IN, OUT] extends Serializable { + def process(value: IN): TraversableOnce[OUT] + def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { + new AndThen(this, other) + } + def finish(): TraversableOnce[OUT] = None + def clearState(): Unit = {} + def description: String +} + +class AndThen[IN, MIDDLE, OUT]( + first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) + extends SingleInputFunction[IN, OUT] { + + override def process(value: IN): TraversableOnce[OUT] = { + first.process(value).flatMap(second.process) + } + + override def finish(): TraversableOnce[OUT] = { + val firstResult = first.finish().flatMap(second.process) + if (firstResult.isEmpty) { + second.finish() + } else { + firstResult + } + } + + override def clearState(): Unit = { + first.clearState() + second.clearState() + } + + override def description: String = { + Option(first.description).flatMap { description => + Option(second.description).map(description + "." 
+ _)
+    }.orNull
+  }
+}
+
+class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    fn(value)
+  }
+
+  override def description: String = descriptionMessage
+}
+
+
+class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+  extends SingleInputFunction[T, T] {
+
+  private var state: Option[T] = None
+
+  override def process(value: T): TraversableOnce[T] = {
+    if (state.isEmpty) {
+      state = Option(value)
+    } else {
+      state = state.map(fn(_, value))
+    }
+    None
+  }
+
+  override def finish(): TraversableOnce[T] = {
+    state
+  }
+
+  override def clearState(): Unit = {
+    state = None
+  }
+
+  override def description: String = descriptionMessage
+}
+
+class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+
+  override def process(value: T): TraversableOnce[Unit] = {
+    emit(value)
+    None
+  }
+
+  override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
+    throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
+  }
+
+  override def description: String = ""
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
new file mode 100644
index 0000000..4ee2fa8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output based on the number of messages received in a window.
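+ * For example, with a CountWindowFn of size 3 (an illustrative value), the
+ * task behaves as follows:
+ * {{{
+ * // onNext(msg1) -> windowRunner.process(msg1), num = 1
+ * // onNext(msg2) -> windowRunner.process(msg2), num = 2
+ * // onNext(msg3) -> windowRunner.process(msg3), num = 3 == windowSize,
+ * //                 so windowRunner.trigger(...) fires and num resets to 0
+ * }}}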
+ */ +class CountTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size + private var num = 0 + + override def onNext(msg: Message): Unit = { + windowRunner.process(msg) + num += 1 + if (windowSize == num) { + windowRunner.trigger(Instant.ofEpochMilli(windowSize)) + num = 0 + } + } +}
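As a quick orientation for reviewers, the following standalone sketch shows how the new SingleInputFunction stages compose via andThen. The driver object and sample values are invented for illustration; FlatMapFunction, ReduceFunction and their semantics come from SingleInputFunction.scala above:

import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}

object ChainingSketch extends App {
  // flatMap stage: split a line into word lengths
  val lengths = new FlatMapFunction[String, Int](s => s.split("\\s+").map(_.length).toSeq, "lengths")
  // reduce stage: running sum, buffered until a window is triggered
  val sum = new ReduceFunction[Int](_ + _, "sum")
  // Planner fuses consecutive ChainableOps the same way, via fn.andThen(op.fn)
  val chained = lengths.andThen(sum)

  println(chained.process("foo barbaz").toList) // List(): the reduce stage only buffers state
  println(chained.finish())                     // Some(9), i.e. 3 + 6, emitted on trigger
  chained.clearState()                          // resets the running sum for the next window
  println(chained.description)                  // "lengths.sum"
}

Note how process buffers while finish emits; the trigger tasks above rely on exactly this split to hold aggregation state until a window fires.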
