http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala new file mode 100644 index 0000000..e065c90 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.materializer + +import akka.actor.ActorSystem +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.Timers.{Completion, DelayInitial, Idle, IdleInject, IdleTimeoutBidi, Initial} +import akka.stream.impl.fusing.{Batch, Collect, Delay, Drop, DropWhile, DropWithin, Filter, FlattenMerge, Fold, GraphStageModule, GroupBy, GroupedWithin, Intersperse, LimitWeighted, Log, MapAsync, MapAsyncUnordered, PrefixAndTail, Recover, Reduce, Scan, Split, StatefulMapConcat, SubSink, SubSource, Take, TakeWhile, TakeWithin, Map => FMap} +import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource} +import akka.stream.impl.io.IncomingConnectionStage +import akka.stream.impl.{HeadOptionStage, Stages, Throttle, Unfold, UnfoldAsync} +import akka.stream.scaladsl.{Balance, Broadcast, Concat, Interleave, Merge, MergePreferred, MergeSorted, ModuleExtractor, Unzip, Zip, ZipWith2} +import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue +import akka.stream.stage.GraphStage +import org.apache.gearpump.akkastream.GearAttributes +import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge +import org.apache.gearpump.akkastream.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} +import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTask, ConcatTask, DelayInitialTask, DropWithinTask, FlattenMergeTask, FoldTask, GraphTask, GroupedWithinTask, InterleaveTask, MapAsyncTask, MergeTask, SingleSourceTask, SinkBridgeTask, SourceBridgeTask, StatefulMapConcatTask, TakeWithinTask, ThrottleTask, TickSourceTask, Zip2Task} +import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK} +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper +import 
org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle} +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} +import org.apache.gearpump.util.Graph +import org.slf4j.LoggerFactory + +import scala.concurrent.Promise +import scala.concurrent.duration.FiniteDuration + +/** + * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump + * Streaming Application. + * + * @param graph Graph + * @param system ActorSystem + */ +class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { + + import RemoteMaterializerImpl._ + + type ID = String + private implicit val actorSystem = system + + private def uuid: String = { + java.util.UUID.randomUUID.toString + } + + def materialize: (StreamApplication, Map[Module, ProcessorId]) = { + val (opGraph, ids) = toOpGraph + val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph) + val processorIds = resolveIds(app, ids) + + val updatedApp = updateJunctionConfig(processorIds, app) + (removeIds(updatedApp), processorIds) + } + + private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], + app: StreamApplication): StreamApplication = { + val config = junctionConfig(processorIds) + + val dag = app.dag.mapVertex { vertex => + val processorId = vertex.id + val newConf = vertex.taskConf.withConfig(config(processorId)) + vertex.copy(taskConf = newConf) + } + new StreamApplication(app.name, app.inputUserConfig, dag) + } + + private def junctionConfig(processorIds: Map[Module, ProcessorId]): + Map[ProcessorId, UserConfig] = { + val updatedConfigs = graph.vertices.flatMap { vertex => + 
buildShape(vertex, processorIds) + }.toMap + updatedConfigs + } + + private def buildShape(vertex: Module, processorIds: Map[Module, ProcessorId]): + Option[(ProcessorId, UserConfig)] = { + def inProcessors(vertex: Module): List[ProcessorId] = { + vertex.shape.inlets.flatMap { inlet => + graph.incomingEdgesOf(vertex).find( + _._2.to == inlet).map(_._1 + ).flatMap(processorIds.get) + }.toList + } + def outProcessors(vertex: Module): List[ProcessorId] = { + vertex.shape.outlets.flatMap { outlet => + graph.outgoingEdgesOf(vertex).find( + _._2.from == outlet).map(_._3 + ).flatMap(processorIds.get) + }.toList + } + processorIds.get(vertex).map(processorId => { + (processorId, UserConfig.empty. + withValue(GraphTask.OUT_PROCESSORS, outProcessors(vertex)). + withValue(GraphTask.IN_PROCESSORS, inProcessors(vertex))) + }) + } + + private def resolveIds(app: StreamApplication, ids: Map[Module, ID]): + Map[Module, ProcessorId] = { + ids.flatMap { kv => + val (module, id) = kv + val processorId = app.dag.vertices.find { processor => + processor.taskConf.getString(id).isDefined + }.map(_.id) + processorId.map((module, _)) + } + } + + private def removeIds(app: StreamApplication): StreamApplication = { + val graph = app.dag.mapVertex { processor => + val conf = removeId(processor.taskConf) + processor.copy(taskConf = conf) + } + new StreamApplication(app.name, app.inputUserConfig, graph) + } + + private def removeId(conf: UserConfig): UserConfig = { + conf.filter { kv => + kv._2 != RemoteMaterializerImpl.TRACKABLE + } + } + + private def toOpGraph: (Graph[Op, OpEdge], Map[Module, ID]) = { + var matValues = collection.mutable.Map.empty[Module, ID] + val opGraph = graph.mapVertex[Op] { module => + val name = uuid + val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.TRACKABLE) + matValues += module -> name + val parallelism = GearAttributes.count(module.attributes) + val op = module match { + case source: SourceTaskModule[_] => + val updatedConf = 
conf.withConfig(source.conf) + DataSourceOp(source.source, parallelism, updatedConf, "source") + case sink: SinkTaskModule[_] => + val updatedConf = conf.withConfig(sink.conf) + DataSinkOp(sink.sink, parallelism, updatedConf, "sink") + case sourceBridge: SourceBridgeModule[_, _] => + ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source") + case processor: ProcessorModule[_, _, _] => + val updatedConf = conf.withConfig(processor.conf) + ProcessorOp(processor.processor, parallelism, updatedConf, "source") + case sinkBridge: SinkBridgeModule[_, _] => + ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") + case groupBy: GroupByModule[_, _] => + GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindow.apply(1).accumulating), + parallelism, "groupBy", conf) + case reduce: ReduceModule[_] => + reduceOp(reduce.f, conf) + case graphStage: GraphStageModule => + translateGraphStageWithMaterializedValue(graphStage, parallelism, conf) + case _ => + null + } + if (op == null) { + throw new UnsupportedOperationException( + module.getClass.toString + " is not supported with RemoteMaterializer" + ) + } + op + }.mapEdge[OpEdge] { (n1, edge, n2) => + n2 match { + case chainableOp: ChainableOp[_, _] + if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] => + Direct + case _ => + Shuffle + } + } + (opGraph, matValues.toMap) + } + + private def translateGraphStageWithMaterializedValue(module: GraphStageModule, + parallelism: Int, conf: UserConfig): Op = { + module.stage match { + case tickSource: TickSource[_] => + val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef] + val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, tickSource.initialDelay). + withValue[FiniteDuration](INTERVAL, tickSource.interval). 
+ withValue(TICK, tick) + ProcessorOp(classOf[TickSourceTask[_]], parallelism, tiConf, "tickSource") + case graphStage: GraphStage[_] => + translateGraphStage(module, parallelism, conf) + case headOptionStage: HeadOptionStage[_] => + headOptionOp(headOptionStage, conf) + case pushPullGraphStageWithMaterializedValue: + PushPullGraphStageWithMaterializedValue[_, _, _, _] => + translateSymbolic(pushPullGraphStageWithMaterializedValue, conf) + } + } + + private def translateGraphStage(module: GraphStageModule, + parallelism: Int, conf: UserConfig): Op = { + module.stage match { + case balance: Balance[_] => + ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance") + case batch: Batch[_, _] => + val batchConf = conf.withValue[_ => Long](BatchTask.COST, batch.costFn). + withLong(BatchTask.MAX, batch.max). + withValue[(_, _) => _](BatchTask.AGGREGATE, batch.aggregate). + withValue[_ => _](BatchTask.SEED, batch.seed) + ProcessorOp(classOf[BatchTask[_, _]], + parallelism, batchConf, "batch") + case broadcast: Broadcast[_] => + val name = ModuleExtractor.unapply(broadcast).map(_.attributes.nameOrDefault()).get + ProcessorOp(classOf[BroadcastTask], parallelism, conf, name) + case collect: Collect[_, _] => + collectOp(collect.pf, conf) + case concat: Concat[_] => + ProcessorOp(classOf[ConcatTask], parallelism, conf, "concat") + case delayInitial: DelayInitial[_] => + val dIConf = conf.withValue[FiniteDuration]( + DelayInitialTask.DELAY_INITIAL, delayInitial.delay) + ProcessorOp(classOf[DelayInitialTask[_]], parallelism, dIConf, "delayInitial") + case dropWhile: DropWhile[_] => + dropWhileOp(dropWhile.p, conf) + case flattenMerge: FlattenMerge[_, _] => + ProcessorOp(classOf[FlattenMergeTask], parallelism, conf, "flattenMerge") + case fold: Fold[_, _] => + val foldConf = conf.withValue(FoldTask.ZERO, fold.zero.asInstanceOf[AnyRef]). 
+ withValue(FoldTask.AGGREGATOR, fold.f) + ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold") + case groupBy: GroupBy[_, _] => + GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating), + groupBy.maxSubstreams, "groupBy", conf) + case groupedWithin: GroupedWithin[_] => + val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d). + withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n) + ProcessorOp(classOf[GroupedWithinTask[_]], parallelism, diConf, "groupedWithin") + case idleInject: IdleInject[_, _] => + // TODO + null + case idleTimeoutBidi: IdleTimeoutBidi[_, _] => + // TODO + null + case incomingConnectionStage: IncomingConnectionStage => + // TODO + null + case interleave: Interleave[_] => + val ilConf = conf.withInt(InterleaveTask.INPUT_PORTS, interleave.inputPorts). + withInt(InterleaveTask.SEGMENT_SIZE, interleave.segmentSize) + ProcessorOp(classOf[InterleaveTask], parallelism, ilConf, "interleave") + null + case intersperse: Intersperse[_] => + // TODO + null + case limitWeighted: LimitWeighted[_] => + // TODO + null + case map: FMap[_, _] => + mapOp(map.f, conf) + case mapAsync: MapAsync[_, _] => + ProcessorOp(classOf[MapAsyncTask[_, _]], + mapAsync.parallelism, conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsync.f), "mapAsync") + case mapAsyncUnordered: MapAsyncUnordered[_, _] => + ProcessorOp(classOf[MapAsyncTask[_, _]], + mapAsyncUnordered.parallelism, + conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsyncUnordered.f), "mapAsyncUnordered") + case materializedValueSource: MaterializedValueSource[_] => + // TODO + null + case merge: Merge[_] => + val mergeConf = conf.withBoolean(MergeTask.EAGER_COMPLETE, merge.eagerComplete). 
+ withInt(MergeTask.INPUT_PORTS, merge.inputPorts) + ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge") + case mergePreferred: MergePreferred[_] => + MergeOp("mergePreferred", conf) + case mergeSorted: MergeSorted[_] => + MergeOp("mergeSorted", conf) + case prefixAndTail: PrefixAndTail[_] => + // TODO + null + case recover: Recover[_] => + // TODO + null + case scan: Scan[_, _] => + scanOp(scan.zero, scan.f, conf) + case simpleLinearGraphStage: SimpleLinearGraphStage[_] => + translateSimpleLinearGraph(simpleLinearGraphStage, parallelism, conf) + case singleSource: SingleSource[_] => + val singleSourceConf = conf.withValue[AnyRef](SingleSourceTask.ELEMENT, + singleSource.elem.asInstanceOf[AnyRef]) + ProcessorOp(classOf[SingleSourceTask[_]], parallelism, singleSourceConf, "singleSource") + case split: Split[_] => + // TODO + null + case statefulMapConcat: StatefulMapConcat[_, _] => + val func = statefulMapConcat.f + val statefulMapConf = + conf.withValue[() => _ => Iterable[_]](StatefulMapConcatTask.FUNC, func) + ProcessorOp(classOf[StatefulMapConcatTask[_, _]], parallelism, + statefulMapConf, "statefulMapConcat") + case subSink: SubSink[_] => + // TODO + null + case subSource: SubSource[_] => + // TODO + null + case unfold: Unfold[_, _] => + // TODO + null + case unfoldAsync: UnfoldAsync[_, _] => + // TODO + null + case unzip: Unzip[_, _] => + // ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism, + // conf.withValue( + // Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip") + // TODO + null + case zip: Zip[_, _] => + zipWithOp(zip.zipper, conf) + case zipWith2: ZipWith2[_, _, _] => + ProcessorOp(classOf[Zip2Task[_, _, _]], + parallelism, + conf.withValue( + Zip2Task.ZIP2_FUNCTION, Zip2Task.ZipFunction(zipWith2.zipper) + ), "zipWith2") + } + } + + private def translateSimpleLinearGraph(stage: SimpleLinearGraphStage[_], + parallelism: Int, conf: UserConfig): Op = { + stage match { + case completion: Completion[_] => + // 
TODO + null + case delay: Delay[_] => + // TODO + null + case drop: Drop[_] => + dropOp(drop.count, conf) + case dropWithin: DropWithin[_] => + val dropWithinConf = + conf.withValue[FiniteDuration](DropWithinTask.TIMEOUT, dropWithin.timeout) + ProcessorOp(classOf[DropWithinTask[_]], + parallelism, dropWithinConf, "dropWithin") + case filter: Filter[_] => + filterOp(filter.p, conf) + case idle: Idle[_] => + // TODO + null + case initial: Initial[_] => + // TODO + null + case log: Log[_] => + logOp(log.name, log.extract, conf) + case reduce: Reduce[_] => + reduceOp(reduce.f, conf) + case take: Take[_] => + takeOp(take.count, conf) + case takeWhile: TakeWhile[_] => + filterOp(takeWhile.p, conf) + case takeWithin: TakeWithin[_] => + val takeWithinConf = + conf.withValue[FiniteDuration](TakeWithinTask.TIMEOUT, takeWithin.timeout) + ProcessorOp(classOf[TakeWithinTask[_]], + parallelism, takeWithinConf, "takeWithin") + case throttle: Throttle[_] => + val throttleConf = conf.withInt(ThrottleTask.COST, throttle.cost). + withInt(ThrottleTask.MAX_BURST, throttle.maximumBurst). + withValue[_ => Int](ThrottleTask.COST_CALC, throttle.costCalculation). 
+ withValue[FiniteDuration](ThrottleTask.TIME_PERIOD, throttle.per) + ProcessorOp(classOf[ThrottleTask[_]], + parallelism, throttleConf, "throttle") + } + } + + private def translateSymbolic(stage: PushPullGraphStageWithMaterializedValue[_, _, _, _], + conf: UserConfig): Op = { + stage match { + case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _] + if symbolicGraphStage.symbolicStage.attributes.equals( + Stages.DefaultAttributes.buffer) => { + // ignore the buffering operation + identity("buffer", conf) + } + } + } + +} + +object RemoteMaterializerImpl { + final val NotApplied: Any => Any = _ => NotApplied + + def collectOp[In, Out](collect: PartialFunction[In, Out], conf: UserConfig): Op = { + flatMapOp({ data: In => + collect.applyOrElse(data, NotApplied) match { + case NotApplied => None + case result: Any => Option(result) + } + }, "collect", conf) + } + + def filterOp[In](filter: In => Boolean, conf: UserConfig): Op = { + flatMapOp({ data: In => + if (filter(data)) Option(data) else None + }, "filter", conf) + } + + def headOptionOp[T](headOptionStage: HeadOptionStage[T], conf: UserConfig): Op = { + val promise: Promise[Option[T]] = Promise() + flatMapOp({ data: T => + data match { + case None => + Some(promise.future.failed) + case Some(d) => + promise.future.value + } + }, "headOption", conf) + } + + def reduceOp[T](reduce: (T, T) => T, conf: UserConfig): Op = { + var result: Option[T] = None + val flatMap = { elem: T => + result match { + case None => + result = Some(elem) + case Some(r) => + result = Some(reduce(r, elem)) + } + List(result) + } + flatMapOp(flatMap, "reduce", conf) + } + + def zipWithOp[In1, In2](zipWith: (In1, In2) => (In1, In2), conf: UserConfig): Op = { + val flatMap = { elem: (In1, In2) => + val (e1, e2) = elem + val result: (In1, In2) = zipWith(e1, e2) + List(result) + } + flatMapOp(flatMap, "zipWith", conf) + } + + def zipWithOp2[In1, In2, Out](zipWith: (In1, In2) => Out, conf: UserConfig): Op = { + val flatMap = { elem: (In1, 
In2) => + val (e1, e2) = elem + val result: Out = zipWith(e1, e2) + List(result) + } + flatMapOp(flatMap, "zipWith", conf) + } + + def identity(description: String, conf: UserConfig): Op = { + flatMapOp({ data: Any => + List(data) + }, description, conf) + } + + def mapOp[In, Out](map: In => Out, conf: UserConfig): Op = { + val flatMap = (data: In) => List(map(data)) + flatMapOp (flatMap, conf) + } + + def flatMapOp[In, Out](flatMap: In => Iterable[Out], conf: UserConfig): Op = { + flatMapOp(flatMap, "flatmap", conf) + } + + def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String, + conf: UserConfig): Op = { + ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf) + } + + def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out, + conf: UserConfig): Op = { + var agg = None: Option[Out] + val flatMap = {elem: In => + agg = agg match { + case None => + Some(seed(elem)) + case Some(value) => + Some(aggregate(value, elem)) + } + List(agg.get) + } + flatMapOp (flatMap, "conflate", conf) + } + + def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = { + var aggregator: Out = zero + val map = { elem: In => + aggregator = fold(aggregator, elem) + List(aggregator) + } + flatMapOp(map, "fold", conf) + } + + def groupedOp(count: Int, conf: UserConfig): Op = { + var left = count + val buf = { + val b = Vector.newBuilder[Any] + b.sizeHint(count) + b + } + + val flatMap: Any => Iterable[Any] = {input: Any => + buf += input + left -= 1 + if (left == 0) { + val emit = buf.result() + buf.clear() + left = count + Some(emit) + } else { + None + } + } + flatMapOp(flatMap, conf: UserConfig) + } + + def dropOp[T](number: Long, conf: UserConfig): Op = { + var left = number + val flatMap: T => Iterable[T] = {input: T => + if (left > 0) { + left -= 1 + None + } else { + Some(input) + } + } + flatMapOp(flatMap, "drop", conf) + } + + def dropWhileOp[In](drop: In => Boolean, conf: UserConfig): Op = { + flatMapOp({ 
data: In => + if (drop(data)) None else Option(data) + }, "dropWhile", conf) + } + + def logOp[T](name: String, extract: T => Any, conf: UserConfig): Op = { + val flatMap = {elem: T => + LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}") + List(elem) + } + flatMapOp(flatMap, "log", conf) + } + + def scanOp[In, Out](zero: Out, f: (Out, In) => Out, conf: UserConfig): Op = { + var aggregator = zero + var pushedZero = false + + val flatMap = {elem: In => + aggregator = f(aggregator, elem) + + if (pushedZero) { + pushedZero = true + List(zero, aggregator) + } else { + List(aggregator) + } + } + flatMapOp(flatMap, "scan", conf) + } + + def statefulMapOp[In, Out](f: In => Iterable[Out], conf: UserConfig): Op = { + flatMapOp ({ data: In => + f(data) + }, conf) + } + + def takeOp(count: Long, conf: UserConfig): Op = { + var left: Long = count + + val filter: Any => Iterable[Any] = {elem: Any => + left -= 1 + if (left > 0) Some(elem) + else if (left == 0) Some(elem) + else None + } + flatMapOp(filter, "take", conf) + } + + /** + * We use this attribute to track module to Processor + * + */ + val TRACKABLE = "track how module is fused to processor" +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala new file mode 100644 index 0000000..5b8c71b --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream._ +import akka.stream.impl.StreamLayout.{AtomicModule, Module} +import org.reactivestreams.{Publisher, Subscriber} + +/** + * + * + * [[IN]] -> [[BridgeModule]] -> [[OUT]] + * / + * / + * out of band data input or output channel [[MAT]] + * + * + * [[BridgeModule]] is used as a bridge between different materializers. + * Different [[akka.stream.Materializer]]s can use out of band channel to + * exchange messages. 
+ * + * For example: + * + * Remote Materializer + * ----------------------------- + * | | + * | BridgeModule -> RemoteSink | + * | / | + * --/---------------------------- + * Local Materializer / out of band channel. + * ----------------------/---- + * | Local / | + * | Source -> BridgeModule | + * | | + * --------------------------- + * + * + * Typically [[BridgeModule]] is created implicitly as a temporary intermediate + * module during materialization. + * + * However, user can still declare it explicitly. In this case, it means we have a + * boundary Source or Sink module which accept out of band channel inputs or + * outputs. + * + * @tparam IN input + * @tparam OUT output + * @tparam MAT materializer + */ +abstract class BridgeModule[IN, OUT, MAT] extends AtomicModule { + val inPort = Inlet[IN]("BridgeModule.in") + val outPort = Outlet[OUT]("BridgeModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + def attributes: Attributes + def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT] + + protected def newInstance: BridgeModule[IN, OUT, MAT] + override def carbonCopy: Module = newInstance +} + + +/** + * + * Bridge module which accept out of band channel Input + * [[org.reactivestreams.Publisher]][IN]. + * + * + * [[SourceBridgeModule]] -> [[OUT]] + * /| + * / + * out of band data input [[org.reactivestreams.Publisher]][IN] + * + * @see [[BridgeModule]] + * @param attributes Attributes + * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]] + * @tparam OUT out put data type to next module. 
+ */ +class SourceBridgeModule[IN, OUT](val attributes: Attributes = + Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] { + override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = + new SourceBridgeModule[IN, OUT](attributes) + + override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = { + new SourceBridgeModule( attributes) + } +} + +/** + * + * Bridge module which accept out of band channel Output + * [[org.reactivestreams.Subscriber]][OUT]. + * + * + * [[IN]] -> [[BridgeModule]] + * \ + * \ + * \| + * out of band data output [[org.reactivestreams.Subscriber]][OUT] + * + * @see [[BridgeModule]] + * @param attributes Attributes + * @tparam IN, input data type from previous module + * @tparam OUT out put data type to out of band subscriber + */ +class SinkBridgeModule[IN, OUT](val attributes: Attributes = + Attributes.name("sinkBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] { + override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = + new SinkBridgeModule[IN, OUT](attributes) + + override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = { + new SinkBridgeModule[IN, OUT](attributes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala new file mode 100644 index 0000000..ea76bb0 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream.impl.StreamLayout.{AtomicModule, Module} +import akka.stream.impl.{SinkModule, SourceModule} +import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape} +import org.reactivestreams.{Publisher, Subscriber} + +/** + * [[DummyModule]] is a set of special module to help construct a RunnableGraph, + * so that all ports are closed. + * + * In runtime, [[DummyModule]] should be ignored during materialization. + * + * For example, if you have a [[BridgeModule]] which only accept the input + * message from out of band channel, then you can use DummySource to fake + * a Message Source Like this. + * + * [[DummySource]] -> [[BridgeModule]] -> Sink + * /| + * / + * out of band input message [[Publisher]] + * + * After materialization, [[DummySource]] will be removed. 
+ + * [[BridgeModule]] -> Sink + * /| + * / + * [[akka.stream.impl.PublisherSource]] + * + * + */ +trait DummyModule extends AtomicModule + + +/** + * + * [[DummySource]]-> [[BridgeModule]] -> Sink + * /| + * / + * out of band input message Source + * + * @param attributes Attributes + * @param shape SourceShape[Out] + * @tparam Out Output + */ +class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out]) + extends SourceModule[Out, Unit](shape) with DummyModule { + + override def create(context: MaterializationContext): (Publisher[Out], Unit) = { + throw new UnsupportedOperationException() + } + + override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = { + new DummySource[Out](attributes, shape) + } + + override def withAttributes(attr: Attributes): Module = { + new DummySource(attr, amendShape(attr)) + } +} + + +/** + * + * Source-> [[BridgeModule]] -> [[DummySink]] + * \ + * \ + * \| + * out of band output message [[Subscriber]] + * + * @param attributes Attributes + * @param shape SinkShape[IN] + */ +class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN]) + extends SinkModule[IN, Unit](shape) with DummyModule { + override def create(context: MaterializationContext): (Subscriber[IN], Unit) = { + throw new UnsupportedOperationException() + } + + override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, Unit] = { + new DummySink[IN](attributes, shape) + } + + override def withAttributes(attr: Attributes): Module = { + new DummySink[IN](attr, amendShape(attr)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala new file mode 100644 index 0000000..dfbbee9 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream.impl.StreamLayout.{AtomicModule, Module} +import akka.stream._ +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.Task + +/** + * [[GearpumpTaskModule]] represent modules that can be materialized as Gearpump Tasks. + * + * This is specially designed for Gearpump runtime. It is not supposed to be used + * for local materializer. 
+ * + */ +trait GearpumpTaskModule extends AtomicModule + +/** + * This is used to represent the Gearpump Data Source + * @param source DataSource + * @param conf UserConfig + * @param shape SourceShape[T} + * @param attributes Attributes + * @tparam T type + */ +final case class SourceTaskModule[T]( + source: DataSource, + conf: UserConfig, + shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")), + attributes: Attributes = Attributes.name("SourceTaskModule")) + extends GearpumpTaskModule { + + override def withAttributes(attr: Attributes): Module = + this.copy(shape = amendShape(attr), attributes = attr) + override def carbonCopy: Module = + this.copy(shape = SourceShape( Outlet[T]("SourceTaskModule.out"))) + + override def replaceShape(s: Shape): Module = + if (s == shape) this + else throw new UnsupportedOperationException("cannot replace the shape of SourceTaskModule") + + private def amendShape(attr: Attributes): SourceShape[T] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(out = Outlet(thatN + ".out")) + } +} + +/** + * This is used to represent the Gearpump Data Sink + * @param sink DataSink + * @param conf UserConfig + * @param shape SinkShape[IN] + * @param attributes Attributes + * @tparam IN type + */ +final case class SinkTaskModule[IN]( + sink: DataSink, + conf: UserConfig, + shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")), + attributes: Attributes = Attributes.name("SinkTaskModule")) + extends GearpumpTaskModule { + + override def withAttributes(attr: Attributes): Module = + this.copy(shape = amendShape(attr), attributes = attr) + override def carbonCopy: Module = + this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in"))) + + override def replaceShape(s: Shape): Module = + if (s == shape) this + else throw new UnsupportedOperationException("cannot replace the shape of SinkTaskModule") + + private def 
amendShape(attr: Attributes): SinkShape[IN] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(in = Inlet(thatN + ".out")) + } +} + +/** + * This is to represent the Gearpump Processor which has exact one input and one output + * @param processor Class[_ <: Task] + * @param conf UserConfig + * @param attributes Attributes + * @tparam IN type + * @tparam OUT type + * @tparam Unit void + */ +case class ProcessorModule[IN, OUT, Unit]( + processor: Class[_ <: Task], + conf: UserConfig, + attributes: Attributes = Attributes.name("processorModule")) + extends AtomicModule with GearpumpTaskModule { + val inPort = Inlet[IN]("ProcessorModule.in") + val outPort = Outlet[IN]("ProcessorModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + override def carbonCopy: Module = newInstance + + protected def newInstance: ProcessorModule[IN, OUT, Unit] = + new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) + + override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = { + new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala new file mode 100644 index 0000000..b06dd0e --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala @@ -0,0 +1,55 @@ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.module

import akka.stream._
import akka.stream.impl.StreamLayout.{AtomicModule, Module}


/**
 * Atomic one-in / one-out module carrying the grouping function for a stream of `T`.
 *
 * The module itself only stores the function and the attributes; both ports carry `T`.
 *
 * @param groupBy function that derives the group of an element (T => Group)
 * @param attributes Attributes
 * @tparam T element type
 * @tparam Group group type
 */
case class GroupByModule[T, Group](
    groupBy: T => Group,
    attributes: Attributes = Attributes.name("groupByModule"))
  extends AtomicModule {
  val inPort = Inlet[T]("GroupByModule.in")
  val outPort = Outlet[T]("GroupByModule.out")
  override val shape = new FlowShape(inPort, outPort)

  // Only the module's own shape is accepted; anything else is rejected.
  override def replaceShape(s: Shape): Module =
    if (s == shape) this
    else throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")

  override def carbonCopy: Module = newInstance

  // A new module instance with the same function and attributes (and thus new ports).
  protected def newInstance: GroupByModule[T, Group] =
    new GroupByModule[T, Group](groupBy, attributes)

  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] =
    new GroupByModule[T, Group](groupBy, attributes)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala new file mode 100644 index 0000000..462d967 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.module + +import akka.stream._ +import akka.stream.impl.StreamLayout.{AtomicModule, Module} + + +/** + * + * Reduce Module + * + * @param f (T,T) => T + * @param attributes Attributes + * @tparam T type + */ +case class ReduceModule[T](f: (T, T) => T, attributes: Attributes = +Attributes.name("reduceModule")) extends AtomicModule { + val inPort = Inlet[T]("GroupByModule.in") + val outPort = Outlet[T]("GroupByModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + override def carbonCopy: Module = newInstance + + protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, attributes) + + override def withAttributes(attributes: Attributes): ReduceModule[T] = { + new ReduceModule[T](f, attributes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala new file mode 100644 index 0000000..8e43c16 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.scaladsl

import akka.stream.Attributes
import org.apache.gearpump.akkastream.module._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.Task
import org.reactivestreams.{Publisher, Subscriber}


object GearSource {

  /**
   * Builds a Source that is fed with out-of-band input messages.
   *
   * A dummy source is connected to a [[SourceBridgeModule]] flow and the flow's
   * materialized value is kept, so the result materializes to a [[Subscriber]];
   * an upstream [[Publisher]] can send the out-of-band messages to it.
   *
   * {{{
   *   upstream Publisher --(out of band)--> Subscriber (materialized value)
   *   SourceBridgeModule -> downstream Sink
   * }}}
   */
  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
    val placeholder =
      new Source(new DummySource[IN](Attributes.name("dummy"), Source.shape("dummy")))
    val bridgeFlow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]())
    placeholder.viaMat(bridgeFlow)(Keep.right)
  }

  /**
   * Builds a Source backed by a Gearpump [[DataSource]], using an empty [[UserConfig]].
   *
   * [[SourceTaskModule]] -> downstream Sink
   */
  def from[OUT](source: DataSource): Source[OUT, Unit] =
    new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))

  /**
   * Builds a Source backed by a Gearpump [[org.apache.gearpump.streaming.Processor]]:
   * a dummy source is connected to a [[ProcessorModule]] flow.
   *
   * [[ProcessorModule]] -> downstream Sink
   */
  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
    val placeholder =
      new Source(new DummySource[Unit](Attributes.name("dummy"), Source.shape("dummy")))
    val processorFlow = Processor[Unit, OUT](processor, conf)
    placeholder.viaMat(processorFlow)(Keep.right)
  }
}

