http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
new file mode 100644
index 0000000..8fbe785
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.graph

import akka.actor.ActorSystem
import akka.stream.impl.StreamLayout.Module
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.util.Graph

/**
 * [[RemoteGraph]] is the [[SubGraph]] of the application DSL graph that contains
 * only modules which can be materialized on a remote Gearpump cluster.
 *
 * @param graph the module graph to be materialized remotely
 */
class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph

object RemoteGraph {

  /**
   * Materializes a [[RemoteGraph]] as a Gearpump streaming application.
   *
   * @param useInProcessCluster when true, an embedded in-process cluster is
   *                            started and used; otherwise a client context
   *                            is created against an external cluster
   * @param system actor system used to create the client context
   */
  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
    extends SubGraphMaterializer {

    // The embedded cluster exists only when requested; it is stopped in shutdown.
    private val local: Option[EmbeddedCluster] = if (useInProcessCluster) {
      val cluster = EmbeddedCluster()
      cluster.start()
      Some(cluster)
    } else {
      None
    }

    private val context: ClientContext = local match {
      case Some(cluster) => cluster.newClientContext
      case None => ClientContext(system)
    }

    /**
     * Materializes the sub-graph; an empty graph is a no-op and returns the
     * input materialized values unchanged.
     */
    override def materialize(subGraph: SubGraph,
        inputMatValues: scala.collection.mutable.Map[Module, Any]):
      scala.collection.mutable.Map[Module, Any] = {
      val graph = subGraph.graph

      if (graph.isEmpty) {
        inputMatValues
      } else {
        // Fixed: dropped the redundant type ascription at the call site.
        doMaterialize(graph, inputMatValues)
      }
    }

    private def doMaterialize(graph: Graph[Module, Edge],
        inputMatValues: scala.collection.mutable.Map[Module, Any]):
      scala.collection.mutable.Map[Module, Any] = {
      val materializer = new RemoteMaterializerImpl(graph, system)
      val (app, matValues) = materializer.materialize

      val appId = context.submit(app)
      // scalastyle:off println
      println("sleep 5 second until the application is ready on cluster")
      // scalastyle:on println
      // NOTE(review): a fixed delay is fragile — replace with a readiness check
      // if the client API exposes one.
      Thread.sleep(5000)

      // Only the bridge modules yield a client-side materialized value; every
      // other remote module is intentionally dropped here.
      def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
        matValues.collect {
          case (module: SourceBridgeModule[_, _], processorId) =>
            val client = new SourceBridgeTaskClient[AnyRef](
              system.dispatcher, context, appId, processorId)
            (module: Module, client: Any)
          case (module: SinkBridgeModule[_, _], processorId) =>
            val client = new SinkBridgeTaskClient(system, context, appId, processorId)
            (module: Module, client: Any)
        }
      }

      inputMatValues ++ resolve(matValues)
    }

