http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala deleted file mode 100644 index 47ed1f2..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.materializer - -import akka.actor.ActorSystem -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.GearAttributes -import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} -import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task} -import akka.stream.impl.Stages -import akka.stream.impl.Stages.StageModule -import akka.stream.impl.StreamLayout.Module -import org.slf4j.LoggerFactory - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.StreamApp -import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp} -import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} -import org.apache.gearpump.util.Graph - -/** - * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump - * Streaming Application. - * - * @param graph - * @param system - */ -class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { - - import RemoteMaterializerImpl._ - - type Clue = String - private implicit val actorSystem = system - - private def uuid: String = { - java.util.UUID.randomUUID.toString - } - - /** - * @return a mapping from Module to Materialized Processor Id. 
- */ - def materialize: (StreamApplication, Map[Module, ProcessorId]) = { - val (opGraph, clues) = toOpGraph() - val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph) - val processorIds = resolveClues(app, clues) - - val updatedApp = updateJunctionConfig(processorIds, app) - (cleanClues(updatedApp), processorIds) - } - - private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], app: StreamApplication): StreamApplication = { - val config = junctionConfig(processorIds) - - val dag = app.dag.mapVertex { vertex => - val processorId = vertex.id - val newConf = vertex.taskConf.withConfig(config(processorId)) - vertex.copy(taskConf = newConf) - } - new StreamApplication(app.name, app.inputUserConfig, dag) - } - - /** - * Update junction config so that each GraphTask know its upstream and downstream. - * @param processorIds - * @return - */ - private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = { - val updatedConfigs = graph.vertices.map { vertex => - val processorId = processorIds(vertex) - vertex match { - case junction: JunctionModule => - val inProcessors = junction.shape.inlets.map { inlet => - val upstreamModule = graph.incomingEdgesOf(junction).find(_._2.to == inlet).map(_._1) - val upstreamProcessorId = processorIds(upstreamModule.get) - upstreamProcessorId - }.toList - - val outProcessors = junction.shape.outlets.map { outlet => - val downstreamModule = graph.outgoingEdgesOf(junction).find(_._2.from == outlet).map(_._3) - val downstreamProcessorId = downstreamModule.map(processorIds(_)) - downstreamProcessorId.get - }.toList - - (processorId, UserConfig.empty.withValue(GraphTask.OUT_PROCESSORS, outProcessors) - .withValue(GraphTask.IN_PROCESSORS, inProcessors)) - case _ => - (processorId, UserConfig.empty) - } - }.toMap - updatedConfigs - } - - private def resolveClues(app: StreamApplication, clues: Map[Module, Clue]): Map[Module, ProcessorId] = { - clues.flatMap { kv => - 
val (module, clue) = kv - val processorId = app.dag.vertices.find { processor => - processor.taskConf.getString(clue).isDefined - }.map(_.id) - processorId.map((module, _)) - } - } - - private def cleanClues(app: StreamApplication): StreamApplication = { - val graph = app.dag.mapVertex { processor => - val conf = cleanClue(processor.taskConf) - processor.copy(taskConf = conf) - } - new StreamApplication(app.name, app.inputUserConfig, graph) - } - - private def cleanClue(conf: UserConfig): UserConfig = { - conf.filter { kv => - kv._2 != RemoteMaterializerImpl.STAINS - } - } - - private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = { - var matValues = Map.empty[Module, Clue] - val opGraph = graph.mapVertex{ module => - val name = uuid - val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS) - matValues += module -> name - val parallelism = GearAttributes.count(module.attributes) - val op = module match { - case source: SourceTaskModule[t] => - val updatedConf = conf.withConfig(source.conf) - new DataSourceOp[t](source.source, parallelism, updatedConf, "source") - case sink: SinkTaskModule[t] => - val updatedConf = conf.withConfig(sink.conf) - new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink") - case sourceBridge: SourceBridgeModule[_, _] => - new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source") - case processor: ProcessorModule[_, _, _] => - val updatedConf = conf.withConfig(processor.conf) - new ProcessorOp(processor.processor, parallelism, updatedConf, "source") - case sinkBridge: SinkBridgeModule[_, _] => - new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") - case groupBy: GroupByModule[t, g] => - new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf) - case reduce: ReduceModule[Any] => - reduceOp(reduce.f, conf) - case stage: StageModule => - translateStage(stage, conf) - case fanIn: FanInModule => - translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, 
conf) - case fanOut: FanOutModule => - translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf) - } - - if (op == null) { - throw new UnsupportedOperationException(module.getClass.toString + " is not supported with RemoteMaterializer") - } - op - }.mapEdge[OpEdge]{(n1, edge, n2) => - n2 match { - case master: MasterOp => - Shuffle - case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] => - Shuffle - case slave: SlaveOp[_] => - Direct - } - } - (opGraph, matValues) - } - - private def translateStage(module: StageModule, conf: UserConfig): Op = { - module match { - case buffer: Stages.Buffer => - //ignore the buffering operation - identity("buffer", conf) - case collect: Stages.Collect => - collectOp(collect.pf, conf) - case concatAll: Stages.ConcatAll => - //TODO: - null - case conflat: Stages.Conflate => - conflatOp(conflat.seed, conflat.aggregate, conf) - case drop: Stages.Drop => - dropOp(drop.n, conf) - case dropWhile: Stages.DropWhile => - dropWhileOp(dropWhile.p, conf) - case expand: Stages.Expand => - //TODO - null - case filter: Stages.Filter => - filterOp(filter.p, conf) - case fold: Stages.Fold => - foldOp(fold.zero, fold.f, conf) - case groupBy: Stages.GroupBy => - //TODO - null - case grouped: Stages.Grouped => - groupedOp(grouped.n, conf) - case _: Stages.Identity => - identity("identity", conf) - case log: Stages.Log => - logOp(log.name, log.extract, conf) - case map: Stages.Map => - mapOp(map.f, conf) - case mapAsync: Stages.MapAsync => - //TODO - null - case mapAsync: Stages.MapAsyncUnordered => - //TODO - null - case flatMap: Stages.MapConcat => - flatMapOp(flatMap.f, "mapConcat", conf) - case stage: MaterializingStageFactory => - //TODO - null - case prefixAndTail: Stages.PrefixAndTail => - //TODO - null - case recover: Stages.Recover => - //TODO: we will just ignore this - identity("recover", conf) - case scan: Stages.Scan => - scanOp(scan.zero, scan.f, conf) - case split: Stages.Split => - //TODO - null - case stage: 
Stages.StageFactory => - //TODO - null - case take: Stages.Take => - takeOp(take.n, conf) - case takeWhile: Stages.TakeWhile => - filterOp(takeWhile.p, conf) - case time: Stages.TimerTransform => - //TODO - null - } - } - - private def translateFanIn( - fanIn: FanInModule, - edges: List[(Module, Edge, Module)], - parallelism: Int, - conf: UserConfig): Op = { - fanIn match { - case merge: MergeModule[_] => - MergeOp("merge", conf) - case mergePrefered: MergePreferredModule[_] => - //TODO, support "prefer" merge - MergeOp("mergePrefered", conf) - case zip: ZipWithModule => - //TODO: support zip module - null - case concat: ConcatModule[_] => - //TODO: support concat module - null - case flexiMerge: FlexiMergeModule[_, _] => - //TODO: Suport flexi merge module - null - } - } - - private def translateFanOut( - fanOut: FanOutModule, - edges: List[(Module, Edge, Module)], - parallelism: Int, - conf: UserConfig): Op = { - fanOut match { - case unzip2: UnzipWith2Module[Any, Any, Any] => - val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new UnZip2Task.UnZipFunction(unzip2.f)) - new ProcessorOp(classOf[UnZip2Task], parallelism, updatedConf, "unzip") - case broadcast: BroadcastModule[_] => - new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast") - case broadcast: BalanceModule[_] => - new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance") - case flexi: FlexiRouteImpl[_, _] => - //TODO - null - } - } -} - -object RemoteMaterializerImpl { - final val NotApplied: Any => Any = _ => NotApplied - - def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = { - flatMapOp({ data => - collect.applyOrElse(data, NotApplied) match { - case NotApplied => None - case result: Any => Option(result) - } - }, "collect", conf) - } - - def filterOp(filter: Any => Boolean, conf: UserConfig): Op = { - flatMapOp({ data => - if (filter(data)) Option(data) else None - }, "filter", conf) - } - - def reduceOp(reduce: (Any, Any) => Any, conf: 
UserConfig): Op = { - var result: Any = null - val flatMap = { elem: Any => - if (result == null) { - result = elem - } else { - result = reduce(result, elem) - } - List(result) - } - flatMapOp(flatMap, "reduce", conf) - } - - def identity(description: String, conf: UserConfig): Op = { - flatMapOp({ data => - List(data) - }, description, conf) - } - - def mapOp(map: Any => Any, conf: UserConfig): Op = { - flatMapOp({ data: Any => - List(map(data)) - }, "map", conf) - } - - def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op = { - flatMapOp(flatMap, "flatmap", conf) - } - - def flatMapOp(fun: Any => TraversableOnce[Any], description: String, conf: UserConfig): Op = { - FlatMapOp(fun, description, conf) - } - - def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = { - var agg: Any = null - val flatMap = { elem: Any => - agg = if (agg == null) { - seed(elem) - } else { - aggregate(agg, elem) - } - List(agg) - } - - flatMapOp(flatMap, "map", conf) - } - - def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = { - var aggregator: Any = zero - val map = { elem: Any => - aggregator = fold(aggregator, elem) - List(aggregator) - } - flatMapOp(map, "fold", conf) - } - - def groupedOp(count: Int, conf: UserConfig): Op = { - var left = count - val buf = { - val b = Vector.newBuilder[Any] - b.sizeHint(count) - b - } - - val flatMap: Any => Iterable[Any] = { input: Any => - buf += input - left -= 1 - if (left == 0) { - val emit = buf.result() - buf.clear() - left = count - Some(emit) - } else { - None - } - } - flatMapOp(flatMap, conf: UserConfig) - } - - def dropOp(number: Long, conf: UserConfig): Op = { - var left = number - val flatMap: Any => Iterable[Any] = { input: Any => - if (left > 0) { - left -= 1 - None - } else { - Some(input) - } - } - flatMapOp(flatMap, "drop", conf) - } - - def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = { - flatMapOp({ data => - if (drop(data)) None else Option(data) - 
}, "dropWhile", conf) - } - - def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = { - val flatMap = { elem: Any => - LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}") - List(elem) - } - flatMapOp(flatMap, "log", conf) - } - - def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = { - var aggregator = zero - var pushedZero = false - - val flatMap = { elem: Any => - aggregator = f(aggregator, elem) - - if (pushedZero) { - pushedZero = true - List(zero, aggregator) - } else { - List(aggregator) - } - } - flatMapOp(flatMap, "scan", conf) - } - - def takeOp(count: Long, conf: UserConfig): Op = { - var left: Long = count - - val filter: Any => Iterable[Any] = { elem: Any => - left -= 1 - if (left > 0) Some(elem) - else if (left == 0) Some(elem) - else None - } - flatMapOp(filter, "take", conf) - } - - /** - * We use stains to track how module maps to Processor - * - */ - val STAINS = "track how module is fused to processor" -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala deleted file mode 100644 index c5dfc9a..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.module - -import akka.stream.Attributes -import akka.stream.impl.FlowModule -import akka.stream.impl.StreamLayout.Module -import org.reactivestreams.{Publisher, Subscriber} - -/** - * - * - * [[IN]] -> [[BridgeModule]] -> [[OUT]] - * / - * / - * out of band data input or output channel [[MAT]] - * - * - * [[BridgeModule]] is used as a bridge between different materializers. - * Different [[akka.stream.Materializer]]s can use out of band channel to - * exchange messages. 
- * - * For example: - * - * Remote Materializer - * ----------------------------- - * | | - * | BridgeModule -> RemoteSink | - * | / | - * --/---------------------------- - * Local Materializer / out of band channel. - * ----------------------/---- - * | Local / | - * | Source -> BridgeModule | - * | | - * --------------------------- - * - * - * Typically [[BridgeModule]] is created implicitly as a temporary intermediate - * module during materialization. - * - * However, user can still declare it explicitly. In this case, it means we have a - * boundary Source or Sink module which accept out of band channel inputs or - * outputs. - * - * - * @tparam IN - * @tparam OUT - * @tparam MAT - */ -abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] { - def attributes: Attributes - def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT] - - protected def newInstance: BridgeModule[IN, OUT, MAT] - override def carbonCopy: Module = newInstance -} - -/** - * - * Bridge module which accept out of band channel Input - * [[org.reactivestreams.Publisher]][IN]. - * - * - * [[SourceBridgeModule]] -> [[OUT]] - * /| - * / - * out of band data input [[org.reactivestreams.Publisher]][IN] - * - * @see [[BridgeModule]] - * - * @param attributes - * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]] - * @tparam OUT out put data type to next module. - */ -class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] { - override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = new SourceBridgeModule[IN, OUT](attributes) - - override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = { - new SourceBridgeModule(attributes) - } -} - -/** - * - * Bridge module which accept out of band channel Output - * [[org.reactivestreams.Subscriber]][OUT]. 
- * - * - * [[IN]] -> [[BridgeModule]] - * \ - * \ - * \| - * out of band data output [[org.reactivestreams.Subscriber]][OUT] - * - * @see [[BridgeModule]] - * - * @param attributes - * @tparam IN, input data type from previous module - * @tparam OUT out put data type to out of band subscriber - */ -class SinkBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] { - override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = new SinkBridgeModule[IN, OUT](attributes) - - override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = { - new SinkBridgeModule[IN, OUT](attributes) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala deleted file mode 100644 index bc744f9..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.module - -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{SinkModule, SourceModule} -import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape} -import org.reactivestreams.{Publisher, Subscriber} - -/** - * [[DummyModule]] is a set of special module to help construct a RunnableGraph, - * so that all ports are closed. - * - * In runtime, [[DummyModule]] should be ignored during materialization. - * - * For example, if you have a [[BridgeModule]] which only accept the input - * message from out of band channel, then you can use DummySource to fake - * a Message Source Like this. - * - * [[DummySource]] -> [[BridgeModule]] -> Sink - * /| - * / - * out of band input message [[Publisher]] - * - * After materialization, [[DummySource]] will be removed. 
- - * [[BridgeModule]] -> Sink - * /| - * / - * [[akka.stream.impl.PublisherSource]] - * - * - */ -trait DummyModule extends Module - -/** - * - * [[DummySource]]-> [[BridgeModule]] -> Sink - * /| - * / - * out of band input message Source - * - * @param attributes - * @param shape - * @tparam Out - */ -class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out]) - extends SourceModule[Out, Unit](shape) with DummyModule { - - override def create(context: MaterializationContext): (Publisher[Out], Unit) = { - throw new UnsupportedOperationException() - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = { - new DummySource[Out](attributes, shape) - } - - override def withAttributes(attr: Attributes): Module = { - new DummySource(attr, amendShape(attr)) - } -} - -/** - * - * Source-> [[BridgeModule]] -> [[DummySink]] - * \ - * \ - * \| - * out of band output message [[Subscriber]] - * - * @param attributes - * @param shape - */ -class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN]) - extends SinkModule[IN, Unit](shape) with DummyModule { - override def create(context: MaterializationContext): (Subscriber[IN], Unit) = { - throw new UnsupportedOperationException() - } - - override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, Unit] = { - new DummySink[IN](attributes, shape) - } - - override def withAttributes(attr: Attributes): Module = { - new DummySink[IN](attr, amendShape(attr)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala deleted file mode 100644 index 
c4c78cc..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.module - -import akka.stream.impl.FlowModule -import akka.stream.impl.StreamLayout.Module -import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape} - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.Task - -/** - * [[GearpumpTaskModule]] represent modules that can be materialized as Gearpump Tasks. - * - * This is specially designed for Gearpump runtime. It is not supposed to be used - * for local materializer. 
- * - */ -trait GearpumpTaskModule extends Module - -/** - * This is used to represent the Gearpump Data Source - * @param source - * @param conf - * @param shape - * @param attributes - * @tparam T - */ -final case class SourceTaskModule[T]( - source: DataSource, - conf: UserConfig, - shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")), - attributes: Attributes = Attributes.name("SourceTaskModule")) - extends GearpumpTaskModule { - - override def subModules: Set[Module] = Set.empty - override def withAttributes(attr: Attributes): Module = { - this.copy(shape = amendShape(attr), attributes = attr) - } - override def carbonCopy: Module = { - this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out"))) - } - - override def replaceShape(s: Shape): Module = - if (s == shape) this - else throw new UnsupportedOperationException("cannot replace the shape of SourceTaskModule") - - private def amendShape(attr: Attributes): SourceShape[T] = { - val thisN = attributes.nameOrDefault(null) - val thatN = attr.nameOrDefault(null) - - if ((thatN eq null) || thisN == thatN) shape - else shape.copy(outlet = Outlet(thatN + ".out")) - } -} - -/** - * This is used to represent the Gearpump Data Sink - * @param sink - * @param conf - * @param shape - * @param attributes - * @tparam IN - */ -final case class SinkTaskModule[IN]( - sink: DataSink, - conf: UserConfig, - shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")), - attributes: Attributes = Attributes.name("SinkTaskModule")) - extends GearpumpTaskModule { - - override def subModules: Set[Module] = Set.empty - override def withAttributes(attr: Attributes): Module = { - this.copy(shape = amendShape(attr), attributes = attr) - } - override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in"))) - - override def replaceShape(s: Shape): Module = - if (s == shape) this - else throw new UnsupportedOperationException("cannot replace the shape of SinkTaskModule") - - 
private def amendShape(attr: Attributes): SinkShape[IN] = { - val thisN = attributes.nameOrDefault(null) - val thatN = attr.nameOrDefault(null) - - if ((thatN eq null) || thisN == thatN) shape - else shape.copy(inlet = Inlet(thatN + ".out")) - } -} - -/** - * This is to represent the Gearpump Processor which has exact one input and one output - * @param processor - * @param conf - * @param attributes - * @tparam IN - * @tparam OUT - * @tparam Unit - */ -case class ProcessorModule[IN, OUT, Unit]( - processor: Class[_ <: Task], - conf: UserConfig, - val attributes: Attributes = Attributes.name("processorModule")) - extends FlowModule[IN, OUT, Unit] with GearpumpTaskModule { - - override def carbonCopy: Module = newInstance - - protected def newInstance: ProcessorModule[IN, OUT, Unit] = { - new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) - } - - override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = { - new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala deleted file mode 100644 index e57a6f6..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.module - -import akka.stream.Attributes -import akka.stream.impl.FlowModule -import akka.stream.impl.StreamLayout.Module - -/** - * - * Group the T value groupBy function - * - * @param f - * @param attributes - * @tparam T - * @tparam Group - */ -case class GroupByModule[T, Group](val groupBy: T => Group, - val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit] { - - override def carbonCopy: Module = newInstance - - protected def newInstance: GroupByModule[T, Group] = { - new GroupByModule[T, Group](groupBy, attributes) - } - - override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = { - new GroupByModule[T, Group](groupBy, attributes) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala deleted file mode 100644 index 926feb6..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.module - -import akka.stream.Attributes -import akka.stream.impl.FlowModule -import akka.stream.impl.StreamLayout.Module - -/** - * - * Reduce Module - * - * @param f - * @param attributes - * @tparam T - */ -case class ReduceModule[T]( - val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule")) - extends FlowModule[T, T, Unit] { - - override def carbonCopy: Module = newInstance - - protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, attributes) - - override def withAttributes(attributes: Attributes): ReduceModule[T] = { - new ReduceModule[T](f, attributes) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala deleted file mode 100644 index 9cc46c9..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala +++ /dev/null @@ -1,282 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.scaladsl - -import akka.stream.Attributes -import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import org.reactivestreams.{Publisher, Subscriber} - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.Task - -object GearSource { - - /** - * Construct a Source which accepts out of band input messages. - * - * [[SourceBridgeModule]] -> Sink - * / - * / - * V - * materialize to [[Subscriber]] - * /| - * / - * upstream [[Publisher]] send out of band message - * - */ - def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = { - val source = new Source(new DummySource[IN](Attributes.name("dummy"), Source.shape("dummy"))) - val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]()) - source.viaMat(flow)(Keep.right) - } - - /** - * Construct a Source from Gearpump [[DataSource]]. 
- * - * [[SourceTaskModule]] -> downstream Sink - * - */ - def from[OUT](source: DataSource): Source[OUT, Unit] = { - val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty)) - taskSource - } - - /** - * Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]]. - * - * [[ProcessorModule]] -> downstream Sink - * - */ - def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = { - val source = new Source(new DummySource[Unit](Attributes.name("dummy"), Source.shape("dummy"))) - val flow = Processor.apply[Unit, OUT](processor, conf) - source.viaMat(flow)(Keep.right) - } -} - -object GearSink { - - /** - * Construct a Sink which output messages to a out of band channel. - * - * Souce -> [[SinkBridgeModule]] - * \ - * \| - * materialize to [[Publisher]] - * \ - * \ - * \| - * send out of band message to downstream [[Subscriber]] - * - */ - def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = { - val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), Sink.shape("dummy"))) - val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]()) - flow.to(sink) - } - - /** - * Construct a Sink from Gearpump [[DataSink]]. - * - * Upstream Source -> [[SinkTaskModule]] - * - */ - def to[IN](sink: DataSink): Sink[IN, Unit] = { - val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty)) - taskSink - } - - /** - * Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]]. - * - * Upstream Source -> [[ProcessorModule]] - * - */ - def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = { - val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), Sink.shape("dummy"))) - val flow = Processor.apply[IN, Unit](processor, conf) - flow.to(sink) - } -} - -/** - * - * GroupBy will divide the main input stream to a set of sub-streams. 
- * This is a work-around to bypass the limitation of official API Flow.groupBy - * - * - * For example, to do a word count, we can write code like this: - * - * case class KV(key: String, value: String) - * case class Count(key: String, count: Int) - * - * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv => - * Count(kv.key, 1) - * }.fold(Count(null, 0)){(base, add) => - * Count(add.key, base.count + add.count) - * }.log("count of current key") - * .flatten() - * .to(sink) - * - * map, fold will transform data on all sub-streams, If there are 10 groups, - * then there will be 10 sub-streams, and for each sub-stream, there will be - * a map and fold. - * - * flatten will collect all sub-stream into the main stream, - * - * sink will only operate on the main stream. - * - */ -object GroupBy { - def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = { - new Flow[T, T, Unit](new GroupByModule(groupBy)) - } -} - -/** - * Aggregate on the data. - * - * val flow = Reduce({(a: Int, b: Int) => a + b}) - * - * - */ -object Reduce { - def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = { - new Flow[T, T, Unit](new ReduceModule(reduce)) - } -} - -/** - * Create a Flow by providing a Gearpump Processor class and configuration - * - * - */ -object Processor { - def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = { - new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf)) - } -} - -object Implicits { - - /** - * Help util to support reduce and groupBy - */ - implicit class SourceOps[T, Mat](source: Source[T, Mat]) { - - //TODO It is named as groupBy2 to avoid conflict with built-in - // groupBy. Eventually, we think the built-in groupBy should - // be replace with this implementation. 
- def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = { - val stage = GroupBy.apply(groupBy) - source.via[T, Unit](stage) - } - - def reduce(reduce: (T, T) => T): Source[T, Mat] = { - val stage = Reduce.apply(reduce) - source.via[T, Unit](stage) - } - - def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] = { - val stage = Processor.apply[T, R](processor, conf) - source.via(stage) - } - } - - /** - * Help util to support reduce and groupBy - */ - implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) { - def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = { - val stage = GroupBy.apply(groupBy) - flow.via(stage) - } - - def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = { - val stage = Reduce.apply(reduce) - flow.via(stage) - } - - def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] = { - val stage = Processor.apply[OUT, R](processor, conf) - flow.via(stage) - } - } - - /** - * Help util to support groupByKey and sum - */ - implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) { - - /** - * if it is a KV Pair, we can group the KV pair by the key. - * @return - */ - def groupByKey: Source[(K, V), Mat] = { - val stage = GroupBy.apply(getTupleKey[K, V]) - source.via(stage) - } - - /** - * Does sum on values - * - * Before doing this, you need to do groupByKey to group same key together - * , otherwise, it will do the sum no matter what current key is. - */ - def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = { - val stage = Reduce.apply(sumByKey[K, V](numeric)) - source.via(stage) - } - } - - /** - * Helper util to support groupByKey and sum - */ - implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) { - - /** - * If it is a KV Pair, we can group the KV pair by the key. 
- * - */ - def groupByKey: Flow[(K, V), (K, V), Mat] = { - val stage = GroupBy.apply(getTupleKey[K, V]) - flow.via(stage) - } - - /** - * do sum on values - * - * Before doing this, you need to do groupByKey to group same key together - * , otherwise, it will do the sum no matter what current key is. - * - */ - def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = { - val stage = Reduce.apply(sumByKey[K, V](numeric)) - flow.via(stage) - } - } - - private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 - - private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = - (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala deleted file mode 100644 index 2eb0612..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.task - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.TaskContext - -class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { - - val sizeOfOutputs = sizeOfOutPorts - var index = 0 - - override def onNext(msg: Message): Unit = { - output(index, msg) - index += 1 - if (index == sizeOfOutputs) { - index = 0 - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala deleted file mode 100644 index 925bf21..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.task - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.TaskContext - -class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { - override def onNext(msg: Message): Unit = { - context.output(msg) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala deleted file mode 100644 index 9a4e24e..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.task - -import akka.stream.gearpump.task.GraphTask.{Index, PortId} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper} - -class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig) - extends Task(inputTaskContext, userConf) { - - private val context = inputTaskContext.asInstanceOf[TaskWrapper] - private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]]( - GraphTask.OUT_PROCESSORS).get) - private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]]( - GraphTask.IN_PROCESSORS).get) - - val sizeOfOutPorts = outMapping.keys.size - val sizeOfInPorts = inMapping.keys.size - - private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = { - val portToProcessor = processors.zipWithIndex.map { kv => - (kv._2, kv._1) - }.toMap - - val processorToIndex = processors.sorted.zipWithIndex.toMap - - val portToIndex = portToProcessor.map { kv => - val (outlet, processorId) = kv - val index = processorToIndex(processorId) - (outlet, index) - } - portToIndex - } - - def output(outletId: Int, msg: Message): Unit = { - context.output(outMapping(outletId), msg) - } - - override def onStart(startTime: StartTime): Unit = {} - - override def onStop(): Unit = {} -} - -object GraphTask { - val OUT_PROCESSORS = "akka.stream.gearpump.task.outprocessors" - val IN_PROCESSORS = "akka.stream.gearpump.task.inprocessors" - - 
type PortId = Int - type Index = Int -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala deleted file mode 100644 index b681852..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.task - -import java.util -import java.util.concurrent.TimeUnit - -import akka.actor.Actor.Receive -import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage -import akka.util.Timeout -import org.reactivestreams.{Publisher, Subscriber, Subscription} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import org.apache.gearpump.util.LogUtil - -/** - * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module - * - * - * upstream [[Task]] -> [[SinkBridgeTask]] - * \ Remote Cluster - * -------------------------\---------------------- - * \ Local JVM - * \| - * Akka Stream [[Subscriber]] - * - */ -class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - import taskContext.taskId - - val queue = new util.LinkedList[Message]() - var subscriber: ActorRef = null - - var request: Int = 0 - - override def onStart(startTime: StartTime): Unit = {} - - override def onNext(msg: Message): Unit = { - queue.add(msg) - trySendingData() - } - - override def onStop(): Unit = {} - - private def trySendingData(): Unit = { - if (subscriber != null) { - (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg => - subscriber ! msg.msg - request -= 1 - } - } - } - - override def receiveUnManagedMessage: Receive = { - case RequestMessage(n) => - this.subscriber = sender - LOG.info("the downstream has requested " + n + " messages from " + subscriber) - request += n.toInt - trySendingData() - case msg => - LOG.error("Failed! 
Received unknown message " + "taskId: " + taskId + ", " + msg.toString) - } -} - -object SinkBridgeTask { - - case class RequestMessage(number: Int) - - class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, processorId: ProcessorId) extends Publisher[AnyRef] with Subscription { - private val taskId = TaskId(processorId, index = 0) - - private val LOG = LogUtil.getLogger(getClass) - - private var actor: ActorRef = null - import system.dispatcher - - private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container => - // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) - container.task - } - - override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = { - this.actor = system.actorOf(Props(new ClientActor(subscriber))) - subscriber.onSubscribe(this) - } - - override def cancel(): Unit = Unit - - private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - override def request(l: Long): Unit = { - task.foreach { task => - task.tell(RequestMessage(l.toInt), actor) - } - } - } - - class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor { - def receive: Receive = { - case result: AnyRef => subscriber.onNext(result) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala deleted file mode 100644 index ccbd350..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.task - -import scala.concurrent.ExecutionContext - -import akka.actor.Actor.Receive -import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error} -import org.reactivestreams.{Subscriber, Subscription} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} - -/** - * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task - * - * - * - * [[SourceBridgeTask]] --> downstream [[Task]] - * /| Remote Cluster - * ---------------/-------------------------------- - * / Local JVM - * Akka Stream [[org.reactivestreams.Publisher]] - * - */ -class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - import taskContext.taskId - - override def onStart(startTime: StartTime): Unit = {} - - override def onNext(msg: Message): Unit = { - LOG.info("AkkaStreamSource receiving message " + msg) - } - - override def onStop(): Unit = {} - - 
override def receiveUnManagedMessage: Receive = { - case Error(ex) => - LOG.error("the stream has error", ex) - case AkkaStreamMessage(msg) => - LOG.error("we have received message from akka stream source: " + msg) - taskContext.output(Message(msg, System.currentTimeMillis())) - case Complete(description) => - LOG.error("the stream is completed: " + description) - case msg => - LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString) - } -} - -object SourceBridgeTask { - case class Error(ex: java.lang.Throwable) - - case class Complete(description: String) - - case class AkkaStreamMessage(msg: AnyRef) - - class SourceBridgeTaskClient[T <: AnyRef](ec: ExecutionContext, context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] { - val taskId = TaskId(processorId, 0) - var subscription: Subscription = null - implicit val dispatcher = ec - - val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container => - // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) - container.task - } - - override def onError(throwable: Throwable): Unit = { - task.map(task => task ! Error(throwable)) - } - - override def onSubscribe(subscription: Subscription): Unit = { - this.subscription = subscription - task.map(task => subscription.request(1)) - } - - override def onComplete(): Unit = { - task.map(task => task ! Complete("the upstream is completed")) - } - - override def onNext(t: T): Unit = { - task.map { task => - task ! 
AkkaStreamMessage(t) - } - subscription.request(1) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala deleted file mode 100644 index 78fabbe..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.task - -import akka.stream.gearpump.task.UnZip2Task.UnZipFunction - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.TaskContext - -class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { - - val unzip = userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip - - override def onNext(msg: Message): Unit = { - val message = msg.msg - val time = msg.timestamp - val pair = unzip(message) - val (a, b) = pair - output(0, Message(a.asInstanceOf[AnyRef], time)) - output(1, Message(b.asInstanceOf[AnyRef], time)) - } -} - -object UnZip2Task { - class UnZipFunction(val unzip: Any => (Any, Any)) extends Serializable - - val UNZIP2_FUNCTION = "akka.stream.gearpump.task.unzip2.function" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala deleted file mode 100644 index c774fc7..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.util - -import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, MaterializedValueNode, Module, Transform} - -class MaterializedValueOps(mat: MaterializedValueNode) { - def resolve[Mat](materializedValues: Map[Module, Any]): Mat = { - def resolveMaterialized(mat: MaterializedValueNode, materializedValues: Map[Module, Any]): Any = mat match { - case Atomic(m) => materializedValues.getOrElse(m, ()) - case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues)) - case Transform(f, d) => f(resolveMaterialized(d, materializedValues)) - case Ignore => () - } - resolveMaterialized(mat, materializedValues).asInstanceOf[Mat] - } -} - -object MaterializedValueOps { - def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat) -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala new file mode 100644 index 0000000..4384b39 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream + +import akka.stream.Attributes +import akka.stream.Attributes.Attribute + +object GearAttributes { + /** + * Define how many parallel instance we want to use to run this module + * @param count Int + * @return + */ + def count(count: Int): Attributes = Attributes(ParallismAttribute(count)) + + /** + * Define we want to render this module locally. + * @return + */ + def local: Attributes = Attributes(LocationAttribute(Local)) + + /** + * Define we want to render this module remotely + * @return + */ + def remote: Attributes = Attributes(LocationAttribute(Remote)) + + /** + * Get the effective location settings if child override the parent + * setttings. + * + * @param attrs Attributes + * @return + */ + def location(attrs: Attributes): Location = { + attrs.attributeList.foldLeft(Local: Location) { (s, attr) => + attr match { + case LocationAttribute(location) => location + case other => s + } + } + } + + /** + * get effective parallelism settings if child override parent. 
+ * @param attrs Attributes + * @return + */ + def count(attrs: Attributes): Int = { + attrs.attributeList.foldLeft(1) { (s, attr) => + attr match { + case ParallismAttribute(count) => count + case other => s + } + } + } + + /** + * Where we want to render the module + */ + sealed trait Location + object Local extends Location + object Remote extends Location + + final case class LocationAttribute(tag: Location) extends Attribute + + /** + * How many parallel instance we want to use for this module. + * + * @param parallelism Int + */ + final case class ParallismAttribute(parallelism: Int) extends Attribute +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala new file mode 100644 index 0000000..07c95f8 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream + +import java.util.concurrent.atomic.AtomicBoolean + +import akka.NotUsed +import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem} +import akka.event.{Logging, LoggingAdapter} +import akka.stream.Attributes.Attribute +import akka.stream.impl.Stages.SymbolicGraphStage +import akka.stream.impl.StreamLayout.{Atomic, Combine, CopiedModule, Ignore, MaterializedValueNode, Module, Transform} +import akka.stream.{ActorAttributes, ActorMaterializerSettings, Attributes, ClosedShape, Fusing, Graph, InPort, OutPort, SinkShape} +import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule} +import akka.stream.impl.{ExtendedActorMaterializer, StreamSupervisor} +import akka.stream.stage.GraphStage +import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge +import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy +import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer +import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer +import org.apache.gearpump.akkastream.graph._ +import org.apache.gearpump.util.{Graph => GGraph} + +import scala.collection.mutable +import scala.concurrent.{ExecutionContextExecutor, Promise} +import scala.concurrent.duration.FiniteDuration + +object GearpumpMaterializer { + + final case class Edge(from: OutPort, to: InPort) + + final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute + + implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool) + + def apply(strategy: Strategy)(implicit context: ActorRefFactory): ExtendedActorMaterializer = { + val system = actorSystemOf(context) + + apply(ActorMaterializerSettings( + system).withAutoFusing(false), strategy, useLocalCluster = false, "flow")(context) + } + + def 
apply(materializerSettings: Option[ActorMaterializerSettings] = None, + strategy: Strategy = GraphPartitioner.AllRemoteStrategy, + useLocalCluster: Boolean = true, + namePrefix: Option[String] = None)(implicit context: ActorRefFactory): + ExtendedActorMaterializer = { + val system = actorSystemOf(context) + + val settings = materializerSettings getOrElse + ActorMaterializerSettings(system).withAutoFusing(false) + apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context) + } + + def apply(materializerSettings: ActorMaterializerSettings, + strategy: Strategy, + useLocalCluster: Boolean, + namePrefix: String)(implicit context: ActorRefFactory): + ExtendedActorMaterializer = { + val system = actorSystemOf(context) + + new GearpumpMaterializer( + system, + materializerSettings, + context.actorOf( + StreamSupervisor.props(materializerSettings, false).withDispatcher( + materializerSettings.dispatcher), StreamSupervisor.nextName())) + } + + + private def actorSystemOf(context: ActorRefFactory): ActorSystem = { + val system = context match { + case s: ExtendedActorSystem => s + case c: ActorContext => c.system + case null => throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ => + throw new IllegalArgumentException( + s""" + | context must be a ActorSystem or ActorContext, got [${context.getClass.getName}] + """.stripMargin + ) + } + system + } + +} + +/** + * + * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump + * streaming application. If some module cannot be rendered remotely in Gearpump + * Cluster, then it will use local Actor materializer as fallback to materialize + * the module locally. + * + * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]] + * to determine which module should be rendered + * remotely, and which module should be rendered locally. 
 *
 * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
 *      to find out how we cut the runnableGraph to two parts,
 *      and materialize them separately.
 * @param system ActorSystem
 * @param settings materializer settings
 * @param supervisor stream supervisor actor used by the local sub-materializer
 * @param strategy Strategy deciding remote vs local rendering of each module
 * @param useLocalCluster whether to use built-in in-process local cluster
 * @param namePrefix optional prefix for generated flow names
 */