object GearSink {

  /**
   * Builds a Sink that hands its messages over to an out-of-band channel.
   *
   * A [[SinkBridgeModule]] flow is connected to a dummy sink; the result materializes
   * to a [[Publisher]], which sends the out-of-band messages to a downstream
   * [[Subscriber]].
   *
   * {{{
   *   Source -> SinkBridgeModule
   *   Publisher (materialized value) --(out of band)--> downstream Subscriber
   * }}}
   */
  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
    val placeholder =
      new Sink(new DummySink[OUT](Attributes.name("dummy"), Sink.shape("dummy")))
    val bridgeFlow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]())
    bridgeFlow.to(placeholder)
  }

  /**
   * Builds a Sink backed by a Gearpump [[DataSink]], using an empty [[UserConfig]].
   *
   * Upstream Source -> [[SinkTaskModule]]
   */
  def to[IN](sink: DataSink): Sink[IN, Unit] =
    new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty))

  /**
   * Builds a Sink backed by a Gearpump [[org.apache.gearpump.streaming.Processor]]:
   * a [[ProcessorModule]] flow is connected to a dummy sink.
   *
   * Upstream Source -> [[ProcessorModule]]
   */
  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
    val placeholder =
      new Sink(new DummySink[Unit](Attributes.name("dummy"), Sink.shape("dummy")))
    val processorFlow = Processor[IN, Unit](processor, conf)
    processorFlow.to(placeholder)
  }
}