    override def shutdown: Unit = {
      context.close()
      local.foreach(_.stop())
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
new file mode 100644
index 0000000..a0395de
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -0,0 +1,59 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.graph

import akka.actor.ActorSystem
import akka.stream.impl.StreamLayout.Module
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.util.Graph

/**
 * A [[SubGraph]] is one partial DAG of the application.
 *
 * The full DSL [[Graph]] is partitioned into several [[SubGraph]]s so that
 * each partition can be handed to a different materializer (e.g. local vs.
 * remote cluster).
 */
trait SubGraph {

  /** The [[Graph]] representation of this sub-graph. */
  def graph: Graph[Module, Edge]
}


/**
 * Materializer for one sub-graph partition.
 */
trait SubGraphMaterializer {

  /**
   * Materializes the given sub-graph.
   *
   * @param graph the sub-graph to materialize
   * @param matValues materialized values of each module known before this call
   * @return materialized values of each module after this call
   */
  def materialize(graph: SubGraph,
      matValues: scala.collection.mutable.Map[Module, Any]):
    scala.collection.mutable.Map[Module, Any]

  /** Releases any resources held by this materializer. */
  def shutdown: Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
new file mode 100644
index 0000000..cbafcf5
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -0,0 +1,319 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.materializer

import java.util.concurrent.atomic.AtomicBoolean
import java.{util => ju}

import _root_.org.apache.gearpump.util.{Graph => GGraph}
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
import akka.dispatch.Dispatchers
import akka.event.{Logging, LoggingAdapter}
import akka.stream.impl.StreamLayout._
import akka.stream.impl._
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.{Map => _, _}
import akka.stream.impl.io.{TLSActor, TlsModule}
import akka.stream.scaladsl.{GraphDSL, Keep, ModuleExtractor, RunnableGraph}
import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.module.ReduceModule
import org.apache.gearpump.akkastream.util.MaterializedValueOps
import org.reactivestreams.{Publisher, Subscriber}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration

/**
 * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]:
 * it materializes modules locally on top of actors, but also exposes a
 * [[Graph]]-based entry point used by the Gearpump materializer to drive
 * per-module materialization and wiring.
 *
 * @param system System
 * @param settings ActorMaterializerSettings
 * @param dispatchers Dispatchers
 * @param supervisor ActorRef
 * @param haveShutDown AtomicBoolean
 * @param flowNames SeqActorName
 */
case class LocalMaterializerImpl (
    override val system: ActorSystem,
    override val settings: ActorMaterializerSettings,
    dispatchers: Dispatchers,
    override val supervisor: ActorRef,
    haveShutDown: AtomicBoolean,
    flowNames: SeqActorName)
  extends ExtendedActorMaterializer {

  override def logger: LoggingAdapter = Logging.getLogger(system, this)

  override def schedulePeriodically(initialDelay: FiniteDuration,
      interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.schedule(initialDelay, interval, task)(executionContext)

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  // Applies per-stage attributes (buffer sizes, dispatcher, supervision) on
  // top of the materializer-wide settings; log levels and names are ignored.
  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
    import ActorAttributes._
    import Attributes._
    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
      attr match {
        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
        case l: LogLevels => s
        case Name(_) => s
        case other => s
      }
    }
  }

  // Idempotent: the supervisor is poisoned only on the first call.
  override def shutdown(): Unit =
    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill

  override def isShutdown: Boolean = haveShutDown.get()

  override lazy val executionContext: ExecutionContextExecutor =
    dispatchers.lookup(settings.dispatcher match {
      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
      case other => other
    })


  /**
   * Session that materializes one atomic module at a time and records each
   * module's materialized value into the shared `matVal` map.
   */
  case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
      subflowFuser: GraphInterpreterShell => ActorRef = null)
    extends MaterializerSession(module, iAttributes) {

    override def materializeAtomic(atomic: AtomicModule,
        effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {

      def newMaterializationContext() =
        new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
          stageName(effectiveAttributes))
      atomic match {
        case sink: SinkModule[_, _] =>
          val (sub, mat) = sink.create(newMaterializationContext())
          assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
          matVal.put(atomic, mat)
        case source: SourceModule[_, _] =>
          val (pub, mat) = source.create(newMaterializationContext())
          assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
          matVal.put(atomic, mat)
        case stage: ProcessorModule[_, _, _] =>
          val (processor, mat) = stage.createProcessor()
          assignPort(stage.inPort, processor)
          assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
          matVal.put(atomic, mat)
        case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
          val es = effectiveSettings(effectiveAttributes)
          val props =
            TLSActor.props(es, tls.sslContext, tls.sslConfig,
              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
          val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
          // Two publishers: index 0 = user-facing plaintext, index 1 = transport.
          def factory(id: Int) = new ActorPublisher[Any](impl) {
            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
          }
          val publishers = Vector.tabulate(2)(factory)
          impl ! FanOut.ExposedPublishers(publishers)

          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))

          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))

          matVal.put(atomic, NotUsed)
        case graph: GraphModule =>
          matGraph(graph, effectiveAttributes, matVal)
        case stage: GraphStageModule =>
          // Wrap a lone GraphStage into a single-stage GraphModule and reuse
          // the graph materialization path.
          val graph =
            GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
              stage.shape, stage.attributes, Array(stage))
          matGraph(graph, effectiveAttributes, matVal)
      }
    }

    // Materializes a fused graph: builds an interpreter shell, hosts it either
    // in a fused subflow actor or a fresh ActorGraphInterpreter, then exposes
    // boundary subscribers/publishers for the shape's inlets and outlets.
    private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
        matVal: ju.Map[Module, Any]): Unit = {
      val calculatedSettings = effectiveSettings(effectiveAttributes)
      val (handlers, logics) =
        graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)

      val shell = new GraphInterpreterShell(graph.assembly, handlers,
        logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)

      val impl =
        if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
          subflowFuser(shell)
        } else {
          val props = ActorGraphInterpreter.props(shell)
          actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
        }

      for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
        val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
        assignPort(inlet, subscriber)
      }
      for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
        val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
        impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
        assignPort(outlet, publisher)
      }
    }
  }

  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {

    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
      null, null).materialize().asInstanceOf[Mat]

  }

  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {

    // NOTE(review): subflowFuser is ignored here (null is passed to the
    // session), so subflow fusing never happens via this overload — confirm
    // whether that is intended.
    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
      null, null).materialize().asInstanceOf[Mat]

  }

  // Folds all vertices into one composite module, then wires each edge's
  // (from, to) ports inside the composite.
  def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
    var moduleInProgress: Module = EmptyModule
    graph.vertices.foreach(module => {
      moduleInProgress = moduleInProgress.compose(module)
    })
    graph.edges.foreach(value => {
      val (node1, edge, node2) = value
      moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
    })

    moduleInProgress
  }

  /**
   * Materializes a Gearpump module graph in-process: every vertex is
   * materialized atomically, publishers are subscribed to the subscribers of
   * connected modules, and materialized-value sources are back-filled.
   * The `inputMatValues` map is mutated (via its Java view) and returned.
   */
  def materialize(graph: GGraph[Module, Edge],
      inputMatValues: scala.collection.mutable.Map[Module, Any]):
    scala.collection.mutable.Map[Module, Any] = {
    val topLevelModule = buildToplevelModule(graph)
    val session = LocalMaterializerSession(topLevelModule, null, null)
    import scala.collection.JavaConverters._
    // asJava is a live view: puts made by the session land in inputMatValues.
    val matV = inputMatValues.asJava
    val materializedGraph = graph.mapVertex { module =>
      session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
      matV.get(module)
    }
    // Wire each edge by subscribing the downstream subscriber to the upstream
    // publisher of the two materialized modules.
    materializedGraph.edges.foreach { nodeEdgeNode =>
      val (node1, edge, node2) = nodeEdgeNode
      val from = edge.from
      val to = edge.to
      node1 match {
        case module1: Module =>
          node2 match {
            case module2: Module =>
              val publisher = module1.downstreams(from).asInstanceOf[Publisher[Any]]
              val subscriber = module2.upstreams(to).asInstanceOf[Subscriber[Any]]
              publisher.subscribe(subscriber)
            case _ =>
          }
        case _ =>
      }
    }
    // Collect every MaterializedValueSource stage so its value can be pushed
    // once all modules have materialized.
    val matValSources = graph.vertices.flatMap(module => {
      val rt: Option[MaterializedValueSource[_]] = module match {
        case graphStage: GraphStageModule =>
          graphStage.stage match {
            case materializedValueSource: MaterializedValueSource[_] =>
              Some(materializedValueSource)
            case _ =>
              None
          }
        case _ =>
          None
      }
      rt
    })
    publishToMaterializedValueSource(matValSources, inputMatValues)
    inputMatValues
  }

  // Resolves each source's computation against the materialized values and
  // pushes the result into the source stage.
  private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
      matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
    modules.foreach { source =>
      Option(source.computation).map { attr =>
        val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
        source.setValue(valueToPublish)
      }
    }
  }

  private[this] def createFlowName(): String = flowNames.next()

  val flowName = createFlowName()
  var nextId = 0

  // NOTE(review): nextId is unsynchronized mutable state; stage naming is not
  // thread-safe if materialization ever runs concurrently — confirm usage.
  def stageName(attr: Attributes): String = {
    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
    nextId += 1
    name
  }

  override def withNamePrefix(name: String): LocalMaterializerImpl =
    this.copy(flowNames = flowNames.copy(name))

}