class GearpumpMaterializer(override val system: ActorSystem,
    override val settings: ActorMaterializerSettings,
    override val supervisor: ActorRef,
    strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
    useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
  extends ExtendedActorMaterializer {

  // One sub-materializer per partition kind: LocalGraph parts run in this JVM,
  // RemoteGraph parts are submitted to the Gearpump cluster.
  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
  )

  override def logger: LoggingAdapter = Logging.getLogger(system, this)

  override def isShutdown: Boolean = system.whenTerminated.isCompleted

  // Merge stream attributes into the base settings; later attributes override.
  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
    import ActorAttributes._
    import Attributes._
    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
      attr match {
        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
        case _ => s
      }
    }
  }

  // Not supported by this materializer.
  override def withNamePrefix(name: String): ExtendedActorMaterializer =
    throw new UnsupportedOperationException()

  // NOTE(review): executionContext always throws, yet schedulePeriodically /
  // scheduleOnce below pass it to the scheduler — those two methods will
  // therefore throw UnsupportedOperationException if ever invoked. Confirm
  // whether that is intended.
  override implicit def executionContext: ExecutionContextExecutor =
    throw new UnsupportedOperationException()

  override def schedulePeriodically(initialDelay: FiniteDuration,
      interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.schedule(initialDelay, interval, task)(executionContext)

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  /**
   * Materialize a runnable graph:
   *  1. aggressively fuse and read the fused module info,
   *  2. rebuild the stream topology as a Gearpump Graph of original (uncopied)
   *     modules connected by [[Edge]]s,
   *  3. partition it with the configured strategy,
   *  4. materialize each partition with its sub-materializer, threading the
   *     accumulated per-module materialized values through,
   *  5. pick the resulting value (unwrapping a Promise into its Future).
   */
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
    val info = Fusing.aggressive(runnableGraph).module.info
    val graph = GGraph.empty[Module, Edge]

    info.subModules.foreach(module => {
      if (module.isCopied) {
        // Work with the original module; the copies only serve to locate ports.
        val original = module.asInstanceOf[CopiedModule].copyOf
        graph.addVertex(original)
        module.shape.outlets.zip(original.shape.outlets).foreach(out => {
          val (cout, oout) = out
          // Follow the copied outlet downstream to find the connected inlet
          // and its owning module, then record the edge between the originals.
          val cin = info.downstreams(cout)
          val downStreamModule = info.inOwners(cin)
          if(downStreamModule.isCopied) {
            val downStreamOriginal = downStreamModule.asInstanceOf[CopiedModule].copyOf
            downStreamModule.shape.inlets.zip(downStreamOriginal.shape.inlets).foreach(in => {
              in._1 == cin match {
                case true =>
                  val oin = in._2
                  graph.addEdge(original, Edge(oout, oin), downStreamOriginal)
                case false =>
              }
            })
          }
        })
      }
    })

    // Debug dump of the rebuilt topology.
    printGraph(graph)

    val subGraphs = GraphPartitioner(strategy).partition(graph)
    // Materialize partitions in order, feeding each one the values produced so
    // far (bridge modules need them to connect across partitions).
    val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
      val materializer = subMaterializers(subGraph.getClass)
      map ++ materializer.materialize(subGraph, map)
    }
    // Keep only sink-shaped modules' materialized values, dropping NotUsed.
    val mat = matValues.flatMap(pair => {
      val (module, any) = pair
      any match {
        case notUsed: NotUsed =>
          None
        case others =>
          val rt = module.shape match {
            case sink: SinkShape[_] =>
              Some(any)
            case _ =>
              None
          }
          rt
      }
    }).toList
    val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
    // NOTE(review): the result of this resolution is discarded; the returned
    // value below comes from the sink list instead. Confirm this is intended.
    resolveMaterialized(matModule.materializedValueComputation, matValues)
    // NOTE(review): `mat` is a List, so this unwraps a Promise only when the
    // list itself is a Promise (never); the whole list is returned otherwise.
    val rt = Some(mat).flatMap(any => {
      any match {
        case promise: Promise[_] =>
          Some(promise.future)
        case other =>
          Some(other)
      }
    })
    rt.orNull.asInstanceOf[Mat]
  }

  /** Print each module of the graph in topological order, for debugging. */
  private def printGraph(graph: GGraph[Module, Edge]): Unit = {
    val iterator = graph.topologicalOrderIterator
    while (iterator.hasNext) {
      val module = iterator.next()
      // scalastyle:off println
      module match {
        case graphStageModule: GraphStageModule =>
          graphStageModule.stage match {
            case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
              val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
              println(
                s"${module.getClass.getSimpleName}(${symbolicName})"
              )
            case graphStage: GraphStage[_] =>
              val name = graphStage.getClass.getSimpleName
              println(
                s"${module.getClass.getSimpleName}(${name})"
              )
            case other =>
              println(
                s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
              )
          }
        case _ =>
          println(module.getClass.getSimpleName)
      }
      // scalastyle:on println
    }
  }

  // The overloads below ignore initialAttributes / subflowFuser and delegate
  // to the single-argument materialize.
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      initialAttributes: Attributes): Mat = {
    materialize(runnableGraph)
  }

  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      subflowFuser: (GraphInterpreterShell) => ActorRef): Mat = {
    materialize(runnableGraph)
  }

  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
    materialize(runnableGraph)
  }

  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
    logger
  }

  /** Shut down every sub-materializer (local and remote). */
  def shutdown: Unit = {
    subMaterializers.values.foreach(_.shutdown)
  }

  /**
   * Recursively resolve a materialized-value computation tree against the
   * per-module values gathered during materialization. Missing modules and
   * Ignore resolve to ().
   */
  private def resolveMaterialized(mat: MaterializedValueNode,
      materializedValues: mutable.Map[Module, Any]): Any = mat match {
    case Atomic(m) =>
      materializedValues.getOrElse(m, ())
    case Combine(f, d1, d2) =>
      f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
    case Transform(f, d) =>
      f(resolveMaterialized(d, materializedValues))
    case Ignore =>
      ()
  }



}
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala new file mode 100644 index 0000000..8a869d2 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.gearpump.akkastream