/**
 * GroupBy divides the main input stream into a set of sub-streams.
 * It is a work-around for the limitation of the official API Flow.groupBy.
 *
 * For example, a word count can be sketched like this (illustrative):
 *
 * {{{
 * case class KV(key: String, value: String)
 * case class Count(key: String, count: Int)
 *
 * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
 *   Count(kv.key, 1)
 * }.fold(Count(null, 0)) {(base, add) =>
 *   Count(add.key, base.count + add.count)
 * }.log("count of current key")
 *   .flatten()
 *   .to(sink)
 * }}}
 *
 * map and fold transform the data of every sub-stream: with 10 groups there are
 * 10 sub-streams, each with its own map and fold.
 *
 * flatten collects all sub-streams back into the main stream;
 * sink only operates on the main stream.
 */
object GroupBy {
  /** Wraps `groupBy` in a [[GroupByModule]] and exposes it as a Flow. */
  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] =
    new Flow[T, T, Unit](new GroupByModule(groupBy))
}

/**
 * Aggregate on the data.
 *
 * {{{
 * val flow = Reduce({(a: Int, b: Int) => a + b})
 * }}}
 */
object Reduce {
  /** Wraps `reduce` in a [[ReduceModule]] and exposes it as a Flow. */
  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] =
    new Flow[T, T, Unit](new ReduceModule(reduce))
}