object LocalMaterializerImpl {
  // Bundle of a module, its materialized value, and its wired ports.
  case class MaterializedModule(module: Module, matValue: Any,
      inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
      outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])

  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
      namePrefix: Option[String] = None)(implicit system: ActorSystem):
    LocalMaterializerImpl = {

    val settings = materializerSettings getOrElse ActorMaterializerSettings(system)
    apply(settings, namePrefix.getOrElse("flow"))(system)
  }

  def apply(materializerSettings: ActorMaterializerSettings,
      namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl = {
    val haveShutDown = new AtomicBoolean(false)

    new LocalMaterializerImpl(
      system,
      materializerSettings,
      system.dispatchers,
      system.actorOf(StreamSupervisor.props(materializerSettings,
        haveShutDown).withDispatcher(materializerSettings.dispatcher)),
      haveShutDown,
      FlowNames(system).name.copy(namePrefix))
  }

  // Converts a ReduceModule into a Fold whose zero is null; the aggregator
  // treats a null accumulator as "no element seen yet" and returns the first
  // input unchanged.
  def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
    val f = reduce.f
    val aggregator = {(zero: Any, input: Any) =>
      if (zero == null) {
        input
      } else {
        f(zero, input)
      }
    }
    Fold(null, aggregator)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
new file mode 100644
index 0000000..f3f8094
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -0,0 +1,594 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.materializer + +import akka.actor.ActorSystem +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.Timers._ +import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource} +import akka.stream.impl.fusing.{Map => FMap, _} +import akka.stream.impl.io.IncomingConnectionStage +import akka.stream.impl.{HeadOptionStage, Stages, Throttle} +import akka.stream.scaladsl._ +import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue +import akka.stream.stage.GraphStage +import akka.stream.{FanInShape, FanOutShape} +import org.apache.gearpump.akkastream.GearAttributes +import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge +import org.apache.gearpump.akkastream.module._ +import org.apache.gearpump.akkastream.task._ +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.dsl.op._ +import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} +import org.apache.gearpump.util.Graph +import org.slf4j.LoggerFactory + +import scala.concurrent.Promise +import scala.concurrent.duration.FiniteDuration + +/** + * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump + * Streaming Application. 
+ * + * @param graph Graph + * @param system ActorSystem + */ +class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { + + import RemoteMaterializerImpl._ + + type ID = String + private implicit val actorSystem = system + + private def uuid: String = { + java.util.UUID.randomUUID.toString + } + + def materialize: (StreamApplication, Map[Module, ProcessorId]) = { + val (opGraph, ids) = toOpGraph + val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph) + val processorIds = resolveIds(app, ids) + + val updatedApp = updateJunctionConfig(processorIds, app) + (removeIds(updatedApp), processorIds) + } + + private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], + app: StreamApplication): StreamApplication = { + val config = junctionConfig(processorIds) + + val dag = app.dag.mapVertex { vertex => + val processorId = vertex.id + val newConf = vertex.taskConf.withConfig(config(processorId)) + vertex.copy(taskConf = newConf) + } + new StreamApplication(app.name, app.inputUserConfig, dag) + } + + private def junctionConfig(processorIds: Map[Module, ProcessorId]): + Map[ProcessorId, UserConfig] = { + val updatedConfigs = graph.vertices.flatMap { vertex => + buildShape(vertex, processorIds) + }.toMap + updatedConfigs + } + + private def buildShape(vertex: Module, processorIds: Map[Module, ProcessorId]): + Option[(ProcessorId, UserConfig)] = { + def inProcessors(vertex: Module): List[ProcessorId] = { + vertex.shape.inlets.flatMap { inlet => + graph.incomingEdgesOf(vertex).find( + _._2.to == inlet).map(_._1 + ).flatMap(processorIds.get(_)) + }.toList + } + def outProcessors(vertex: Module): List[ProcessorId] = { + vertex.shape.outlets.flatMap { outlet => + graph.outgoingEdgesOf(vertex).find( + _._2.from == outlet).map(_._3 + ).flatMap(processorIds.get(_)) + }.toList + } + processorIds.get(vertex).map(processorId => { + (processorId, UserConfig.empty. + withValue(GraphTask.OUT_PROCESSORS, outProcessors(vertex)). 
+ withValue(GraphTask.IN_PROCESSORS, inProcessors(vertex))) + }) + } + + private def resolveIds(app: StreamApplication, ids: Map[Module, ID]): + Map[Module, ProcessorId] = { + ids.flatMap { kv => + val (module, id) = kv + val processorId = app.dag.vertices.find { processor => + processor.taskConf.getString(id).isDefined + }.map(_.id) + processorId.map((module, _)) + } + } + + private def removeIds(app: StreamApplication): StreamApplication = { + val graph = app.dag.mapVertex { processor => + val conf = removeId(processor.taskConf) + processor.copy(taskConf = conf) + } + new StreamApplication(app.name, app.inputUserConfig, graph) + } + + private def removeId(conf: UserConfig): UserConfig = { + conf.filter { kv => + kv._2 != RemoteMaterializerImpl.TRACKABLE + } + } + + private def toOpGraph: (Graph[Op, OpEdge], Map[Module, ID]) = { + var matValues = collection.mutable.Map.empty[Module, ID] + val opGraph = graph.mapVertex[Op] { module => + val name = uuid + val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.TRACKABLE) + matValues += module -> name + val parallelism = GearAttributes.count(module.attributes) + val op = module match { + case source: SourceTaskModule[_] => + val updatedConf = conf.withConfig(source.conf) + DataSourceOp(source.source, parallelism, updatedConf, "source") + case sink: SinkTaskModule[_] => + val updatedConf = conf.withConfig(sink.conf) + DataSinkOp(sink.sink, parallelism, updatedConf, "sink") + case sourceBridge: SourceBridgeModule[_, _] => + ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source") + case processor: ProcessorModule[_, _, _] => + val updatedConf = conf.withConfig(processor.conf) + ProcessorOp(processor.processor, parallelism, updatedConf, "source") + case sinkBridge: SinkBridgeModule[_, _] => + ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") + case groupBy: GroupByModule[_, _] => + GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf) + case reduce: ReduceModule[_] => + 
reduceOp(reduce.f, conf) + case graphStage: GraphStageModule => + translateGraphStageWithMaterializedValue(graphStage, parallelism, conf) + } + if (op == null) { + throw new UnsupportedOperationException( + module.getClass.toString + " is not supported with RemoteMaterializer" + ) + } + op + }.mapEdge[OpEdge] { (n1, edge, n2) => + n2 match { + case master: MasterOp => + Shuffle + case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] => + Shuffle + case slave: SlaveOp[_] => + Direct + } + } + (opGraph, matValues.toMap) + } + + private def translateGraphStageWithMaterializedValue(module: GraphStageModule, + parallelism: Int, conf: UserConfig): Op = { + module.stage match { + case tickSource: TickSource[_] => + import TickSourceTask._ + val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef] + val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, tickSource.initialDelay). + withValue[FiniteDuration](INTERVAL, tickSource.interval). + withValue(TICK, tick) + ProcessorOp(classOf[TickSourceTask[_]], parallelism, tiConf, "tickSource") + case graphStage: GraphStage[_] => + translateGraphStage(module, parallelism, conf) + case headOptionStage: HeadOptionStage[_] => + headOptionOp(headOptionStage, conf) + case pushPullGraphStageWithMaterializedValue: + PushPullGraphStageWithMaterializedValue[_, _, _, _] => + translateSymbolic(pushPullGraphStageWithMaterializedValue, conf) + } + } + + private def translateGraphStage(module: GraphStageModule, + parallelism: Int, conf: UserConfig): Op = { + module.stage match { + case balance: Balance[_] => + ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance") + case batch: Batch[_, _] => + val batchConf = conf.withValue[_ => Long](BatchTask.COST, batch.costFn). + withLong(BatchTask.MAX, batch.max). + withValue[(_, _) => _](BatchTask.AGGREGATE, batch.aggregate). 
+ withValue[_ => _](BatchTask.SEED, batch.seed) + ProcessorOp(classOf[BatchTask[_, _]], + parallelism, batchConf, "batch") + case broadcast: Broadcast[_] => + val name = ModuleExtractor.unapply(broadcast).map(_.attributes.nameOrDefault()).get + ProcessorOp(classOf[BroadcastTask], parallelism, conf, name) + case collect: Collect[_, _] => + collectOp(collect.pf, conf) + case concat: Concat[_] => + ProcessorOp(classOf[ConcatTask], parallelism, conf, "concat") + case delayInitial: DelayInitial[_] => + val dIConf = conf.withValue[FiniteDuration]( + DelayInitialTask.DELAY_INITIAL, delayInitial.delay) + ProcessorOp(classOf[DelayInitialTask[_]], parallelism, dIConf, "delayInitial") + case dropWhile: DropWhile[_] => + dropWhileOp(dropWhile.p, conf) + case flattenMerge: FlattenMerge[_, _] => + ProcessorOp(classOf[FlattenMergeTask], parallelism, conf, "flattenMerge") + case fold: Fold[_, _] => + val foldConf = conf.withValue(FoldTask.ZERO, fold.zero.asInstanceOf[AnyRef]). + withValue(FoldTask.AGGREGATOR, fold.f) + ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold") + case groupBy: GroupBy[_, _] => + GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf) + case groupedWithin: GroupedWithin[_] => + val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d). + withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n) + ProcessorOp(classOf[GroupedWithinTask[_]], parallelism, diConf, "groupedWithin") + case idleInject: IdleInject[_, _] => + // TODO + null + case idleTimeoutBidi: IdleTimeoutBidi[_, _] => + // TODO + null + case incomingConnectionStage: IncomingConnectionStage => + // TODO + null + case interleave: Interleave[_] => + val ilConf = conf.withInt(InterleaveTask.INPUT_PORTS, interleave.inputPorts). 
+ withInt(InterleaveTask.SEGMENT_SIZE, interleave.segmentSize) + ProcessorOp(classOf[InterleaveTask], parallelism, ilConf, "interleave") + null + case intersperse: Intersperse[_] => + // TODO + null + case limitWeighted: LimitWeighted[_] => + // TODO + null + case map: FMap[_, _] => + mapOp(map.f, conf) + case mapAsync: MapAsync[_, _] => + ProcessorOp(classOf[MapAsyncTask[_, _]], + mapAsync.parallelism, conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsync.f), "mapAsync") + case mapAsyncUnordered: MapAsyncUnordered[_, _] => + ProcessorOp(classOf[MapAsyncTask[_, _]], + mapAsyncUnordered.parallelism, + conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsyncUnordered.f), "mapAsyncUnordered") + case materializedValueSource: MaterializedValueSource[_] => + // TODO + null + case merge: Merge[_] => + val mergeConf = conf.withBoolean(MergeTask.EAGER_COMPLETE, merge.eagerComplete). + withInt(MergeTask.INPUT_PORTS, merge.inputPorts) + ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge") + case mergePreferred: MergePreferred[_] => + MergeOp("mergePreferred", conf) + case mergeSorted: MergeSorted[_] => + MergeOp("mergeSorted", conf) + case prefixAndTail: PrefixAndTail[_] => + // TODO + null + case recover: Recover[_] => + // TODO + null + case scan: Scan[_, _] => + scanOp(scan.zero, scan.f, conf) + case simpleLinearGraphStage: SimpleLinearGraphStage[_] => + translateSimpleLinearGraph(simpleLinearGraphStage, parallelism, conf) + case singleSource: SingleSource[_] => + val singleSourceConf = conf.withValue[AnyRef](SingleSourceTask.ELEMENT, + singleSource.elem.asInstanceOf[AnyRef]) + ProcessorOp(classOf[SingleSourceTask[_]], parallelism, singleSourceConf, "singleSource") + case split: Split[_] => + // TODO + null + case statefulMapConcat: StatefulMapConcat[_, _] => + val func = statefulMapConcat.f + val statefulMapConf = + conf.withValue[() => _ => Iterable[_]](StatefulMapConcatTask.FUNC, func) + ProcessorOp(classOf[StatefulMapConcatTask[_, _]], parallelism, + 
statefulMapConf, "statefulMapConcat") + case subSink: SubSink[_] => + // TODO + null + case subSource: SubSource[_] => + // TODO + null + case unfold: Unfold[_, _] => + // TODO + null + case unfoldAsync: UnfoldAsync[_, _] => + // TODO + null + case unzip: Unzip[_, _] => + ProcessorOp(classOf[Unzip2Task[_, _, _]], + parallelism, + conf.withValue( + Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper) + ), "unzip") + case zip: Zip[_, _] => + zipWithOp(zip.zipper, conf) + case zipWith2: ZipWith2[_, _, _] => + ProcessorOp(classOf[Zip2Task[_, _, _]], + parallelism, + conf.withValue( + Zip2Task.ZIP2_FUNCTION, Zip2Task.ZipFunction(zipWith2.zipper) + ), "zipWith2") + } + } + + private def translateSimpleLinearGraph(stage: SimpleLinearGraphStage[_], + parallelism: Int, conf: UserConfig): Op = { + stage match { + case completion: Completion[_] => + // TODO + null + case delay: Delay[_] => + // TODO + null + case drop: Drop[_] => + dropOp(drop.count, conf) + case dropWithin: DropWithin[_] => + val dropWithinConf = + conf.withValue[FiniteDuration](DropWithinTask.TIMEOUT, dropWithin.timeout) + ProcessorOp(classOf[DropWithinTask[_]], + parallelism, dropWithinConf, "dropWithin") + case filter: Filter[_] => + filterOp(filter.p, conf) + case idle: Idle[_] => + // TODO + null + case initial: Initial[_] => + // TODO + null + case log: Log[_] => + logOp(log.name, log.extract, conf) + case reduce: Reduce[_] => + reduceOp(reduce.f, conf) + case take: Take[_] => + takeOp(take.count, conf) + case takeWhile: TakeWhile[_] => + filterOp(takeWhile.p, conf) + case takeWithin: TakeWithin[_] => + val takeWithinConf = + conf.withValue[FiniteDuration](TakeWithinTask.TIMEOUT, takeWithin.timeout) + ProcessorOp(classOf[TakeWithinTask[_]], + parallelism, takeWithinConf, "takeWithin") + case throttle: Throttle[_] => + val throttleConf = conf.withInt(ThrottleTask.COST, throttle.cost). + withInt(ThrottleTask.MAX_BURST, throttle.maximumBurst). 
+ withValue[_ => Int](ThrottleTask.COST_CALC, throttle.costCalculation). + withValue[FiniteDuration](ThrottleTask.TIME_PERIOD, throttle.per) + ProcessorOp(classOf[ThrottleTask[_]], + parallelism, throttleConf, "throttle") + } + } + + private def translateSymbolic(stage: PushPullGraphStageWithMaterializedValue[_, _, _, _], + conf: UserConfig): Op = { + stage match { + case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _] => + symbolicGraphStage.symbolicStage match { + case buffer: Stages.Buffer[_] => + // ignore the buffering operation + identity("buffer", conf) + } + } + } + +} + +object RemoteMaterializerImpl { + final val NotApplied: Any => Any = _ => NotApplied + + def collectOp[In, Out](collect: PartialFunction[In, Out], conf: UserConfig): Op = { + flatMapOp({ data: In => + collect.applyOrElse(data, NotApplied) match { + case NotApplied => None + case result: Any => Option(result) + } + }, "collect", conf) + } + + def filterOp[In](filter: In => Boolean, conf: UserConfig): Op = { + flatMapOp({ data: In => + if (filter(data)) Option(data) else None + }, "filter", conf) + } + + def headOptionOp[T](headOptionStage: HeadOptionStage[T], conf: UserConfig): Op = { + val promise: Promise[Option[T]] = Promise() + flatMapOp({ data: T => + data match { + case None => + Some(promise.future.failed) + case Some(d) => + promise.future.value + } + }, "headOption", conf) + } + + def reduceOp[T](reduce: (T, T) => T, conf: UserConfig): Op = { + var result: Option[T] = None + val flatMap = { elem: T => + result match { + case None => + result = Some(elem) + case Some(r) => + result = Some(reduce(r, elem)) + } + List(result) + } + flatMapOp(flatMap, "reduce", conf) + } + + def zipWithOp[In1, In2](zipWith: (In1, In2) => (In1, In2), conf: UserConfig): Op = { + val flatMap = { elem: (In1, In2) => + val (e1, e2) = elem + val result: (In1, In2) = zipWith(e1, e2) + List(result) + } + flatMapOp(flatMap, "zipWith", conf) + } + + def zipWithOp2[In1, In2, Out](zipWith: (In1, In2) => 
Out, conf: UserConfig): Op = { + val flatMap = { elem: (In1, In2) => + val (e1, e2) = elem + val result: Out = zipWith(e1, e2) + List(result) + } + flatMapOp(flatMap, "zipWith", conf) + } + + def identity(description: String, conf: UserConfig): Op = { + flatMapOp({ data: Any => + List(data) + }, description, conf) + } + + def mapOp[In, Out](map: In => Out, conf: UserConfig): Op = { + val flatMap = (data: In) => List(map(data)) + flatMapOp (flatMap, conf) + } + + def flatMapOp[In, Out](flatMap: In => Iterable[Out], conf: UserConfig): Op = { + flatMapOp(flatMap, "flatmap", conf) + } + + def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String, + conf: UserConfig): Op = { + FlatMapOp(fun, description, conf) + } + + def conflatOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out, + conf: UserConfig): Op = { + var agg = None: Option[Out] + val flatMap = {elem: In => + agg = agg match { + case None => + Some(seed(elem)) + case Some(value) => + Some(aggregate(value, elem)) + } + List(agg.get) + } + flatMapOp (flatMap, "conflat", conf) + } + + def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = { + var aggregator: Out = zero + val map = { elem: In => + aggregator = fold(aggregator, elem) + List(aggregator) + } + flatMapOp(map, "fold", conf) + } + + def groupedOp(count: Int, conf: UserConfig): Op = { + var left = count + val buf = { + val b = Vector.newBuilder[Any] + b.sizeHint(count) + b + } + + val flatMap: Any => Iterable[Any] = {input: Any => + buf += input + left -= 1 + if (left == 0) { + val emit = buf.result() + buf.clear() + left = count + Some(emit) + } else { + None + } + } + flatMapOp(flatMap, conf: UserConfig) + } + + def dropOp[T](number: Long, conf: UserConfig): Op = { + var left = number + val flatMap: T => Iterable[T] = {input: T => + if (left > 0) { + left -= 1 + None + } else { + Some(input) + } + } + flatMapOp(flatMap, "drop", conf) + } + + def dropWhileOp[In](drop: In => Boolean, conf: UserConfig): Op = { 
+ flatMapOp({ data: In => + if (drop(data)) None else Option(data) + }, "dropWhile", conf) + } + + def logOp[T](name: String, extract: T => Any, conf: UserConfig): Op = { + val flatMap = {elem: T => + LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}") + List(elem) + } + flatMapOp(flatMap, "log", conf) + } + + def scanOp[In, Out](zero: Out, f: (Out, In) => Out, conf: UserConfig): Op = { + var aggregator = zero + var pushedZero = false + + val flatMap = {elem: In => + aggregator = f(aggregator, elem) + + if (pushedZero) { + pushedZero = true + List(zero, aggregator) + } else { + List(aggregator) + } + } + flatMapOp(flatMap, "scan", conf) + } + + def statefulMapOp[In, Out](f: In => Iterable[Out], conf: UserConfig): Op = { + flatMapOp ({ data: In => + f(data) + }, conf) + } + + def takeOp(count: Long, conf: UserConfig): Op = { + var left: Long = count + + val filter: Any => Iterable[Any] = {elem: Any => + left -= 1 + if (left > 0) Some(elem) + else if (left == 0) Some(elem) + else None + } + flatMapOp(filter, "take", conf) + } + + /** + * We use this attribute to track module to Processor + * + */ + val TRACKABLE = "track how module is fused to processor" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala new file mode 100644 index 0000000..35d0e88 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream._ +import akka.stream.impl.StreamLayout.{AtomicModule, Module} +import org.reactivestreams.{Publisher, Subscriber} + +/** + * + * + * [[IN]] -> [[BridgeModule]] -> [[OUT]] + * / + * / + * out of band data input or output channel [[MAT]] + * + * + * [[BridgeModule]] is used as a bridge between different materializers. + * Different [[akka.stream.Materializer]]s can use out of band channel to + * exchange messages. + * + * For example: + * + * Remote Materializer + * ----------------------------- + * | | + * | BridgeModule -> RemoteSink | + * | / | + * --/---------------------------- + * Local Materializer / out of band channel. + * ----------------------/---- + * | Local / | + * | Source -> BridgeModule | + * | | + * --------------------------- + * + * + * Typically [[BridgeModule]] is created implicitly as a temporary intermediate + * module during materialization. + * + * However, user can still declare it explicitly. In this case, it means we have a + * boundary Source or Sink module which accept out of band channel inputs or + * outputs. 
+ * + * @tparam IN input + * @tparam OUT output + * @tparam MAT materializer + */ +abstract class BridgeModule[IN, OUT, MAT] extends AtomicModule { + val inPort = Inlet[IN]("BridgeModule.in") + val outPort = Outlet[OUT]("BridgeModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + def attributes: Attributes + def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT] + + protected def newInstance: BridgeModule[IN, OUT, MAT] + override def carbonCopy: Module = newInstance +} + + +/** + * + * Bridge module which accept out of band channel Input + * [[org.reactivestreams.Publisher]][IN]. + * + * + * [[SourceBridgeModule]] -> [[OUT]] + * /| + * / + * out of band data input [[org.reactivestreams.Publisher]][IN] + * + * @see [[BridgeModule]] + * @param attributes Attributes + * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]] + * @tparam OUT out put data type to next module. + */ +class SourceBridgeModule[IN, OUT](val attributes: Attributes = + Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] { + override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = + new SourceBridgeModule[IN, OUT](attributes) + + override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = { + new SourceBridgeModule( attributes) + } +} + +/** + * + * Bridge module which accept out of band channel Output + * [[org.reactivestreams.Subscriber]][OUT]. 
+ * + * + * [[IN]] -> [[BridgeModule]] + * \ + * \ + * \| + * out of band data output [[org.reactivestreams.Subscriber]][OUT] + * + * @see [[BridgeModule]] + * @param attributes Attributes + * @tparam IN, input data type from previous module + * @tparam OUT out put data type to out of band subscriber + */ +class SinkBridgeModule[IN, OUT](val attributes: Attributes = + Attributes.name("sinkBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] { + override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = + new SinkBridgeModule[IN, OUT](attributes) + + override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = { + new SinkBridgeModule[IN, OUT](attributes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala new file mode 100644 index 0000000..2c430d5 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream.impl.StreamLayout.{AtomicModule, Module} +import akka.stream.impl.{SinkModule, SourceModule} +import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape} +import org.reactivestreams.{Publisher, Subscriber} + +/** + * [[DummyModule]] is a set of special module to help construct a RunnableGraph, + * so that all ports are closed. + * + * In runtime, [[DummyModule]] should be ignored during materialization. + * + * For example, if you have a [[BridgeModule]] which only accept the input + * message from out of band channel, then you can use DummySource to fake + * a Message Source Like this. + * + * [[DummySource]] -> [[BridgeModule]] -> Sink + * /| + * / + * out of band input message [[Publisher]] + * + * After materialization, [[DummySource]] will be removed. 
+ + * [[BridgeModule]] -> Sink + * /| + * / + * [[akka.stream.impl.PublisherSource]] + * + * + */ +trait DummyModule extends AtomicModule + + +/** + * + * [[DummySource]]-> [[BridgeModule]] -> Sink + * /| + * / + * out of band input message Source + * + * @param attributes Attributes + * @param shape SourceShape[Out] + * @tparam Out Output + */ +class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out]) + extends SourceModule[Out, Unit](shape) with DummyModule { + + override def create(context: MaterializationContext): (Publisher[Out], Unit) = { + throw new UnsupportedOperationException() + } + + override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = { + new DummySource[Out](attributes, shape) + } + + override def withAttributes(attr: Attributes): Module = { + new DummySource(attr, amendShape(attr)) + } +} + + +/** + * + * Source-> [[BridgeModule]] -> [[DummySink]] + * \ + * \ + * \| + * out of band output message [[Subscriber]] + * + * @param attributes Attributes + * @param shape SinkShape[IN] + */ +class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN]) + extends SinkModule[IN, Unit](shape) with DummyModule { + override def create(context: MaterializationContext): (Subscriber[IN], Unit) = { + throw new UnsupportedOperationException() + } + + override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, Unit] = { + new DummySink[IN](attributes, shape) + } + + override def withAttributes(attr: Attributes): Module = { + new DummySink[IN](attr, amendShape(attr)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala new file mode 100644 index 0000000..7555244 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream.impl.StreamLayout.{AtomicModule, Module} +import akka.stream._ +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.Task + +/** + * [[GearpumpTaskModule]] represent modules that can be materialized as Gearpump Tasks. + * + * This is specially designed for Gearpump runtime. It is not supposed to be used + * for local materializer. 
+ * + */ +trait GearpumpTaskModule extends AtomicModule + +/** + * This is used to represent the Gearpump Data Source + * @param source DataSource + * @param conf UserConfig + * @param shape SourceShape[T} + * @param attributes Attributes + * @tparam T type + */ +final case class SourceTaskModule[T]( + source: DataSource, + conf: UserConfig, + shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")), + attributes: Attributes = Attributes.name("SourceTaskModule")) + extends GearpumpTaskModule { + + override def withAttributes(attr: Attributes): Module = + this.copy(shape = amendShape(attr), attributes = attr) + override def carbonCopy: Module = + this.copy(shape = SourceShape( Outlet[T]("SourceTaskModule.out"))) + + override def replaceShape(s: Shape): Module = + if (s == shape) this + else throw new UnsupportedOperationException("cannot replace the shape of SourceTaskModule") + + private def amendShape(attr: Attributes): SourceShape[T] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(out = Outlet(thatN + ".out")) + } +} + +/** + * This is used to represent the Gearpump Data Sink + * @param sink DataSink + * @param conf UserConfig + * @param shape SinkShape[IN] + * @param attributes Attributes + * @tparam IN type + */ +final case class SinkTaskModule[IN]( + sink: DataSink, + conf: UserConfig, + shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")), + attributes: Attributes = Attributes.name("SinkTaskModule")) + extends GearpumpTaskModule { + + override def withAttributes(attr: Attributes): Module = + this.copy(shape = amendShape(attr), attributes = attr) + override def carbonCopy: Module = + this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in"))) + + override def replaceShape(s: Shape): Module = + if (s == shape) this + else throw new UnsupportedOperationException("cannot replace the shape of SinkTaskModule") + + private def 
amendShape(attr: Attributes): SinkShape[IN] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(in = Inlet(thatN + ".out")) + } +} + +/** + * This is to represent the Gearpump Processor which has exact one input and one output + * @param processor Class[_ <: Task] + * @param conf UserConfig + * @param attributes Attributes + * @tparam IN type + * @tparam OUT type + * @tparam Unit void + */ +case class ProcessorModule[IN, OUT, Unit]( + processor: Class[_ <: Task], + conf: UserConfig, + attributes: Attributes = Attributes.name("processorModule")) + extends AtomicModule with GearpumpTaskModule { + val inPort = Inlet[IN]("ProcessorModule.in") + val outPort = Outlet[IN]("ProcessorModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + override def carbonCopy: Module = newInstance + + protected def newInstance: ProcessorModule[IN, OUT, Unit] = + new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) + + override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = { + new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala new file mode 100644 index 0000000..4465886 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala @@ -0,0 +1,55 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.module + +import akka.stream._ +import akka.stream.impl.StreamLayout.{AtomicModule, Module} + + +/** + * + * Group the T value groupBy function + * + * @param groupBy T => Group + * @param attributes Attributes + * @tparam T type + * @tparam Group type + */ +case class GroupByModule[T, Group](groupBy: T => Group, + attributes: Attributes = Attributes.name("groupByModule")) + extends AtomicModule { + val inPort = Inlet[T]("GroupByModule.in") + val outPort = Outlet[T]("GroupByModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + override def carbonCopy: Module = newInstance + + protected def newInstance: GroupByModule[T, Group] = + new GroupByModule[T, Group](groupBy, attributes) + + override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = { + new GroupByModule[T, Group](groupBy, attributes) + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala new file mode 100644 index 0000000..295556f --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.module + +import akka.stream._ +import akka.stream.impl.StreamLayout.{AtomicModule, Module} + + +/** + * + * Reduce Module + * + * @param f (T,T) => T + * @param attributes Attributes + * @tparam T type + */ +case class ReduceModule[T](f: (T, T) => T, attributes: Attributes = +Attributes.name("reduceModule")) extends AtomicModule { + val inPort = Inlet[T]("GroupByModule.in") + val outPort = Outlet[T]("GroupByModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape): Module = if (s != shape) { + throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + } else { + this + } + + override def carbonCopy: Module = newInstance + + protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, attributes) + + override def withAttributes(attributes: Attributes): ReduceModule[T] = { + new ReduceModule[T](f, attributes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala new file mode 100644 index 0000000..85b1d5e --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.scaladsl + +import akka.stream.Attributes +import org.apache.gearpump.akkastream.module._ +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.Task +import org.reactivestreams.{Publisher, Subscriber} + + +object GearSource{ + + /** + * Construct a Source which accepts out of band input messages. + * + * [[SourceBridgeModule]] -> Sink + * / + * / + * V + * materialize to [[Subscriber]] + * /| + * / + * upstream [[Publisher]] send out of band message + * + */ + def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = { + val source = new Source(new DummySource[IN](Attributes.name("dummy"), Source.shape("dummy"))) + val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]()) + source.viaMat(flow)(Keep.right) + } + + /** + * Construct a Source from Gearpump [[DataSource]]. + * + * [[SourceTaskModule]] -> downstream Sink + * + */ + def from[OUT](source: DataSource): Source[OUT, Unit] = { + val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty)) + taskSource + } + + /** + * Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]]. 
+ * + * [[ProcessorModule]] -> downstream Sink + * + */ + def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = { + val source = new Source(new DummySource[Unit](Attributes.name("dummy"), Source.shape("dummy"))) + val flow = Processor.apply[Unit, OUT](processor, conf) + source.viaMat(flow)(Keep.right) + } +} + +object GearSink { + + /** + * Construct a Sink which output messages to a out of band channel. + * + * Souce -> [[SinkBridgeModule]] + * \ + * \| + * materialize to [[Publisher]] + * \ + * \ + * \| + * send out of band message to downstream [[Subscriber]] + * + */ + def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = { + val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), Sink.shape("dummy"))) + val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]()) + flow.to(sink) + } + + /** + * Construct a Sink from Gearpump [[DataSink]]. + * + * Upstream Source -> [[SinkTaskModule]] + * + */ + def to[IN](sink: DataSink): Sink[IN, Unit] = { + val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty)) + taskSink + } + + /** + * Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]]. + * + * Upstream Source -> [[ProcessorModule]] + * + */ + def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = { + val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), Sink.shape("dummy"))) + val flow = Processor.apply[IN, Unit](processor, conf) + flow.to(sink) + } +} + +/** + * + * GroupBy will divide the main input stream to a set of sub-streams. 
+ * This is a work-around to bypass the limitation of the official API
+ * Flow.groupBy.
+ *
+ * For example, to do a word count, we can write code like this:
+ *
+ *   case class KV(key: String, value: String)
+ *   case class Count(key: String, count: Int)
+ *
+ *   val flow: Flow[KV] = GroupBy[KV](foo).map { kv =>
+ *     Count(kv.key, 1)
+ *   }.fold(Count(null, 0)) { (base, add) =>
+ *     Count(add.key, base.count + add.count)
+ *   }.log("count of current key")
+ *    .flatten()
+ *    .to(sink)
+ *
+ * map and fold transform data on every sub-stream: if there are 10 groups,
+ * then there are 10 sub-streams, and each sub-stream gets its own map and
+ * fold.
+ *
+ * flatten collects all sub-streams back into the main stream.
+ *
+ * sink only operates on the main stream.
+ */
+object GroupBy{
+  /** Builds a Flow stage backed by a GroupByModule keyed by groupBy(t). */
+  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
+    new Flow[T, T, Unit](new GroupByModule(groupBy))
+  }
+}
+
+/**
+ * Aggregate on the data.
+ *
+ *   val flow = Reduce({(a: Int, b: Int) => a + b})
+ */
+object Reduce{
+  /** Builds a Flow stage backed by a ReduceModule using the given combiner. */
+  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
+    new Flow[T, T, Unit](new ReduceModule(reduce))
+  }
+}
+
+/**
+ * Create a Flow by providing a Gearpump Processor class and configuration.
+ */
+object Processor{
+  /** Wraps the given Gearpump Task class and UserConfig in a ProcessorModule stage. */
+  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
+    new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf))
+  }
+}
+
+object Implicits {
+
+  /**
+   * Adds groupBy2 / reduce / process to Source.
+   */
+  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
+
+    // TODO It is named groupBy2 to avoid conflict with the built-in
+    // groupBy. Eventually, we think the built-in groupBy should
+    // be replaced with this implementation.
+    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
+      val stage = GroupBy.apply(groupBy)
+      source.via[T, Unit](stage)
+    }
+
+    /** Folds the stream with the given combiner (see [[Reduce]]). */
+    def reduce(reduce: (T, T) => T): Source[T, Mat] = {
+      val stage = Reduce.apply(reduce)
+      source.via[T, Unit](stage)
+    }
+
+    /** Appends a Gearpump Processor stage to this Source (see [[Processor]]). */
+    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] = {
+      val stage = Processor.apply[T, R](processor, conf)
+      source.via(stage)
+    }
+  }
+
+  /**
+   * Adds groupBy2 / reduce / process to Flow.
+   */
+  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
+    // See SourceOps.groupBy2 naming note: groupBy2 avoids clashing with
+    // the built-in groupBy.
+    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = {
+      val stage = GroupBy.apply(groupBy)
+      flow.via(stage)
+    }
+
+    /** Folds the stream with the given combiner (see [[Reduce]]). */
+    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = {
+      val stage = Reduce.apply(reduce)
+      flow.via(stage)
+    }
+
+    /** Appends a Gearpump Processor stage to this Flow (see [[Processor]]). */
+    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] = {
+      val stage = Processor.apply[OUT, R](processor, conf)
+      flow.via(stage)
+    }
+  }
+
+  /**
+   * Adds groupByKey and sumOnValue to key-value Sources.
+   */
+  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {
+
+    /**
+     * If the element is a KV pair, group the pairs by key.
+     * @return the same Source with a GroupBy stage appended
+     */
+    def groupByKey: Source[(K, V), Mat] = {
+      val stage = GroupBy.apply(getTupleKey[K, V])
+      source.via(stage)
+    }
+
+    /**
+     * Sum the values.
+     *
+     * Call groupByKey first to bring identical keys together;
+     * otherwise the sum is performed regardless of the current key.
+     *
+     * @param numeric Numeric[V]
+     * @return the same Source with a Reduce stage appended
+     */
+    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
+      val stage = Reduce.apply(sumByKey[K, V](numeric))
+      source.via(stage)
+    }
+  }
+
+  /**
+   * Adds groupByKey and sumOnValue to key-value Flows.
+   */
+  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
+
+    /**
+     * If the element is a KV pair, group the pairs by key.
+     * @return the same Flow with a GroupBy stage appended
+     */
+    def groupByKey: Flow[(K, V), (K, V), Mat] = {
+      val stage = GroupBy.apply(getTupleKey[K, V])
+      flow.via(stage)
+    }
+
+    /**
+     * Sum the values.
+     *
+     * Call groupByKey first to bring identical keys together;
+     * otherwise the sum is performed regardless of the current key.
+     *
+     * @param numeric Numeric[V]
+     * @return the same Flow with a Reduce stage appended
+     */
+    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
+      val stage = Reduce.apply(sumByKey[K, V](numeric))
+      flow.via(stage)
+    }
+  }
+
+  // Key extractor used by groupByKey.
+  private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  // Combines two pairs: keeps the first pair's key and sums the two values.
+  private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] =
+    (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
new file mode 100644
index 0000000..5139117
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ * Distributes incoming messages over all output ports, one message per port,
+ * in round-robin order.
+ */
+class BalanceTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Total number of output ports to rotate over.
+  val sizeOfOutputs = sizeOfOutPorts
+  // Port that receives the next message; always in [0, sizeOfOutputs).
+  var index = 0
+
+  /** Emits the message on the current port, then advances the cursor. */
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index = (index + 1) % sizeOfOutputs
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
new file mode 100644
index 0000000..582327b
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Batches incoming elements, akka-stream batchWeighted style: the first
+ * element seeds a batch, later elements are folded in with AGGREGATE, and the
+ * batch is flushed downstream once its accumulated COST reaches MAX.
+ *
+ * The original implementation read all four config values but never used
+ * them and simply forwarded every message unchanged. If any config value is
+ * missing, this version falls back to that passthrough behavior.
+ */
+class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Maximum accumulated weight before the batch is flushed.
+  val max = userConf.getLong(BatchTask.MAX)
+  // Weight (cost) of a single input element.
+  val costFunc = userConf.getValue[In => Long](BatchTask.COST)
+  // Folds one more element into the running batch.
+  val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
+  // Creates a new batch from the first element.
+  val seed = userConf.getValue[In => Out](BatchTask.SEED)
+
+  // Running batch state; batch is None until the first element arrives.
+  private var batch: Option[Out] = None
+  private var weight: Long = 0L
+  // Timestamp of the element that opened the current batch.
+  private var batchTime: Long = 0L
+
+  override def onNext(msg : Message) : Unit = {
+    if (max.isEmpty || costFunc.isEmpty || aggregate.isEmpty || seed.isEmpty) {
+      // Batching is not (fully) configured: keep the original passthrough
+      // behavior so existing deployments are unaffected.
+      context.output(msg)
+    } else {
+      val data = msg.msg.asInstanceOf[In]
+      val time = msg.timestamp
+      batch = batch match {
+        case None =>
+          batchTime = time
+          seed.map(f => f(data))
+        case Some(acc) =>
+          aggregate.map(f => f(acc, data))
+      }
+      weight += costFunc.map(f => f(data)).getOrElse(0L)
+      // NOTE(review): with no downstream-demand signal available here, the
+      // batch is flushed when the weight reaches MAX — confirm this matches
+      // the intended batchWeighted semantics for this runner.
+      for (limit <- max; acc <- batch if weight >= limit) {
+        context.output(new Message(acc, batchTime))
+        batch = None
+        weight = 0L
+      }
+    }
+  }
+}
+
+object BatchTask {
+  // UserConfig keys for the batching parameters.
+  val AGGREGATE = "AGGREGATE"
+  val COST = "COST"
+  val MAX = "MAX"
+  val SEED = "SEED"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
new file mode 100644
index 0000000..9f1194f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ * Forwards every incoming message unchanged via context.output.
+ * NOTE(review): broadcasting to all downstream tasks presumably happens in
+ * the partitioner/output layer, not here — confirm against GraphTask wiring.
+ */
+class BroadcastTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  override def onNext(msg : Message) : Unit = context.output(msg)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
new file mode 100644
index 0000000..241fa76
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+/**
+ * Emits incoming messages over the output ports in round-robin order.
+ *
+ * NOTE(review): this body is identical to BalanceTask and looks copy-pasted.
+ * Concat conventionally means "drain the first upstream completely, then the
+ * next", which depends on input-port ordering that is not visible here —
+ * confirm the intended semantics before relying on this task.
+ */
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class ConcatTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Number of output ports (fixed at construction).
+  val sizeOfOutputs = sizeOfOutPorts
+  // Next port to emit on; wraps back to 0 after the last port.
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
new file mode 100644
index 0000000..d6c347a
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+// Internal marker message signalling the end of the initial delay window.
+case object DelayInitialTime
+
+/**
+ * Suppresses output for DELAY_INITIAL after start, then forwards messages.
+ * NOTE(review): messages arriving during the delay are dropped, not
+ * buffered — akka-stream's initialDelay buffers; confirm this is intended.
+ */
+class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Delay window length; defaults to zero (no suppression).
+  val delayInitial = userConf.getValue[FiniteDuration](DelayInitialTask.DELAY_INITIAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  // True while output is suppressed.
+  var delayInitialActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    // One-shot timer that ends the delay window by sending the marker to self.
+    context.scheduleOnce(delayInitial)(
+      self ! Message(DelayInitialTime, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    if (msg.msg == DelayInitialTime) {
+      // Marker from our own timer: open the gate (marker is not forwarded).
+      delayInitialActive = false
+    } else if (!delayInitialActive) {
+      context.output(msg)
+    }
+  }
+}
+
+object DelayInitialTask {
+  // UserConfig key for the initial delay duration.
+  val DELAY_INITIAL = "DELAY_INITIAL"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
new file mode 100644
index 0000000..9da26b1
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+// Internal marker message signalling the end of the drop window.
+case object DropWithinTimeout
+
+/**
+ * Drops every message received within TIMEOUT after start, then forwards
+ * all subsequent messages (akka-stream dropWithin semantics).
+ *
+ * Fix: the original flipped the flag on the DropWithinTimeout marker and
+ * then fell through to a second match that emitted the internal marker
+ * message downstream. The marker must never be forwarded.
+ */
+class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Drop window length; defaults to zero (drop nothing).
+  val timeout = userConf.getValue[FiniteDuration](DropWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  // True while messages are being dropped.
+  var timeoutActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    // One-shot timer that ends the drop window by sending the marker to self.
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        // End of the drop window; swallow the internal marker.
+        timeoutActive = false
+      case _ =>
+        if (!timeoutActive) {
+          context.output(msg)
+        }
+    }
+  }
+}
+
+object DropWithinTask {
+  // UserConfig key for the drop window duration.
+  val TIMEOUT = "TIMEOUT"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
new file mode 100644
index 0000000..512164d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ * Emits incoming messages over the output ports in round-robin order.
+ * NOTE(review): same body as BalanceTask; confirm this is the intended
+ * placeholder for flattenMerge.
+ */
+class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Total number of output ports to rotate over.
+  val sizeOfOutputs = sizeOfOutPorts
+  // Port that receives the next message; always in [0, sizeOfOutputs).
+  var index = 0
+
+  /** Emits the message on the current port, then advances the cursor. */
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index = (index + 1) % sizeOfOutputs
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
new file mode 100644
index 0000000..e2f02d8
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ * Running fold: seeds the accumulator with ZERO, folds every element in with
+ * AGGREGATOR, and emits the updated accumulator after each element.
+ *
+ * Fixes: the inner `val msg` shadowed the method parameter `msg` (renamed),
+ * and the per-message LOG.info was downgraded to debug — info-level logging
+ * on every element floods the log on any real stream.
+ */
+class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  // Initial accumulator value (the fold "zero").
+  val zero = userConf.getValue[Out](FoldTask.ZERO)
+  // Folding function: (accumulator, element) => new accumulator.
+  val aggregator = userConf.getValue[(Out, In) => Out](FoldTask.AGGREGATOR)
+  // Running accumulator. Stays at the default (null) if ZERO is absent from
+  // config — NOTE(review): AGGREGATOR would then see a null accumulator;
+  // confirm ZERO is always configured by the materializer.
+  var aggregated: Out = _
+  // NOTE(review): this implicit is not used within this class.
+  implicit val ec = context.system.dispatcher
+
+  override def onStart(instant: Instant): Unit = {
+    zero.foreach(value => {
+      aggregated = value
+    })
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    aggregator.foreach(func => {
+      aggregated = func(aggregated, data)
+      LOG.debug(s"aggregated = $aggregated")
+      // Renamed from `msg`, which shadowed the method parameter above.
+      val folded = new Message(aggregated, time)
+      context.output(folded)
+    })
+  }
+}
+
+object FoldTask {
+  // UserConfig keys for the fold parameters.
+  val ZERO = "ZERO"
+  val AGGREGATOR = "AGGREGATOR"
+}
