Fixes #22 support akka-streams Gearpump Materializer Author: Kam Kasravi <[email protected]>
Closes #94 from kkasravi/GEARPUMP-22. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/4fe5458f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/4fe5458f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/4fe5458f Branch: refs/heads/akka-streams Commit: 4fe5458f45e1f1a9da84704d7c4d7eaff3b625bb Parents: 5c4d60c Author: Kam Kasravi <[email protected]> Authored: Tue Oct 11 10:32:04 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Oct 11 10:32:04 2016 +0800 ---------------------------------------------------------------------- .../apache/gearpump/cluster/UserConfig.scala | 21 +- .../cluster/master/AppMasterLauncher.scala | 15 +- experiments/akkastream/README.md | 4 +- .../src/main/resources/geardefault.conf | 5 +- .../scala/akka/stream/BaseMaterializer.scala | 47 -- .../main/scala/akka/stream/ModuleGraph.scala | 298 ---------- .../akka/stream/gearpump/GearAttributes.scala | 90 --- .../stream/gearpump/GearpumpMaterializer.scala | 71 --- .../akka/stream/gearpump/example/Test.scala | 69 --- .../akka/stream/gearpump/example/Test2.scala | 71 --- .../akka/stream/gearpump/example/Test3.scala | 59 -- .../akka/stream/gearpump/example/Test4.scala | 50 -- .../akka/stream/gearpump/example/Test5.scala | 68 --- .../akka/stream/gearpump/example/Test6.scala | 67 --- .../stream/gearpump/example/WikipediaApp.scala | 143 ----- .../stream/gearpump/graph/GraphCutter.scala | 183 ------ .../akka/stream/gearpump/graph/LocalGraph.scala | 81 --- .../stream/gearpump/graph/RemoteGraph.scala | 106 ---- .../akka/stream/gearpump/graph/SubGraph.scala | 56 -- .../materializer/LocalMaterializer.scala | 152 ----- .../materializer/LocalMaterializerImpl.scala | 284 --------- .../materializer/RemoteMaterializerImpl.scala | 453 -------------- .../stream/gearpump/module/BridgeModule.scala | 124 ---- .../stream/gearpump/module/DummyModule.scala | 103 ---- .../gearpump/module/GearpumpTaskModule.scala | 133 ----- .../stream/gearpump/module/GroupByModule.scala | 46 -- .../stream/gearpump/module/ReduceModule.scala | 44 -- .../akka/stream/gearpump/scaladsl/Api.scala | 282 --------- .../akka/stream/gearpump/task/BalanceTask.scala | 37 -- .../stream/gearpump/task/BroadcastTask.scala | 29 - .../akka/stream/gearpump/task/GraphTask.scala | 70 --- .../stream/gearpump/task/SinkBridgeTask.scala | 125 ---- .../stream/gearpump/task/SourceBridgeTask.scala | 107 ---- .../akka/stream/gearpump/task/UnZip2Task.scala | 45 -- .../gearpump/util/MaterializedValueOps.scala | 38 -- .../gearpump/akkastream/GearAttributes.scala | 89 +++ .../akkastream/GearpumpMaterializer.scala | 290 +++++++++ .../GearpumpMaterializerSession.scala | 152 +++++ .../gearpump/akkastream/example/Test.scala | 61 ++ .../gearpump/akkastream/example/Test10.scala | 82 +++ .../gearpump/akkastream/example/Test11.scala | 72 +++ .../gearpump/akkastream/example/Test12.scala | 81 +++ .../gearpump/akkastream/example/Test13.scala | 177 ++++++ .../gearpump/akkastream/example/Test14.scala | 73 +++ .../gearpump/akkastream/example/Test15.scala | 72 +++ .../gearpump/akkastream/example/Test16.scala | 50 ++ .../gearpump/akkastream/example/Test2.scala | 77 +++ .../gearpump/akkastream/example/Test3.scala | 70 +++ .../gearpump/akkastream/example/Test4.scala | 50 ++ .../gearpump/akkastream/example/Test5.scala | 67 +++ .../gearpump/akkastream/example/Test6.scala | 90 +++ .../gearpump/akkastream/example/Test7.scala | 56 ++ .../gearpump/akkastream/example/Test8.scala | 66 +++ .../gearpump/akkastream/example/Test9.scala | 87 +++ .../akkastream/example/WikipediaApp.scala | 159 +++++ .../akkastream/graph/GraphPartitioner.scala | 205 +++++++ .../gearpump/akkastream/graph/LocalGraph.scala | 80 +++ .../gearpump/akkastream/graph/RemoteGraph.scala | 113 ++++ .../gearpump/akkastream/graph/SubGraph.scala | 59 ++ .../materializer/LocalMaterializerImpl.scala | 319 ++++++++++ .../materializer/RemoteMaterializerImpl.scala | 594 +++++++++++++++++++ .../akkastream/module/BridgeModule.scala | 135 +++++ .../akkastream/module/DummyModule.scala | 105 ++++ .../akkastream/module/GearpumpTaskModule.scala | 135 +++++ .../akkastream/module/GroupByModule.scala | 55 ++ .../akkastream/module/ReduceModule.scala | 52 ++ .../gearpump/akkastream/scaladsl/Api.scala | 289 +++++++++ .../gearpump/akkastream/task/BalanceTask.scala | 38 ++ .../gearpump/akkastream/task/BatchTask.scala | 50 ++ .../akkastream/task/BroadcastTask.scala | 30 + .../gearpump/akkastream/task/ConcatTask.scala | 38 ++ .../akkastream/task/DelayInitialTask.scala | 61 ++ .../akkastream/task/DropWithinTask.scala | 62 ++ .../akkastream/task/FlattenMergeTask.scala | 38 ++ .../gearpump/akkastream/task/FoldTask.scala | 56 ++ .../gearpump/akkastream/task/GraphTask.scala | 71 +++ .../akkastream/task/GroupedWithinTask.scala | 44 ++ .../akkastream/task/InterleaveTask.scala | 44 ++ .../gearpump/akkastream/task/MapAsyncTask.scala | 53 ++ .../gearpump/akkastream/task/MergeTask.scala | 39 ++ .../akkastream/task/SingleSourceTask.scala | 43 ++ .../akkastream/task/SinkBridgeTask.scala | 131 ++++ .../akkastream/task/SourceBridgeTask.scala | 116 ++++ .../akkastream/task/StatefulMapConcatTask.scala | 50 ++ .../akkastream/task/TakeWithinTask.scala | 62 ++ .../gearpump/akkastream/task/ThrottleTask.scala | 53 ++ .../akkastream/task/TickSourceTask.scala | 56 ++ .../gearpump/akkastream/task/Unzip2Task.scala | 46 ++ .../gearpump/akkastream/task/Zip2Task.scala | 57 ++ .../akkastream/util/MaterializedValueOps.scala | 40 ++ .../akka/stream/gearpump/AttributesSpec.scala | 33 -- .../gearpump/akkastream/AttributesSpec.scala | 34 ++ project/Build.scala | 35 +- project/Pack.scala | 16 +- project/scalastyle_config.xml | 240 ++++++++ scalastyle-config.xml | 240 -------- .../gearpump/services/AppMasterService.scala | 2 +- .../gearpump/services/MasterService.scala | 2 +- .../gearpump/services/SecurityService.scala | 4 +- .../gearpump/services/StaticService.scala | 8 +- .../gearpump/services/WorkerService.scala | 2 +- .../streaming/dsl/plan/OpTranslator.scala | 2 +- .../gearpump/streaming/task/TaskActor.scala | 13 +- 103 files changed, 5694 insertions(+), 3853 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala index 3f42808..393d5f7 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala @@ -20,13 +20,16 @@ package org.apache.gearpump.cluster import akka.actor.{ActorSystem, ExtendedActorSystem} import akka.serialization.JavaSerializer - import org.apache.gearpump.google.common.io.BaseEncoding +import org.apache.gearpump.util.LogUtil + +import scala.util.{Failure, Success, Try} /** * Immutable configuration */ final class UserConfig(private val _config: Map[String, String]) extends Serializable { + private val LOG = LogUtil.getLogger(getClass) def withBoolean(key: String, value: Boolean): UserConfig = { new UserConfig(_config + (key -> value.toString)) @@ -135,10 +138,18 @@ final class UserConfig(private val _config: Map[String, String]) extends Seriali if (null == value) { this } else { - val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) - val bytes = serializer.toBinary(value) - val encoded = BaseEncoding.base64().encode(bytes) - this.withString(key, encoded) + Try({ + val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) + val bytes = serializer.toBinary(value) + BaseEncoding.base64().encode(bytes) + }) match { + case Success(enc) => + this.withString(key, enc) + case Failure(throwable) => + LOG.error(s"Could not serialize value with key $key ${throwable.getMessage}") + this + } + } } // scalastyle:on line.size.limit http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala index 9305d5c..de104b9 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala @@ -19,16 +19,9 @@ package org.apache.gearpump.cluster.master import java.util.concurrent.{TimeUnit, TimeoutException} -import org.apache.gearpump.cluster.worker.WorkerId - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success} import akka.actor.{Actor, ActorRef, Props, _} import com.typesafe.config.Config -import org.slf4j.Logger - import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor} import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated @@ -36,11 +29,17 @@ import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo} import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppDescription, AppJar, _} import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.ActorSystemBooter._ import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} /** * @@ -145,4 +144,4 @@ object AppMasterLauncher extends AppMasterLauncherFactory { trait AppMasterLauncherFactory { def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef]): Props -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/README.md ---------------------------------------------------------------------- diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md index 7c9a316..fe04554 100644 --- a/experiments/akkastream/README.md +++ b/experiments/akkastream/README.md @@ -1,4 +1,2 @@ Akka Stream -========= - -TODO: This directory is obsolte. Working on updating it to Akka 2.4.3. +========= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf index 626a1dc..e9da531 100644 --- a/experiments/akkastream/src/main/resources/geardefault.conf +++ b/experiments/akkastream/src/main/resources/geardefault.conf @@ -2,4 +2,7 @@ gearpump.serializers { "akka.stream.gearpump.example.WikipediaApp$WikidataElement" = "" "scala.collection.immutable.Map$Map1" = "" "scala.collection.immutable.Map$Map2" = "" -} \ No newline at end of file +} +akka { + version = "2.4.10" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala deleted file mode 100644 index d2b328d..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream - -import scala.concurrent.ExecutionContextExecutor - -/** - * [[BaseMaterializer]] is a extension to [[akka.stream.Materializer]]. - * - * Compared with [[akka.stream.Materializer]], the difference is that - * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph. - * - * @see [[ModuleGraph]] for the difference between RunnableGraph and - * [[ModuleGraph]] - * - */ -abstract class BaseMaterializer extends akka.stream.Materializer { - - override def withNamePrefix(name: String): Materializer = throw new UnsupportedOperationException() - - override implicit def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException() - - def materialize[Mat](graph: ModuleGraph[Mat]): Mat - - override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { - val graph = ModuleGraph(runnableGraph) - materialize(graph) - } - - def shutdown(): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala deleted file mode 100644 index 48d06f7..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This code is similar to MaterializerSession and will be deprecated in the next release. - */ - -package akka.stream - -import scala.collection.mutable - -import akka.stream.Attributes.Attribute -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.util.MaterializedValueOps -import akka.stream.impl.StreamLayout._ -import akka.stream.impl._ -import akka.stream.{Graph => AkkaGraph} - -import _root_.org.apache.gearpump.util -import _root_.org.apache.gearpump.util.Graph - -/** - * - * ModuleGraph is a transformation on [[akka.stream.scaladsl.RunnableGraph]]. - * It carries all the information of [[akka.stream.scaladsl.RunnableGraph]], but - * represents it in a different way. - * - * Here is the difference: - * - * RunnableGraph - * ============================== - * [[akka.stream.scaladsl.RunnableGraph]] is represented as a [[Module]] tree: - * TopLevelModule - * | - * ------------------- - * | | - * SubModule1 SubModule2 - * | | - * ---------------- ---------- - * | | | - * AtomicModule1 AtomicModule2 AtomicModule3 - * - * ModuleGraph - * ============================== - * [[ModuleGraph]] is represented as a [[util.Graph]] of Atomic [[Module]]: - * - * AtomicModule2 -> AtomicModule4 - * /| \ - * / \ - * / \| - * AtomicModule1 AtomicModule5 - * \ /| - * \ / - * \| / - * AtomicModule3 - * - * Each vertex in the Graph is a [[Module]], each [[Edge]] in the Graph is a tuple - * ([[OutPort]], [[InPort]]). [[OutPort]] is one of upstream Atomic Module - * output ports. [[InPort]] is one of downstream Atomic Module input ports. - * - * - * Why use [[ModuleGraph]] instead of [[akka.stream.scaladsl.RunnableGraph]]? - * ========================= - * There are several good reasons:): - * 1. [[ModuleGraph]] outlines explicitly the upstream/downstream relation. - * Each [[Edge]] of [[ModuleGraph]] represent a upstream/downstream pair. - * It is easier for user to understand the overall data flow. - * - * 2. It is easier for performance optimization. - * For the above Graph, if we want to fuse AtomicModule2 and AtomicModule3 - * together, it can be done within [[ModuleGraph]]. We only need - * to substitute Pair(AtomicModule2, AtomicModule4) with a unified Module. - * - * 3. It avoids module duplication. - * In [[akka.stream.scaladsl.RunnableGraph]], composite Module can be re-used. - * It is possible that there can be duplicate Modules. - * The duplication problem causes big headache when doing materialization. - * - * [[ModuleGraph]] doesn't have thjis problem. [[ModuleGraph]] does a transformation on the Module - * Tree to make sure each Atomic Module [[ModuleGraph]] is unique. - * - * - * @param graph a Graph of Atomic modules. - * @param mat is a function of: - * input => materialized value of each Atomic module - * output => final materialized value. - * @tparam Mat - */ -class ModuleGraph[Mat](val graph: util.Graph[Module, Edge], val mat: MaterializedValueNode) { - - def resolve(materializedValues: Map[Module, Any]): Mat = { - MaterializedValueOps(mat).resolve[Mat](materializedValues) - } -} - -object ModuleGraph { - - def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] = { - val topLevel = runnableGraph.module - val factory = new ModuleGraphFactory(topLevel) - val (graph, mat) = factory.create() - new ModuleGraph(graph, mat) - } - - /** - * - * @param from outport of upstream module - * @param to inport of downstream module - */ - case class Edge(from: OutPort, to: InPort) - - private class ModuleGraphFactory(val topLevel: StreamLayout.Module) { - - private var subscribersStack: List[mutable.Map[InPort, (InPort, Module)]] = - mutable.Map.empty[InPort, (InPort, Module)].withDefaultValue(null) :: Nil - private var publishersStack: List[mutable.Map[OutPort, (OutPort, Module)]] = - mutable.Map.empty[OutPort, (OutPort, Module)].withDefaultValue(null) :: Nil - - /* - * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule - * itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but - * not elsewhere. For this reason they are just simply passed as parameters to those methods. - * - * The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class - * from passing the current scope around or even knowing about it. - */ - private var moduleStack: List[Module] = topLevel :: Nil - - private def subscribers: mutable.Map[InPort, (InPort, Module)] = subscribersStack.head - private def publishers: mutable.Map[OutPort, (OutPort, Module)] = publishersStack.head - private def currentLayout: Module = moduleStack.head - - private val graph = Graph.empty[Module, Edge] - - private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = { - val currentAttributes = mergeAttributes(parentAttributes, module.attributes) - module.withAttributes(currentAttributes).asInstanceOf[T] - } - - private def materializeAtomic(atomic: Module, parentAttributes: Attributes): MaterializedValueNode = { - val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets) - val copied = copyAtomicModule(atomic, parentAttributes) - - for ((in, id) <- inputs.zipWithIndex) { - val inPort = inPortMapping(atomic, copied)(in) - assignPort(in, (inPort, copied)) - } - - for ((out, id) <- outputs.zipWithIndex) { - val outPort = outPortMapping(atomic, copied)(out) - assignPort(out, (outPort, copied)) - } - - graph.addVertex(copied) - Atomic(copied) - } - - def create(): (util.Graph[Module, Edge], MaterializedValueNode) = { - val mat = materializeModule(topLevel, Attributes.none) - (graph, mat) - } - - private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = { - from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap - } - - private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = { - from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap - } - - private def materializeModule(module: Module, parentAttributes: Attributes): MaterializedValueNode = { - - val materializedValues = collection.mutable.HashMap.empty[Module, MaterializedValueNode] - val currentAttributes = mergeAttributes(parentAttributes, module.attributes) - - var materializedValueSources = List.empty[MaterializedValueSource[_]] - - for (submodule <- module.subModules) { - submodule match { - case mv: MaterializedValueSource[_] => - materializedValueSources :+= mv - case atomic if atomic.isAtomic => - materializedValues.put(atomic, materializeAtomic(atomic, currentAttributes)) - case copied: CopiedModule => - enterScope(copied) - materializedValues.put(copied, materializeModule(copied, currentAttributes)) - exitScope(copied) - case composite => - materializedValues.put(composite, materializeComposite(composite, currentAttributes)) - } - } - - val mat = resolveMaterialized(module.materializedValueComputation, materializedValues) - - materializedValueSources.foreach { module => - val matAttribute = new MaterializedValueSourceAttribute(mat) - val copied = copyAtomicModule(module, parentAttributes and Attributes(matAttribute)) - assignPort(module.shape.outlet, (copied.shape.outlet, copied)) - graph.addVertex(copied) - materializedValues.put(copied, Atomic(copied)) - } - mat - } - - private def materializeComposite(composite: Module, effectiveAttributes: Attributes): MaterializedValueNode = { - materializeModule(composite, effectiveAttributes) - } - - private def mergeAttributes(parent: Attributes, current: Attributes): Attributes = { - parent and current - } - - private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, MaterializedValueNode]): MaterializedValueNode = matNode match { - case Atomic(m) => materializedValues(m) - case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues)) - case Transform(f, d) => Transform(f, resolveMaterialized(d, materializedValues)) - case Ignore => Ignore - } - - final protected def assignPort(in: InPort, subscriber: (InPort, Module)): Unit = { - addVertex(subscriber._2) - subscribers(in) = subscriber - // Interface (unconnected) ports of the current scope will be wired when exiting the scope - if (!currentLayout.inPorts(in)) { - val out = currentLayout.upstreams(in) - val publisher = publishers(out) - if (publisher ne null) addEdge(publisher, subscriber) - } - } - - final protected def assignPort(out: OutPort, publisher: (OutPort, Module)): Unit = { - addVertex(publisher._2) - publishers(out) = publisher - // Interface (unconnected) ports of the current scope will be wired when exiting the scope - if (!currentLayout.outPorts(out)) { - val in = currentLayout.downstreams(out) - val subscriber = subscribers(in) - if (subscriber ne null) addEdge(publisher, subscriber) - } - } - - // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies - // of the same module. - // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter - private def enterScope(enclosing: CopiedModule): Unit = { - subscribersStack ::= mutable.Map.empty.withDefaultValue(null) - publishersStack ::= mutable.Map.empty.withDefaultValue(null) - moduleStack ::= enclosing.copyOf - } - - // Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning - // them to the copied ports instead of the original ones (since there might be multiple copies of the same module - // leading to port identity collisions) - // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter - private def exitScope(enclosing: CopiedModule): Unit = { - val scopeSubscribers = subscribers - val scopePublishers = publishers - subscribersStack = subscribersStack.tail - publishersStack = publishersStack.tail - moduleStack = moduleStack.tail - - // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of - // the original module and assign them to the copy ports in the outer scope that we will return to - enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { - case (original, exposed) => assignPort(exposed, scopeSubscribers(original)) - } - - enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach { - case (original, exposed) => assignPort(exposed, scopePublishers(original)) - } - } - - private def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = { - graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2) - } - - private def addVertex(module: Module): Unit = { - graph.addVertex(module) - } - } - - final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala deleted file mode 100644 index 50c4450..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump - -import akka.stream.Attributes -import akka.stream.Attributes.Attribute - -object GearAttributes { - - /** - * Define how many parallel instance we want to use to run this module - * @param count - * @return - */ - def count(count: Int): Attributes = Attributes(ParallismAttribute(count)) - - /** - * Define we want to render this module locally. - * @return - */ - def local: Attributes = Attributes(LocationAttribute(Local)) - - /** - * Define we want to render this module remotely - * @return - */ - def remote: Attributes = Attributes(LocationAttribute(Remote)) - - /** - * Get the effective location settings if child override the parent - * setttings. - * - * @param attrs - * @return - */ - def location(attrs: Attributes): Location = { - attrs.attributeList.foldLeft(Local: Location) { (s, attr) => - attr match { - case LocationAttribute(location) => location - case other => s - } - } - } - - /** - * get effective parallelism settings if child override parent. - * @param attrs - * @return - */ - def count(attrs: Attributes): Int = { - attrs.attributeList.foldLeft(1) { (s, attr) => - attr match { - case ParallismAttribute(count) => count - case other => s - } - } - } - - /** - * Where we want to render the module - */ - sealed trait Location - object Local extends Location - object Remote extends Location - - final case class LocationAttribute(tag: Location) extends Attribute - - /** - * How many parallel instance we want to use for this module. - * - * @param parallelism - */ - final case class ParallismAttribute(parallelism: Int) extends Attribute -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala deleted file mode 100644 index a11d7cb..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump - -import akka.actor.ActorSystem -import akka.stream._ -import akka.stream.gearpump.graph.GraphCutter.Strategy -import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer -import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer -import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, SubGraphMaterializer} -import akka.stream.impl.StreamLayout.Module - -/** - * - * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump - * streaming application. If some module cannot be rendered remotely in Gearpump - * Cluster, then it uses local Actor materializer as fallback to materialize - * the module locally. - * - * User can custom a [[Strategy]] to determinie which module should be rendered - * remotely, and which module should be rendered locally. - * - * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two parts, - * and materialize them seperately. - * - * @param system - * @param strategy - * @param useLocalCluster whether to use built-in in-process local cluster - */ -class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter.AllRemoteStrategy, - useLocalCluster: Boolean = true) - extends BaseMaterializer { - - private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map( - classOf[LocalGraph] -> new LocalGraphMaterializer(system), - classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system) - ) - - override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = { - val subGraphs = new GraphCutter(strategy).cut(graph) - val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, subGraph) => - val materializer = subMaterializers(subGraph.getClass) - map ++ materializer.materialize(subGraph, map) - } - graph.resolve(matValues) - } - - override def shutdown(): Unit = { - subMaterializers.values.foreach(_.shutdown()) - } -} - -object GearpumpMaterializer { - def apply(system: ActorSystem): GearpumpMaterializer = new GearpumpMaterializer(system) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala deleted file mode 100644 index 7808b52..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.graph.GraphCutter -import akka.stream.scaladsl.{Sink, Source} - -/** - * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph - * to different runtime. - * - * In this test, source module and sink module are materialized locally, - * Other transformation module are materialized remotely in Gearpump - * streaming Application. - * - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - * - * - */ -object Test { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", - "blue sky")) - source.filter(_.startsWith("red")).fold("Items:") { (a, b) => - a + "|" + b - }.map("I want to order item: " + _).runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - // scalastyle:off println - println("Confirm received: " + any) - // scalastyle:on println - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala deleted file mode 100644 index 2426f5f..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.ActorMaterializer -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.{GearSink, GearSource} -import akka.stream.scaladsl.{Flow, Sink, Source} - -/** - * - * This tests how different Materializers can be used together in an explicit way. - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - * - */ -object Test2 { - - def main(args: Array[String]): Unit = { - - println("running Test2...") - implicit val system = ActorSystem("akka-test") - val materializer = new GearpumpMaterializer(system) - - val echo = system.actorOf(Props(new Echo())) - val source = GearSource.bridge[String, String] - val sink = GearSink.bridge[String, String] - - val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _) - val (entry, exit) = flow.runWith(source, sink)(materializer) - - val actorMaterializer = ActorMaterializer() - - val externalSource = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - val externalSink = Sink.actorRef(echo, "COMPLETE") - - val graph = FlowGraph.closed() { implicit b => - externalSource ~> Sink(entry) - Source(exit) ~> externalSink - } - graph.run()(actorMaterializer) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala deleted file mode 100644 index 976b1e6..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.GearSource -import akka.stream.scaladsl.Sink - -import org.apache.gearpump.streaming.dsl.CollectionDataSource - -/** - * read from remote and write to local - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - */ -object Test3 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - val sourceData = new CollectionDataSource(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - val source = GearSource.from[String](sourceData) - source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala deleted file mode 100644 index 7b80b7b..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.GearSink -import akka.stream.scaladsl.Source - -import org.apache.gearpump.streaming.dsl.LoggerSink - -/** - * read from local and write to remote - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - */ -object Test4 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system) - - val sink = GearSink.to(new LoggerSink[String]) - val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala deleted file mode 100644 index 052c018..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.graph.GraphCutter -import akka.stream.scaladsl.{Sink, Source, Unzip} - -/** -test fanout - */ -object Test5 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - - val source = Source(List(("male", "24"), ("female", "23"))) - - val graph = FlowGraph.closed() { implicit b => - val unzip = b.add(Unzip[String, String]()) - - val sink1 = Sink.actorRef(echo, "COMPLETE") - val sink2 = Sink.actorRef(echo, "COMPLETE") - - source ~> unzip.in - unzip.out0 ~> sink1 - unzip.out1 ~> sink1 - } - - graph.run() - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala deleted file mode 100644 index 0fccd30..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.GearSource -import akka.stream.scaladsl.Sink - -import org.apache.gearpump.streaming.dsl.CollectionDataSource - -/** - * WordCount example - * Test GroupBy - */ - -import akka.stream.gearpump.scaladsl.Implicits._ - -object Test6 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - val sourceData = new CollectionDataSource(List("this is a good start", "this is a good time", "time to start", "congratulations", "green plant", "blue sky")) - val source = GearSource.from[String](sourceData) - source.mapConcat { line => - line.split(" ").toList - }.groupBy2(x => x).map(word => (word, 1)) - .reduce({ (a, b) => - (a._1, a._2 + b._2) - }).log("word-count").runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala deleted file mode 100644 index 56b89bc..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import java.io.{File, FileInputStream} -import java.util.zip.GZIPInputStream -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.util.{Failure, Success, Try} - -import akka.actor.ActorSystem -import akka.stream.gearpump.graph.GraphCutter -import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer} -import akka.stream.scaladsl._ -import akka.util.ByteString -import org.json4s.JsonAST.JString - -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.util.AkkaApp - -/** - * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/ - * which showcases running Akka Streams DSL across JVMs on Gearpump - * - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar - * -input wikidata-${DATE}-all.json.gz -languages en,de - * - * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/) - * - */ -object WikipediaApp extends ArgumentsParser with AkkaApp { - - case class WikidataElement(id: String, sites: Map[String, String]) - - override val options: Array[(String, CLIOption[Any])] = Array( - "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true), - "languages" -> CLIOption[String]("<languages to take into account>", required = true) - ) - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val parsed = parse(args) - val input = new File(parsed.getString("input")) - val langs = parsed.getString("languages").split(",") - - implicit val system = ActorSystem("wikidata-poc", akkaConf) - implicit val materializer = new GearpumpMaterializer(system, GraphCutter.TagAttributeStrategy, useLocalCluster = false) - import system.dispatcher - - val elements = source(input).via(parseJson(langs)) - - val g = FlowGraph.closed(count) { implicit b => - sinkCount => { - - val broadcast = b.add(Broadcast[WikidataElement](2)) - elements ~> broadcast ~> logEveryNSink(1000) - broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount - } - } - - g.run().onComplete { x => - x match { - case Success((t, f)) => printResults(t, f) - case Failure(tr) => println("Something went wrong") - } - system.terminate() - } - Await.result(system.whenTerminated, Duration.Inf) - } - - def source(file: File): Source[String, Future[Long]] = { - val compressed = new GZIPInputStream(new FileInputStream(file), 65536) - InputStreamSource(() => compressed) - .via(Framing.delimiter(ByteString("\n"), Int.MaxValue)) - .map(x => x.decodeString("utf-8")) - } - - def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] = - Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect { - case Some(v) => v - } - - def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = { - import org.json4s.jackson.JsonMethods - Try(JsonMethods.parse(line)).toOption.flatMap { json => - json \ "id" match { - case JString(itemId) => - val sites = for { - lang <- langs - JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title" - } yield lang -> title - - if (sites.isEmpty) None - else Some(WikidataElement(id = itemId, sites = sites.toMap)) - - case _ => None - } - } - } - - def logEveryNSink[T](n: Int) = Sink.fold(0) { (x, y: T) => - if (x % n == 0) - println(s"Processing element $x: $y") - x + 1 - } - - def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement] - .filter(_.sites.keySet == langs) - .map { x => - val titles = x.sites.values - titles.forall(_ == titles.head) - }.withAttributes(GearAttributes.remote) - - def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) { - case ((t, f), true) => (t + 1, f) - case ((t, f), false) => (t, f + 1) - } - - def printResults(t: Int, f: Int) = { - val message = - s""" - | Number of items with the same title: $t - | Number of items with the different title: $f - | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)} - """.stripMargin - println(message) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala deleted file mode 100644 index 19083f6..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.stream.ModuleGraph -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.GearAttributes -import akka.stream.gearpump.GearAttributes.{Local, Location, Remote} -import akka.stream.gearpump.graph.GraphCutter.Strategy -import akka.stream.gearpump.module.{BridgeModule, DummyModule, GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule} -import akka.stream.impl.Stages.DirectProcessor -import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module} -import akka.stream.impl.{SinkModule, SourceModule} - -import org.apache.gearpump.util.Graph - -/** - * - * GraphCutter is used to decide which part is rendered locally - * and which part should be rendered remotely. - * - * We will cut the graph based on the [[Strategy]] provided. - * - * For example, for the following graph, we can cut the graph to - * two parts, each part will be a Sub Graph. The top SubGraph - * can be materialized remotely. The bottom part can be materialized - * locally. - * - * AtomicModule2 -> AtomicModule4 - * /| \ - * / \ - * -----------cut line -------------cut line ---------- - * / \ - * / \| - * AtomicModule1 AtomicModule5 - * \ /| - * \ / - * \| / - * AtomicModule3 - * - * - * @see [[ModuleGraph]] for more information of how Graph is organized. - * - */ -class GraphCutter(strategy: Strategy) { - def cut(moduleGraph: ModuleGraph[_]): List[SubGraph] = { - val graph = removeDummyModule(moduleGraph.graph) - val tags = tag(graph, strategy) - doCut(graph, tags, moduleGraph.mat) - } - - private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location], - mat: MaterializedValueNode): List[SubGraph] = { - val local = Graph.empty[Module, Edge] - val remote = Graph.empty[Module, Edge] - - graph.vertices.foreach { module => - if (tags(module) == Local) { - local.addVertex(module) - } else { - remote.addVertex(module) - } - } - - graph.edges.foreach { nodeEdgeNode => - val (node1, edge, node2) = nodeEdgeNode - (tags(node1), tags(node2)) match { - case (Local, Local) => - local.addEdge(nodeEdgeNode) - case (Remote, Remote) => - remote.addEdge(nodeEdgeNode) - case (Local, Remote) => - node2 match { - case bridge: BridgeModule[_, _, _] => - local.addEdge(node1, edge, node2) - case _ => - // Creates a bridge module in between - val bridge = new SourceBridgeModule[AnyRef, AnyRef]() - val remoteEdge = Edge(bridge.outPort, edge.to) - remote.addEdge(bridge, remoteEdge, node2) - val localEdge = Edge(edge.from, bridge.inPort) - local.addEdge(node1, localEdge, bridge) - } - case (Remote, Local) => - node1 match { - case bridge: BridgeModule[_, _, _] => - local.addEdge(node1, edge, node2) - case _ => - // Creates a bridge module in between - val bridge = new SinkBridgeModule[AnyRef, AnyRef]() - val remoteEdge = Edge(edge.from, bridge.inPort) - remote.addEdge(node1, remoteEdge, bridge) - val localEdge = Edge(bridge.outPort, edge.to) - local.addEdge(bridge, localEdge, node2) - } - } - } - - List(new RemoteGraph(remote), new LocalGraph(local)) - } - - private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = { - graph.vertices.map { vertex => - vertex -> strategy.apply(vertex) - }.toMap - } - - private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = { - val graph = inputGraph.copy - val dummies = graph.vertices.filter { module => - module match { - case dummy: DummyModule => - true - case _ => - false - } - } - dummies.foreach(module => graph.removeVertex(module)) - graph - } -} - -object GraphCutter { - - type Strategy = PartialFunction[Module, Location] - - val BaseStrategy: Strategy = { - case source: BridgeModule[_, _, _] => - Remote - case task: GearpumpTaskModule => - Remote - case groupBy: GroupByModule[_, _] => - // TODO: groupBy is not supported by local materializer yet - Remote - case source: SourceModule[_, _] => - Local - case sink: SinkModule[_, _] => - Local - case matValueSource: MaterializedValueSource[_] => - Local - case direct: DirectProcessor => - Local - case time: TimerTransform => - // Renders to local as it requires a timer. - Local - } - - val AllRemoteStrategy: Strategy = BaseStrategy orElse { - case _ => - Remote - } - - /** - * Will decide whether to render a module locally or remotely - * based on Attribute settings. - * - */ - val TagAttributeStrategy: Strategy = BaseStrategy orElse { - case module => - GearAttributes.location(module.attributes) - } - - val AllLocalStrategy: Strategy = BaseStrategy orElse { - case _ => - Local - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala deleted file mode 100644 index 6ef8598..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.actor.ActorSystem -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.materializer.LocalMaterializer -import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule} -import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{PublisherSource, SubscriberSink} -import akka.stream.{Outlet, SinkShape, SourceShape} -import org.reactivestreams.{Publisher, Subscriber} - -import org.apache.gearpump.util.Graph - -/** - * - * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only - * contain module that can be materialized in local JVM. - * - * @param graph - */ -class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph - -object LocalGraph { - - /** - * materialize LocalGraph in local JVM - * @param system - */ - class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer { - - // Creates a local materializer - val materializer = LocalMaterializer()(system) - - /** - * - * @param matValues Materialized Values for each module before materialization - * @return Materialized Values for each Module after the materialization. - */ - override def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] = { - val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module => - module match { - case source: SourceBridgeModule[AnyRef, AnyRef] => - val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]] - val shape = SinkShape(source.inPort) - new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape) - case sink: SinkBridgeModule[AnyRef, AnyRef] => - val publisher = matValues(sink).asInstanceOf[Publisher[AnyRef]] - val shape = SourceShape(sink.outPort.asInstanceOf[Outlet[AnyRef]]) - new PublisherSource(publisher, DefaultAttributes.publisherSource, shape) - case other => - other - } - } - materializer.materialize(newGraph, matValues) - } - - override def shutdown(): Unit = { - materializer.shutdown() - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala deleted file mode 100644 index 3cea78a..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.actor.ActorSystem -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.materializer.RemoteMaterializerImpl -import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule} -import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient -import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient -import akka.stream.impl.StreamLayout.Module - -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster -import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.util.Graph - -/** - * - * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only - * contain modules that can be materialized in remote Gearpump cluster. - * - * @param graph - */ -class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph - -object RemoteGraph { - - /** - * * materialize LocalGraph in remote gearpump cluster - * @param useInProcessCluster - * @param system - */ - class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { - private val local = if (useInProcessCluster) { - val cluster = EmbeddedCluster() - cluster.start() - Some(cluster) - } else { - None - } - - private val context: ClientContext = local match { - case Some(local) => local.newClientContext - case None => ClientContext(system) - } - - override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, Any]): Map[Module, Any] = { - val graph = subGraph.graph - - if (graph.isEmpty) { - inputMatValues - } else { - doMaterialize(graph: Graph[Module, Edge], inputMatValues) - } - } - - private def doMaterialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = { - val materializer = new RemoteMaterializerImpl(graph, system) - val (app, matValues) = materializer.materialize - - val appId = context.submit(app) - println("sleep 5 second until the applicaiton is ready on cluster") - Thread.sleep(5000) - - def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = { - matValues.toList.flatMap { kv => - val (module, processorId) = kv - module match { - case source: SourceBridgeModule[AnyRef, AnyRef] => - val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher, context, appId, processorId) - Some((module, bridge)) - case sink: SinkBridgeModule[AnyRef, AnyRef] => - val bridge = new SinkBridgeTaskClient(system, context, appId, processorId) - Some((module, bridge)) - case other => - None - } - }.toMap - } - - inputMatValues ++ resolve(matValues) - } - - override def shutdown(): Unit = { - context.close() - local.map(_.stop()) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala deleted file mode 100644 index 564b6c7..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.stream.ModuleGraph.Edge -import akka.stream.impl.StreamLayout.Module - -import org.apache.gearpump.util.Graph - -/** - * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]] - * - * The idea is that by dividing [[akka.stream.ModuleGraph]] to several - * [[SubGraph]], we can materialize each [[SubGraph]] with different - * materializer. - */ - -trait SubGraph { - - /** - * the [[Graph]] representation of this SubGraph - * @return - */ - def graph: Graph[Module, Edge] -} - -/** - * Materializer for Sub-Graph type - */ -trait SubGraphMaterializer { - /** - * - * @param matValues Materialized Values for each module before materialization - * @return Materialized Values for each Module after the materialization. - */ - - def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] - - def shutdown(): Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala deleted file mode 100644 index a5c6e48..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.materializer - -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import scala.concurrent.{Await, ExecutionContextExecutor} - -import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, PoisonPill, Props, RepointableActorRef} -import akka.dispatch.Dispatchers -import akka.pattern.ask -import akka.stream.ModuleGraph.Edge -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.StreamSupervisor -import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph} - -import org.apache.gearpump.util.Graph - -/** - * [[LocalMaterializer]] will use local actor to materialize the graph - * Use LocalMaterializer.apply to construct the LocalMaterializer. - * - * It is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]] - * - * - * @param system - * @param settings - * @param dispatchers - * @param supervisor - * @param haveShutDown - * @param flowNameCounter - * @param namePrefix - * @param optimizations - */ -abstract class LocalMaterializer( - val system: ActorSystem, - override val settings: ActorMaterializerSettings, - dispatchers: Dispatchers, - val supervisor: ActorRef, - val haveShutDown: AtomicBoolean, - flowNameCounter: AtomicLong, - namePrefix: String, - optimizations: Optimizations) extends ActorMaterializer { - - override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = { - import ActorAttributes._ - import Attributes._ - opAttr.attributeList.foldLeft(settings) { (s, attr) => - attr match { - case InputBuffer(initial, max) => s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) => s.withDispatcher(dispatcher) - case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider) - case l: LogLevels => s - case Name(_) => s - case other => s - } - } - } - - override def shutdown(): Unit = - if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill - - override def isShutdown: Boolean = haveShutDown.get() - - private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = { - supervisor match { - case ref: LocalActorRef => - ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) - case ref: RepointableActorRef => - if (ref.isStarted) { - ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), - name, systemService = false) - } else { - implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), - name)).mapTo[ActorRef] - Await.result(f, timeout.duration) - } - case unknown => - throw new IllegalStateException( - s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") - } - } - - override lazy val executionContext: ExecutionContextExecutor = - dispatchers.lookup(settings.dispatcher match { - case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId - case other => other - }) - - def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] - - override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = { - val graph = ModuleGraph(runnableGraph) - val matValues = materialize(graph.graph, Map.empty[Module, Any]) - graph.resolve(matValues) - } - - override def actorOf(context: MaterializationContext, props: Props): ActorRef = { - val dispatcher = - if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) { - effectiveSettings(context.effectiveAttributes).dispatcher - } else { - props.dispatcher - } - actorOf(props, context.stageName, dispatcher) - } -} - -object LocalMaterializer { - - def apply(materializerSettings: Option[ActorMaterializerSettings] = None, - namePrefix: Option[String] = None, - optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem) - : LocalMaterializerImpl = { - - val settings = materializerSettings getOrElse ActorMaterializerSettings(system) - apply(settings, namePrefix.getOrElse("flow"), optimizations)(system) - } - - def apply(materializerSettings: ActorMaterializerSettings, - namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem) - : LocalMaterializerImpl = { - val haveShutDown = new AtomicBoolean(false) - - new LocalMaterializerImpl( - system, - materializerSettings, - system.dispatchers, - system.actorOf(StreamSupervisor.props(materializerSettings, - haveShutDown).withDispatcher(materializerSettings.dispatcher)), - haveShutDown, - FlowNameCounter(system).counter, - namePrefix, - optimizations) - } -}