/**
 * Creates a Flow from a Gearpump Processor (task) class and its configuration.
 */
object Processor {
  /** Wraps the task class and config in a [[ProcessorModule]] and exposes it as a Flow. */
  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] =
    new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf))
}

object Implicits {

  /**
   * Extension methods on Source adding groupBy2, reduce and process.
   */
  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {

    // TODO It is named as groupBy2 to avoid conflict with built-in
    // groupBy. Eventually, we think the built-in groupBy should
    // be replaced with this implementation.
    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] =
      source.via[T, Unit](GroupBy[T, Group](groupBy))

    /** Appends a [[Reduce]] stage to the source. */
    def reduce(reduce: (T, T) => T): Source[T, Mat] =
      source.via[T, Unit](Reduce[T](reduce))

    /** Appends a [[Processor]] stage built from the task class and config. */
    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] =
      source.via(Processor[T, R](processor, conf))
  }

  /**
   * Extension methods on Flow adding groupBy2, reduce and process.
   */
  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
    /** Appends a [[GroupBy]] stage to the flow. */
    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] =
      flow.via(GroupBy[OUT, Group](groupBy))

    /** Appends a [[Reduce]] stage to the flow. */
    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] =
      flow.via(Reduce[OUT](reduce))

    /** Appends a [[Processor]] stage built from the task class and config. */
    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] =
      flow.via(Processor[OUT, R](processor, conf))
  }

  /**
   * Extension methods on a Source of key/value pairs adding groupByKey and sumOnValue.
   */
  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {

    /**
     * Groups the key/value pairs by their key (the first tuple element).
     * @return the source with a GroupBy stage appended
     */
    def groupByKey: Source[(K, V), Mat] =
      source.via(GroupBy[(K, V), K](getTupleKey[K, V]))

    /**
     * Sums the values.
     *
     * Call groupByKey first to bring equal keys together; otherwise the sum is
     * computed regardless of what the current key is.
     *
     * @param numeric Numeric[V]
     * @return the source with a Reduce stage appended
     */
    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] =
      source.via(Reduce[(K, V)](sumByKey[K, V](numeric)))
  }

  /**
   * Extension methods on a Flow of key/value pairs adding groupByKey and sumOnValue.
   */
  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {

    /**
     * Groups the key/value pairs by their key (the first tuple element).
     * @return the flow with a GroupBy stage appended
     */
    def groupByKey: Flow[(K, V), (K, V), Mat] =
      flow.via(GroupBy[(K, V), K](getTupleKey[K, V]))

    /**
     * Sums the values.
     *
     * Call groupByKey first to bring equal keys together; otherwise the sum is
     * computed regardless of what the current key is.
     *
     * @param numeric Numeric[V]
     * @return the flow with a Reduce stage appended
     */
    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] =
      flow.via(Reduce[(K, V)](sumByKey[K, V](numeric)))
  }

  // Key of a pair is its first element.
  private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1

  // Combines two pairs: keeps the key of the first pair and adds the two values.
  private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] =
    (left, right) => Tuple2(left._1, numeric.plus(left._2, right._2))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala new file mode 100644 index 0000000..43f07c4 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala @@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Distributes incoming messages over the out ports in round-robin order:
 * each message goes to the port at `index`, then `index` advances and wraps
 * to 0 once it reaches the number of out ports.
 */