import java.{util => ju}

import _root_.org.apache.gearpump.util.{Graph => GGraph}
import akka.actor.ActorSystem
import akka.stream._
import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, MaterializedValueSourceAttribute}
import akka.stream.impl.StreamLayout._
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource

/**
 * A MaterializerSession that, instead of running modules, records the stream
 * topology into a Gearpump Graph (`graph`) of modules connected by [[Edge]]s,
 * while still resolving materialized values the way akka-stream does.
 *
 * @param system ActorSystem used for flow-name generation
 * @param topLevel the top-level module to traverse
 * @param initialAttributes attributes merged into every visited module
 * @param namePrefix optional prefix for generated stage names (default "flow")
 */
class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
    initialAttributes: Attributes, namePrefix: Option[String] = None)
  extends MaterializerSession(topLevel, initialAttributes) {

  // Draw a unique flow name (e.g. "flow-3") from the system-wide counter.
  private[this] def createFlowName(): String =
    FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()

  private val flowName = createFlowName()
  // Monotonic counter feeding stageName; not thread-safe, assumes
  // single-threaded traversal as done by MaterializerSession.
  private var nextId = 0

  // Unique per-stage name: "<flowName>-<id>-<attrName>".
  private def stageName(attr: Attributes): String = {
    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
    nextId += 1
    name
  }

  // The topology being recorded; vertices are (copied) modules, edges connect
  // an upstream OutPort to a downstream InPort.
  val graph = GGraph.empty[Module, Edge]

  /** Record an edge from publisher (outPort, module) to subscriber (inPort, module). */
  def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
    graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
  }

  /** Record a module as a vertex of the topology. */
  def addVertex(module: Module): Unit = {
    graph.addVertex(module)
  }

  /**
   * Traverse a composite module: materialize each submodule (atomics directly,
   * copies within their scope, composites recursively), then resolve this
   * module's materialized value from the collected per-module values.
   */
  override def materializeModule(module: Module, parentAttributes: Attributes): Any = {

    val materializedValues: ju.Map[Module, Any] = new ju.HashMap
    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)

    // NOTE(review): this list is always empty and never populated, so the
    // foreach over it below is dead code — presumably sources were meant to be
    // collected during the submodule walk. Confirm intent.
    val materializedValueSources = List.empty[MaterializedValueSource[_]]

    for (submodule <- module.subModules) {
      submodule match {
        case atomic: AtomicModule =>
          materializeAtomic(atomic, currentAttributes, materializedValues)
        case copied: CopiedModule =>
          enterScope(copied)
          materializedValues.put(copied, materializeModule(copied, currentAttributes))
          exitScope(copied)
        case composite =>
          materializedValues.put(composite, materializeComposite(composite, currentAttributes))
        // NOTE(review): unreachable — the variable pattern `composite` above
        // matches every remaining module, including EmptyModule.
        case EmptyModule =>
      }
    }

    val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)

    materializedValueSources.foreach { module =>
      // Tag the copied source with the already-resolved materialized value so
      // it can emit it when the graph runs.
      val matAttribute =
        new MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
      val copied = copyAtomicModule(module.module, parentAttributes and Attributes(matAttribute))
      // TODO
      // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
      addVertex(copied)
      materializedValues.put(copied, Atomic(copied))
    }
    mat

  }

  override protected def materializeComposite(composite: Module,
      effectiveAttributes: Attributes): Any = {
    materializeModule(composite, effectiveAttributes)
  }

  /**
   * Record an atomic module: copy it with merged attributes, add it as a
   * vertex, and note Atomic(copied) as its materialized-value computation.
   * Port assignment is still TODO (the inPort/outPort lookups are currently
   * unused).
   */
  protected def materializeAtomic(atomic: AtomicModule,
      parentAttributes: Attributes,
      matVal: ju.Map[Module, Any]): Unit = {

    val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
    val copied = copyAtomicModule(atomic, parentAttributes)

    for ((in, id) <- inputs.zipWithIndex) {
      val inPort = inPortMapping(atomic, copied)(in)
      // assignPort(in, (inPort, copied))
    }

    for ((out, id) <- outputs.zipWithIndex) {
      val outPort = outPortMapping(atomic, copied)(out)
      // TODO
      // assignPort(out, (outPort, copied))
    }

    addVertex(copied)
    matVal.put(atomic, Atomic(copied))
  }

  // Re-attribute a module with its attributes merged over the parent's.
  private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
    module.withAttributes(currentAttributes).asInstanceOf[T]
  }

  // Positional mapping from `from`'s outlets to `to`'s outlets.
  private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
    from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
  }

  // Positional mapping from `from`'s inlets to `to`'s inlets.
  private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
    from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
  }

  /**
   * Resolve a materialized-value tree against the collected values.
   * NOTE(review): unlike the materializer's version, a missing module resolves
   * to null (ju.Map.get) and Ignore resolves to Ignore itself, not () —
   * confirm whether this asymmetry is intended.
   */
  protected def resolveMaterialized(matNode: MaterializedValueNode,
      materializedValues: ju.Map[Module, Any]):
    Any =
    matNode match {
      case Atomic(m) => materializedValues.get(m)
      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
        resolveMaterialized(d2, materializedValues))
      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
      case Ignore => Ignore
    }
}

object GearpumpMaterializerSession {
  def apply(system: ActorSystem, topLevel: Module,
      initialAttributes: Attributes, namePrefix: Option[String] = None):
    GearpumpMaterializerSession = {
    new GearpumpMaterializerSession(system, topLevel, initialAttributes, namePrefix)
  }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.scaladsl.{Sink, Source} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.graph.GraphPartitioner +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * Source and Sink are materialized locally. + * Remaining GraphStages are materialized remotely: + * statefulMap, filter, fold, flatMap + */ +object Test extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test", akkaConf) + implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy) + + val echo = system.actorOf(Props(new Echo())) + val sink = Sink.actorRef(echo, "COMPLETE") + + Source( + List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky") + ).filter(_.startsWith("red")).fold("Items:") {(a, b) => + a + "|" + b + }.map("I want to order item: " + _).runWith(sink) + + Await.result(system.whenTerminated, 60.minutes) + } + + class Echo extends Actor { + def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +}