class BalanceTask(context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val sizeOfOutputs = sizeOfOutPorts
  var index = 0

  override def onNext(msg : Message) : Unit = {
    output(index, msg)
    val next = index + 1
    index = if (next == sizeOfOutputs) 0 else next
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala new file mode 100644 index 0000000..5c2485b --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala @@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import java.util.concurrent.TimeUnit

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

/**
 * Task for the Batch stage.
 *
 * The batching parameters (`max`, `costFunc`, `aggregate`, `seed`) are read from the
 * user config under the keys in the [[BatchTask]] companion object.
 *
 * NOTE: `onNext` currently forwards every message unchanged; the configured
 * parameters are not applied to the stream yet.
 *
 * @tparam In type of the incoming elements
 * @tparam Out type of the aggregated elements
 */
class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val max = userConf.getLong(BatchTask.MAX)
  val costFunc = userConf.getValue[In => Long](BatchTask.COST)
  val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
  val seed = userConf.getValue[In => Out](BatchTask.SEED)

  // Pass-through: the payload cast and the timestamp that used to be read here were
  // never used, so they have been dropped.
  override def onNext(msg : Message) : Unit = {
    context.output(msg)
  }
}

/** User-config keys read by [[BatchTask]]. */
object BatchTask {
  val AGGREGATE = "AGGREGATE"
  val COST = "COST"
  val MAX = "MAX"
  val SEED = "SEED"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala new file mode 100644 index 0000000..292468d --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala @@ -0,0 +1,30 @@
/*
 * Licensed to the Apache
Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Hands every incoming message to `context.output` unchanged (no port index is given).
 */
class BroadcastTask(context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  override def onNext(msg : Message) : Unit = context.output(msg)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala new file mode 100644 index 0000000..b77b9bd --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala @@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Task for the Concat stage.
 *
 * As written, it sends each incoming message to the out port at `index` and then
 * advances `index`, wrapping to 0 once it reaches the number of out ports.
 * NOTE(review): this is the same round-robin logic as BalanceTask; confirm that
 * it is the intended behaviour for a concat stage.
 */
class ConcatTask(context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val sizeOfOutputs = sizeOfOutPorts
  var index = 0

  override def onNext(msg : Message) : Unit = {
    output(index, msg)
    val next = index + 1
    index = if (next == sizeOfOutputs) 0 else next
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala new file mode 100644 index 0000000..7c335dc --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala @@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import java.time.Instant
import java.util.concurrent.TimeUnit

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

/** Marker payload the task sends to itself when the initial delay has elapsed. */
case object DelayInitialTime

/**
 * Task for the DelayInitial stage.
 *
 * On start it schedules a single [[DelayInitialTime]] message to itself after
 * `delayInitial`. Until that marker arrives, incoming messages are discarded;
 * afterwards they are forwarded unchanged. The marker itself is never forwarded.
 */
class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  // Configured delay; a zero-length duration is used when the key is absent.
  val delayInitial = userConf.getValue[FiniteDuration](DelayInitialTask.DELAY_INITIAL).
    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
  var delayInitialActive = true

  override def onStart(startTime: Instant): Unit = {
    context.scheduleOnce(delayInitial)(
      self ! Message(DelayInitialTime, System.currentTimeMillis())
    )
  }

  override def onNext(msg : Message) : Unit = msg.msg match {
    case DelayInitialTime =>
      delayInitialActive = false
    case _ =>
      if (!delayInitialActive) {
        context.output(msg)
      }
  }
}

/** User-config key read by [[DelayInitialTask]]. */
object DelayInitialTask {
  val DELAY_INITIAL = "DELAY_INITIAL"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala new file mode 100644 index 0000000..0c54829 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala @@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.task + +import java.time.Instant +import java.util.concurrent.TimeUnit + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +import scala.concurrent.duration.FiniteDuration + +case object DropWithinTimeout + +class DropWithinTask[T](context: TaskContext, userConf : UserConfig) + extends GraphTask(context, userConf) { + + val timeout = userConf.getValue[FiniteDuration](DropWithinTask.TIMEOUT). + getOrElse(FiniteDuration(0, TimeUnit.MINUTES)) + var timeoutActive = true + + override def onStart(startTime: Instant): Unit = { + context.scheduleOnce(timeout)( + self ! Message(DropWithinTimeout, System.currentTimeMillis()) + ) + } + + override def onNext(msg : Message) : Unit = { + msg.msg match { + case DropWithinTimeout => + timeoutActive = false + case _ => + + } + timeoutActive match { + case true => + case false => + context.output(msg) + } + } +} + +object DropWithinTask { + val TIMEOUT = "TIMEOUT" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala new file mode 100644 index 0000000..14ff537 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Task for the FlattenMerge stage.
 *
 * As written, it sends each incoming message to the out port at `index` and then
 * advances `index`, wrapping to 0 once it reaches the number of out ports.
 * NOTE(review): this is the same round-robin logic as BalanceTask; confirm that
 * it is the intended behaviour for a flatten-merge stage.
 */
class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val sizeOfOutputs = sizeOfOutPorts
  var index = 0

  override def onNext(msg : Message) : Unit = {
    output(index, msg)
    val next = index + 1
    index = if (next == sizeOfOutputs) 0 else next
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala new file mode 100644 index 0000000..d982ebd --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala @@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.task

import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Task for the Fold stage.
 *
 * The running value starts from the configured `ZERO` (when present). For every
 * incoming message the configured `AGGREGATOR` (when present) combines the running
 * value with the message payload; the new running value is logged and emitted with
 * the timestamp of the incoming message. Without an aggregator nothing is emitted.
 *
 * @tparam In type of the incoming payload
 * @tparam Out type of the running value
 */
class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  val zero = userConf.getValue[Out](FoldTask.ZERO)
  val aggregator = userConf.getValue[(Out, In) => Out](FoldTask.AGGREGATOR)
  var aggregated: Out = _
  implicit val ec = context.system.dispatcher

  override def onStart(instant: Instant): Unit = {
    for (initial <- zero) {
      aggregated = initial
    }
  }

  override def onNext(msg : Message) : Unit = {
    val payload = msg.msg.asInstanceOf[In]
    val timestamp = msg.timestamp
    for (combine <- aggregator) {
      aggregated = combine(aggregated, payload)
      LOG.info(s"aggregated = $aggregated")
      context.output(new Message(aggregated, timestamp))
    }
  }
}

/** User-config keys read by [[FoldTask]]. */
object FoldTask {
  val ZERO = "ZERO"
  val AGGREGATOR = "AGGREGATOR"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala new file mode 100644 index 0000000..3310ab9 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.akkastream.task.GraphTask.{Index, PortId}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskWrapper}

/**
 * Base task that addresses its outputs by port id.
 *
 * The lists of processor ids stored under [[GraphTask.OUT_PROCESSORS]] and
 * [[GraphTask.IN_PROCESSORS]] are read from the user config at construction time. A port id is
 * the position of a processor in such a list; it is translated to the rank of that processor
 * id within the sorted list, and that rank is what gets passed to the wrapped context.
 *
 * @param inputTaskContext task context; it is cast to `TaskWrapper`, so any other
 *                         implementation fails with a `ClassCastException`
 * @param userConf         user configuration holding the two processor lists
 * @throws NoSuchElementException if either processor list is absent from `userConf`
 */
class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig)
  extends Task(inputTaskContext, userConf) {

  private val context = inputTaskContext.asInstanceOf[TaskWrapper]
  protected val outMapping = portsMapping(requiredProcessors(GraphTask.OUT_PROCESSORS))
  protected val inMapping = portsMapping(requiredProcessors(GraphTask.IN_PROCESSORS))

  /** Number of out ports (one per configured out processor entry). */
  val sizeOfOutPorts = outMapping.keys.size
  /** Number of in ports (one per configured in processor entry). */
  val sizeOfInPorts = inMapping.keys.size

  /**
   * Reads a mandatory processor list from the user config, failing with a message that names
   * the missing key instead of the bare `None.get` produced by `Option.get`.
   */
  private def requiredProcessors(key: String): List[ProcessorId] =
    userConf.getValue[List[ProcessorId]](key).getOrElse(
      throw new NoSuchElementException(s"GraphTask is missing required user config entry '$key'"))

  /**
   * Builds the port-id -> index table for one direction.
   *
   * NOTE(review): if the same processor id occurs more than once in `processors`, the index of
   * its last occurrence in the sorted list wins for all of its ports - confirm whether
   * duplicate entries can occur.
   */
  private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = {
    val processorToIndex = processors.sorted.zipWithIndex.toMap
    processors.zipWithIndex.map { case (processorId, port) =>
      port -> processorToIndex(processorId)
    }.toMap
  }

  /**
   * Sends `msg` through the out port `outletId`.
   *
   * @param outletId port id, i.e. position in the configured out processor list
   * @param msg      message to send
   * @throws NoSuchElementException if `outletId` is not a known out port
   */
  def output(outletId: Int, msg: Message): Unit = {
    context.output(outMapping(outletId), msg)
  }

  /** No-op by default; subclasses override when they need start-up work. */
  override def onStart(startTime : Instant) : Unit = {}

  /** No-op by default. */
  override def onStop() : Unit = {}
}

/** Config keys and type aliases used by [[GraphTask]]. */
object GraphTask {
  val OUT_PROCESSORS = "org.apache.gearpump.akkastream.task.outprocessors"
  val IN_PROCESSORS = "org.apache.gearpump.akkastream.task.inprocessors"

  type PortId = Int
  type Index = Int
}
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala new file mode 100644 index 0000000..eaf2b3f --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.collection.immutable.VectorBuilder
import scala.concurrent.duration.FiniteDuration

/**
 * Placeholder for a task that groups elements by count and time window.
 *
 * The buffer and both settings are set up at construction time, but `onNext` currently does
 * nothing: incoming messages are neither buffered nor forwarded.
 *
 * @tparam T element type held by the buffer
 */
class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  // Not referenced anywhere else in this class.
  case object GroupedWithinTrigger
  /** Buffer for collected elements; never written to by the current implementation. */
  val buf: VectorBuilder[T] = new VectorBuilder[T]
  /** Optional time window read from the user config. */
  val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
  /** Optional batch size read from the user config. */
  val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)

  /** Intentionally empty: the message is dropped. */
  override def onNext(msg : Message) : Unit = ()
}

/** Keys under which [[GroupedWithinTask]] looks up its settings in the user config. */
object GroupedWithinTask {
  val BATCH_SIZE = "BATCH_SIZE"
  val TIME_WINDOW = "TIME_WINDOW"
}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Forwards incoming messages while rotating through out ports.
 *
 * The rotation length is derived from the number of in ports, but the rotating value is used
 * as an OUT port id. It is therefore capped at the number of out ports, so that a task with
 * more inputs than outputs (for example several inputs feeding a single output) never asks
 * `output` for a port that does not exist.
 *
 * @param context  task context forwarded to [[GraphTask]]
 * @param userConf user configuration forwarded to [[GraphTask]]
 */
class InterleaveTask(context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  /** Number of in ports, taken from `sizeOfInPorts`. */
  val sizeOfInputs = sizeOfInPorts

  /** Out port that will receive the next message. */
  var index = 0

  // Upper bound (exclusive) for `index`: never rotate past the last existing out port.
  private val rotationSize = math.min(sizeOfInputs, sizeOfOutPorts)

  // TODO access upstream and pull
  /**
   * Emits `msg` on the current out port, then advances the rotation.
   *
   * @param msg message to forward unchanged
   * @throws NoSuchElementException if the task has no out port at all
   */
  override def onNext(msg : Message) : Unit = {
    output(index, msg)
    index += 1
    if (index >= rotationSize) {
      index = 0
    }
  }
}

/** Config keys associated with [[InterleaveTask]]; the task itself does not read them. */
object InterleaveTask {
  val INPUT_PORTS = "INPUT_PORTS"
  val SEGMENT_SIZE = "SEGMENT_SIZE"
}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.concurrent.Future
import scala.util.{Failure, Success}

/**
 * Applies an asynchronous function to every incoming payload and emits each result once its
 * future completes, stamped with the timestamp of the originating message.
 *
 * The function is read from the user config under [[MapAsyncTask.MAPASYNC_FUNC]]; when it is
 * absent, incoming messages are dropped. Results are emitted in completion order.
 *
 * @tparam In  type the incoming payloads are cast to
 * @tparam Out result type of the asynchronous function
 */
class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  /** Optional asynchronous mapping function read from the user config. */
  val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
  /** Execution context on which the completion callbacks run. */
  implicit val ec = context.system.dispatcher

  /**
   * Starts the asynchronous computation for `msg`.
   *
   * A successful result is emitted downstream; a failed future is logged instead of being
   * silently discarded.
   *
   * NOTE(review): the callback calls `context.output` from the dispatcher rather than from
   * `onNext` itself - confirm that this is safe for the task context.
   */
  override def onNext(msg : Message) : Unit = {
    val data = msg.msg.asInstanceOf[In]
    val time = msg.timestamp
    f.foreach { func =>
      func(data).onComplete {
        case Success(out) =>
          context.output(new Message(out, time))
        case Failure(cause) =>
          LOG.error(s"mapAsync function failed for message with timestamp $time", cause)
      }
    }
  }
}

/** Key under which [[MapAsyncTask]] looks up its function in the user config. */
object MapAsyncTask {
  val MAPASYNC_FUNC = "MAPASYNC_FUNC"

}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Pass-through task: every incoming message is re-emitted unchanged via `context.output`.
 *
 * The two settings below are read from the user config at construction time but are not
 * consulted when forwarding.
 */
class MergeTask(context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  /** Optional flag stored under [[MergeTask.EAGER_COMPLETE]]. */
  val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
  /** Optional port count stored under [[MergeTask.INPUT_PORTS]]. */
  val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)

  /** Forwards `msg` as is. */
  override def onNext(msg : Message) : Unit = context.output(msg)
}

/** Keys under which [[MergeTask]] looks up its settings in the user config. */
object MergeTask {
  val EAGER_COMPLETE = "EAGER_COMPLETE"
  val INPUT_PORTS = "INPUT_PORTS"
}
import java.time.Instant
import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext

import scala.concurrent.duration.FiniteDuration

/**
 * Emits one fixed, pre-configured element for every message it receives, reusing the
 * timestamp of the received message.
 *
 * @tparam T type of the configured element
 * @throws NoSuchElementException if no element is stored under [[SingleSourceTask.ELEMENT]]
 */
class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
  extends GraphTask(context, userConf) {

  /**
   * The element to emit. It is mandatory, so a missing entry fails at construction time with
   * a message naming the key rather than with the bare `None.get` of `Option.get`.
   */
  val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).getOrElse(
    throw new NoSuchElementException(
      s"SingleSourceTask is missing required user config entry '${SingleSourceTask.ELEMENT}'"))

  /** Emits the configured element with the timestamp of `msg`; the payload of `msg` is ignored. */
  override def onNext(msg : Message) : Unit = {
    context.output(Message(elem, msg.timestamp))
  }
}

/** Key under which [[SingleSourceTask]] looks up its element in the user config. */
object SingleSourceTask {
  val ELEMENT = "ELEMENT"
}
