http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
deleted file mode 100644
index 47ed1f2..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, 
ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, 
SourceTaskModule}
-import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, 
SinkBridgeTask, SourceBridgeTask, UnZip2Task}
-import akka.stream.impl.Stages
-import akka.stream.impl.Stages.StageModule
-import akka.stream.impl.StreamLayout.Module
-import org.slf4j.LoggerFactory
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, 
FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, 
SlaveOp}
-import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-/**
- * [[RemoteMaterializerImpl]] materializes a `Graph[Module, Edge]` into a Gearpump
- * Streaming Application.
- *
- * @param graph the module graph to materialize
- * @param system the actor system handed to the streaming application being built
- */
-class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
-
-  import RemoteMaterializerImpl._
-
-  type Clue = String
-  private implicit val actorSystem = system
-
-  /** Produces a fresh random UUID string; evaluated anew on every access. */
-  private def uuid: String = java.util.UUID.randomUUID().toString
-
-  /**
-   * Translates the module graph into a Gearpump [[StreamApplication]].
-   *
-   * @return the application, together with a mapping from Module to Materialized
-   *         Processor Id.
-   */
-  def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
-    val (ops, moduleClues) = toOpGraph()
-    val taggedApp: StreamApplication = new StreamApp("app", system, UserConfig.empty, ops)
-    val moduleToProcessor = resolveClues(taggedApp, moduleClues)
-    val wiredApp = updateJunctionConfig(moduleToProcessor, taggedApp)
-    val finalApp = cleanClues(wiredApp)
-    (finalApp, moduleToProcessor)
-  }
-
-  /**
-   * Merges the per-processor junction config into the task config of every
-   * processor in the application's DAG and returns the rebuilt application.
-   */
-  private def updateJunctionConfig(
-      processorIds: Map[Module, ProcessorId],
-      app: StreamApplication): StreamApplication = {
-    val extraConfigs = junctionConfig(processorIds)
-    val rewiredDag = app.dag.mapVertex { processor =>
-      processor.copy(taskConf = processor.taskConf.withConfig(extraConfigs(processor.id)))
-    }
-    new StreamApplication(app.name, app.inputUserConfig, rewiredDag)
-  }
-
-  /**
-   * Builds the extra configuration for every module's processor, so that each
-   * GraphTask knows its upstream and downstream.
-   *
-   * Junction modules get one upstream processor id per inlet and one downstream
-   * processor id per outlet; every other module gets an empty config.
-   *
-   * @param processorIds mapping from Module to the id of its processor
-   * @return mapping from processor id to the config to merge into its task config
-   */
-  private def junctionConfig(
-      processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = {
-    graph.vertices.map { module =>
-      val id = processorIds(module)
-      module match {
-        case junction: JunctionModule =>
-          val upstreamIds = junction.shape.inlets.map { inlet =>
-            val source = graph.incomingEdgesOf(junction).find(_._2.to == inlet).map(_._1)
-            processorIds(source.get)
-          }.toList
-
-          val downstreamIds = junction.shape.outlets.map { outlet =>
-            val target = graph.outgoingEdgesOf(junction).find(_._2.from == outlet).map(_._3)
-            target.map(processorIds(_)).get
-          }.toList
-
-          val conf = UserConfig.empty
-            .withValue(GraphTask.OUT_PROCESSORS, downstreamIds)
-            .withValue(GraphTask.IN_PROCESSORS, upstreamIds)
-          (id, conf)
-        case _ =>
-          (id, UserConfig.empty)
-      }
-    }.toMap
-  }
-
-  /**
-   * Finds, for every module, the processor whose task config contains the module's
-   * clue key. Modules whose clue is not found in any processor are left out.
-   */
-  private def resolveClues(
-      app: StreamApplication,
-      clues: Map[Module, Clue]): Map[Module, ProcessorId] = {
-    clues.flatMap { case (module, clue) =>
-      app.dag.vertices
-        .find(_.taskConf.getString(clue).isDefined)
-        .map(processor => (module, processor.id))
-    }
-  }
-
-  /** Rebuilds the application with the clue entries stripped from every task config. */
-  private def cleanClues(app: StreamApplication): StreamApplication = {
-    val cleanedDag = app.dag.mapVertex(p => p.copy(taskConf = cleanClue(p.taskConf)))
-    new StreamApplication(app.name, app.inputUserConfig, cleanedDag)
-  }
-
-  /** Drops every config entry whose value is the STAINS marker. */
-  private def cleanClue(conf: UserConfig): UserConfig =
-    conf.filter { case (_, value) => value != RemoteMaterializerImpl.STAINS }
-
-  /**
-   * Translates the module graph into a graph of Gearpump DSL ops.
-   *
-   * Every op's config is tagged with a freshly generated clue key (mapped to
-   * STAINS), so the processor a module ends up in can be looked up once the
-   * application has been built.
-   *
-   * @return the op graph, and the clue key generated for each module
-   * @throws UnsupportedOperationException if a module cannot be translated
-   */
-  private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = {
-    var matValues = Map.empty[Module, Clue]
-    val opGraph = graph.mapVertex { module =>
-      val name = uuid
-      val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS)
-      matValues += module -> name
-      val parallelism = GearAttributes.count(module.attributes)
-      val op = module match {
-        case source: SourceTaskModule[t] =>
-          val updatedConf = conf.withConfig(source.conf)
-          new DataSourceOp[t](source.source, parallelism, updatedConf, "source")
-        case sink: SinkTaskModule[t] =>
-          val updatedConf = conf.withConfig(sink.conf)
-          new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink")
-        case sourceBridge: SourceBridgeModule[_, _] =>
-          new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
-        case processor: ProcessorModule[_, _, _] =>
-          val updatedConf = conf.withConfig(processor.conf)
-          // Labelled "processor"; the former "source" label was a copy-paste slip.
-          new ProcessorOp(processor.processor, parallelism, updatedConf, "processor")
-        case sinkBridge: SinkBridgeModule[_, _] =>
-          new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
-        case groupBy: GroupByModule[t, g] =>
-          new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf)
-        case reduce: ReduceModule[Any] =>
-          reduceOp(reduce.f, conf)
-        case stage: StageModule =>
-          translateStage(stage, conf)
-        case fanIn: FanInModule =>
-          translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, conf)
-        case fanOut: FanOutModule =>
-          translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf)
-        case _ =>
-          // Unknown module types take the "not supported" path below instead of
-          // surfacing as a scala.MatchError.
-          null
-      }
-
-      // The translate* helpers return null for stages/junctions not implemented yet.
-      if (op == null) {
-        throw new UnsupportedOperationException(
-          module.getClass.toString + " is not supported with RemoteMaterializer")
-      }
-      op
-    }.mapEdge[OpEdge] { (n1, edge, n2) =>
-      n2 match {
-        case master: MasterOp =>
-          Shuffle
-        case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
-          Shuffle
-        case slave: SlaveOp[_] =>
-          Direct
-      }
-    }
-    (opGraph, matValues)
-  }
-
-  /**
-   * Translates an Akka stream stage into a Gearpump op.
-   *
-   * @param module the stage to translate
-   * @param conf the (clue-tagged) config to attach to the resulting op
-   * @return the op, or null for stages that are not supported yet
-   */
-  private def translateStage(module: StageModule, conf: UserConfig): Op = {
-    module match {
-      case _: Stages.Buffer =>
-        // ignore the buffering operation
-        identity("buffer", conf)
-      case collect: Stages.Collect =>
-        collectOp(collect.pf, conf)
-      case _: Stages.ConcatAll =>
-        // TODO
-        null
-      case conflat: Stages.Conflate =>
-        conflatOp(conflat.seed, conflat.aggregate, conf)
-      case drop: Stages.Drop =>
-        dropOp(drop.n, conf)
-      case dropWhile: Stages.DropWhile =>
-        dropWhileOp(dropWhile.p, conf)
-      case _: Stages.Expand =>
-        // TODO
-        null
-      case filter: Stages.Filter =>
-        filterOp(filter.p, conf)
-      case fold: Stages.Fold =>
-        foldOp(fold.zero, fold.f, conf)
-      case _: Stages.GroupBy =>
-        // TODO
-        null
-      case grouped: Stages.Grouped =>
-        groupedOp(grouped.n, conf)
-      case _: Stages.Identity =>
-        identity("identity", conf)
-      case log: Stages.Log =>
-        logOp(log.name, log.extract, conf)
-      case map: Stages.Map =>
-        mapOp(map.f, conf)
-      case _: Stages.MapAsync =>
-        // TODO
-        null
-      case _: Stages.MapAsyncUnordered =>
-        // TODO
-        null
-      case flatMap: Stages.MapConcat =>
-        flatMapOp(flatMap.f, "mapConcat", conf)
-      case _: MaterializingStageFactory =>
-        // TODO
-        null
-      case _: Stages.PrefixAndTail =>
-        // TODO
-        null
-      case _: Stages.Recover =>
-        // TODO: we will just ignore this
-        identity("recover", conf)
-      case scan: Stages.Scan =>
-        scanOp(scan.zero, scan.f, conf)
-      case _: Stages.Split =>
-        // TODO
-        null
-      case _: Stages.StageFactory =>
-        // TODO
-        null
-      case take: Stages.Take =>
-        takeOp(take.n, conf)
-      case takeWhile: Stages.TakeWhile =>
-        // A plain filter would let elements through again after the predicate
-        // has failed once; takeWhile must stop for good.
-        takeWhileOp(takeWhile.p, conf)
-      case _: Stages.TimerTransform =>
-        // TODO
-        null
-    }
-  }
-
-  /**
-   * Passes elements through until the predicate first fails; that element and
-   * everything after it is dropped.
-   */
-  private def takeWhileOp(p: Any => Boolean, conf: UserConfig): Op = {
-    var taking = true
-    val flatMap: Any => Iterable[Any] = { elem: Any =>
-      if (taking && p(elem)) {
-        Option(elem)
-      } else {
-        taking = false
-        None
-      }
-    }
-    flatMapOp(flatMap, "takeWhile", conf)
-  }
-
-  /**
-   * Translates a fan-in junction into an op. Returns null for the junction types
-   * that are not supported yet.
-   */
-  private def translateFanIn(
-    fanIn: FanInModule,
-    edges: List[(Module, Edge, Module)],
-    parallelism: Int,
-    conf: UserConfig): Op = fanIn match {
-    case _: MergeModule[_] =>
-      MergeOp("merge", conf)
-    // TODO: support "prefer" merge; for now it is treated as a plain merge.
-    case _: MergePreferredModule[_] =>
-      MergeOp("mergePrefered", conf)
-    // TODO: support the zip, concat and flexi merge modules.
-    case _: ZipWithModule | _: ConcatModule[_] | _: FlexiMergeModule[_, _] =>
-      null
-  }
-
-  /**
-   * Translates a fan-out junction into a processor op. Returns null for the
-   * junction types that are not supported yet.
-   */
-  private def translateFanOut(
-    fanOut: FanOutModule,
-    edges: List[(Module, Edge, Module)],
-    parallelism: Int,
-    conf: UserConfig): Op = fanOut match {
-    case unzip: UnzipWith2Module[Any, Any, Any] =>
-      val unzipFunction = new UnZip2Task.UnZipFunction(unzip.f)
-      val unzipConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, unzipFunction)
-      new ProcessorOp(classOf[UnZip2Task], parallelism, unzipConf, "unzip")
-    case _: BroadcastModule[_] =>
-      new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast")
-    case _: BalanceModule[_] =>
-      new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
-    case _: FlexiRouteImpl[_, _] =>
-      // TODO
-      null
-  }
-}
-
-object RemoteMaterializerImpl {
-  // Sentinel fallback for PartialFunction.applyOrElse: it returns itself, so a
-  // result equal to NotApplied means the partial function was not defined.
-  final val NotApplied: Any => Any = _ => NotApplied
-
-  /**
-   * Emits `collect(data)` for the inputs the partial function is defined at and
-   * nothing for the others.
-   */
-  def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = {
-    val collected: Any => TraversableOnce[Any] = { data: Any =>
-      collect.applyOrElse(data, NotApplied) match {
-        case NotApplied => None
-        case result: Any => Option(result)
-      }
-    }
-    flatMapOp(collected, "collect", conf)
-  }
-
-  /** Keeps only the elements for which `filter` returns true. */
-  def filterOp(filter: Any => Boolean, conf: UserConfig): Op = {
-    val keepMatching: Any => TraversableOnce[Any] = { data: Any =>
-      if (filter(data)) Option(data) else None
-    }
-    flatMapOp(keepMatching, "filter", conf)
-  }
-
-  /**
-   * Running reduce: the first element is emitted as-is, every later element emits
-   * `reduce(previous result, element)`.
-   */
-  def reduceOp(reduce: (Any, Any) => Any, conf: UserConfig): Op = {
-    // null marks "no element seen yet"
-    var accumulated: Any = null
-    val step = { elem: Any =>
-      accumulated = if (accumulated == null) elem else reduce(accumulated, elem)
-      List(accumulated)
-    }
-    flatMapOp(step, "reduce", conf)
-  }
-
-  /** Pass-through op that forwards every element unchanged. */
-  def identity(description: String, conf: UserConfig): Op = {
-    val passThrough: Any => TraversableOnce[Any] = { data: Any => List(data) }
-    flatMapOp(passThrough, description, conf)
-  }
-
-  /** Emits `map(data)` for every input element. */
-  def mapOp(map: Any => Any, conf: UserConfig): Op = {
-    val applyMap = { data: Any => List(map(data)) }
-    flatMapOp(applyMap, "map", conf)
-  }
-
-  /** Flat-map op with the default "flatmap" description. */
-  def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op =
-    flatMapOp(flatMap, "flatmap", conf)
-
-  /** Wraps `fun` in a FlatMapOp carrying the given description and config. */
-  def flatMapOp(
-      fun: Any => TraversableOnce[Any],
-      description: String,
-      conf: UserConfig): Op =
-    FlatMapOp(fun, description, conf)
-
-  /**
-   * Running conflate: the first element is turned into the aggregate with `seed`,
-   * every later element is folded in with `aggregate`. The current aggregate is
-   * emitted after each element.
-   *
-   * @param seed builds the initial aggregate from the first element
-   * @param aggregate combines the current aggregate with the next element
-   * @param conf config to attach to the resulting op
-   */
-  def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = {
-    // null marks "no element seen yet"
-    var agg: Any = null
-    val flatMap = { elem: Any =>
-      agg = if (agg == null) {
-        seed(elem)
-      } else {
-        aggregate(agg, elem)
-      }
-      List(agg)
-    }
-
-    // Described as "conflate"; the former "map" label was a copy-paste slip.
-    flatMapOp(flatMap, "conflate", conf)
-  }
-
-  /**
-   * Running fold starting from `zero`: after each element the updated aggregate
-   * is emitted.
-   */
-  def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
-    var state: Any = zero
-    val step = { elem: Any =>
-      state = fold(state, elem)
-      List(state)
-    }
-    flatMapOp(step, "fold", conf)
-  }
-
-  /**
-   * Collects incoming elements into Vectors of `count` elements and emits each
-   * Vector once it is full; nothing is emitted in between.
-   *
-   * @param count number of elements per emitted group
-   * @param conf config to attach to the resulting op
-   */
-  def groupedOp(count: Int, conf: UserConfig): Op = {
-    var left = count
-    val buf = {
-      val b = Vector.newBuilder[Any]
-      b.sizeHint(count)
-      b
-    }
-
-    val flatMap: Any => Iterable[Any] = { input: Any =>
-      buf += input
-      left -= 1
-      if (left == 0) {
-        val emit = buf.result()
-        // reset the builder and the counter for the next group
-        buf.clear()
-        left = count
-        Some(emit)
-      } else {
-        None
-      }
-    }
-    // Give the op its own description, like the sibling ops, instead of the
-    // generic "flatmap" default.
-    flatMapOp(flatMap, "grouped", conf)
-  }
-
-  /** Discards the first `number` elements and forwards everything after them. */
-  def dropOp(number: Long, conf: UserConfig): Op = {
-    var remaining = number
-    val dropFirst: Any => Iterable[Any] = { input: Any =>
-      if (remaining <= 0) {
-        Some(input)
-      } else {
-        remaining -= 1
-        None
-      }
-    }
-    flatMapOp(dropFirst, "drop", conf)
-  }
-
-  /**
-   * Discards elements while `drop` returns true; the first element for which it
-   * returns false, and every element after it, is forwarded.
-   *
-   * @param drop predicate deciding whether the leading elements are discarded
-   * @param conf config to attach to the resulting op
-   */
-  def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
-    // Once the predicate has failed it must not be consulted again; otherwise
-    // this degenerates into a filterNot.
-    var dropping = true
-    val flatMap: Any => Iterable[Any] = { data: Any =>
-      if (dropping && drop(data)) {
-        None
-      } else {
-        dropping = false
-        Option(data)
-      }
-    }
-    flatMapOp(flatMap, "dropWhile", conf)
-  }
-
-  /**
-   * Logs `extract(elem)` at info level on the logger called `name`, then forwards
-   * the element unchanged.
-   */
-  def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
-    val logAndForward = { elem: Any =>
-      val logger = LoggerFactory.getLogger(name)
-      logger.info(s"Element: {${extract(elem)}}")
-      List(elem)
-    }
-    flatMapOp(logAndForward, "log", conf)
-  }
-
-  /**
-   * Running scan: emits `zero` together with the first aggregate when the first
-   * element arrives, and the updated aggregate for every later element.
-   *
-   * @param zero initial aggregate
-   * @param f combines the current aggregate with the next element
-   * @param conf config to attach to the resulting op
-   */
-  def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = {
-    var aggregator = zero
-    var pushedZero = false
-
-    val flatMap = { elem: Any =>
-      aggregator = f(aggregator, elem)
-
-      if (pushedZero) {
-        List(aggregator)
-      } else {
-        // The check used to be inverted, so the flag was never set and `zero`
-        // was never emitted.
-        pushedZero = true
-        List(zero, aggregator)
-      }
-    }
-    flatMapOp(flatMap, "scan", conf)
-  }
-
-  /** Forwards the first `count` elements and discards everything after them. */
-  def takeOp(count: Long, conf: UserConfig): Op = {
-    var remaining: Long = count
-
-    val takeFirst: Any => Iterable[Any] = { elem: Any =>
-      remaining -= 1
-      if (remaining >= 0) Some(elem) else None
-    }
-    flatMapOp(takeFirst, "take", conf)
-  }
-
-  /**
-   * Marker value stored under each op's generated clue key. We use these stains
-   * to track how a module maps to a Processor.
-   */
-  val STAINS = "track how module is fused to processor"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
deleted file mode 100644
index c5dfc9a..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- *
- *
- * [[IN]] -> [[BridgeModule]] -> [[OUT]]
- * /
- * /
- * out of band data input or output channel [[MAT]]
- *
- *
- * [[BridgeModule]] is used as a bridge between different materializers.
- * Different [[akka.stream.Materializer]]s can use out of band channel to
- * exchange messages.
- *
- * For example:
- *
- * Remote Materializer
- * -----------------------------
- * |                            |
- * | BridgeModule -> RemoteSink |
- * |  /                         |
- * --/----------------------------
- * Local Materializer     /  out of band channel.
- * ----------------------/----
- * | Local              /    |
- * | Source ->  BridgeModule |
- * |                         |
- * ---------------------------
- *
- *
- * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
- * module during materialization.
- *
- * However, user can still declare it explicitly. In this case, it means we 
have a
- * boundary Source or Sink module which accept out of band channel inputs or
- * outputs.
- *
- *
- * @tparam IN
- * @tparam OUT
- * @tparam MAT
- */
-abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] {
-  // Attributes attached to this module; supplied by the concrete subclass.
-  def attributes: Attributes
-  // Returns a BridgeModule built with the given attributes.
-  def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
-
-  // Factory hook: carbonCopy delegates to this to produce the copy.
-  protected def newInstance: BridgeModule[IN, OUT, MAT]
-  override def carbonCopy: Module = newInstance
-}
-
-/**
- *
- * Bridge module which accept out of band channel Input
- * [[org.reactivestreams.Publisher]][IN].
- *
- *
- * [[SourceBridgeModule]] -> [[OUT]]
- * /|
- * /
- * out of band data input [[org.reactivestreams.Publisher]][IN]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from out of band 
[[org.reactivestreams.Publisher]]
- * @tparam OUT out put data type to next module.
- */
-class SourceBridgeModule[IN, OUT](
-    val attributes: Attributes = Attributes.name("sourceBridgeModule"))
-  extends BridgeModule[IN, OUT, Subscriber[IN]] {
-
-  // A copy carries over the current attributes.
-  override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] =
-    new SourceBridgeModule[IN, OUT](attributes)
-
-  // Attributes are immutable here: changing them yields a new module.
-  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] =
-    new SourceBridgeModule[IN, OUT](attributes)
-}
-
-/**
- *
- * Bridge module which accept out of band channel Output
- * [[org.reactivestreams.Subscriber]][OUT].
- *
- *
- * [[IN]] -> [[BridgeModule]]
- * \
- * \
- * \|
- * out of band data output [[org.reactivestreams.Subscriber]][OUT]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from previous module
- * @tparam OUT out put data type to out of band subscriber
- */
-// The default name used to be "sourceBridgeModule", copied from SourceBridgeModule.
-class SinkBridgeModule[IN, OUT](
-    val attributes: Attributes = Attributes.name("sinkBridgeModule"))
-  extends BridgeModule[IN, OUT, Publisher[OUT]] {
-
-  // A copy carries over the current attributes.
-  override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] =
-    new SinkBridgeModule[IN, OUT](attributes)
-
-  // Changing the attributes yields a new module.
-  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
-    new SinkBridgeModule[IN, OUT](attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
deleted file mode 100644
index bc744f9..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{SinkModule, SourceModule}
-import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- * [[DummyModule]] is a set of special module to help construct a 
RunnableGraph,
- * so that all ports are closed.
- *
- * In runtime, [[DummyModule]] should be ignored during materialization.
- *
- * For example, if you have a [[BridgeModule]] which only accept the input
- * message from out of band channel, then you can use DummySource to fake
- * a Message Source Like this.
- *
- * [[DummySource]] -> [[BridgeModule]] -> Sink
- *                      /|
- *                     /
- *       out of band input message [[Publisher]]
- *
- *  After materialization, [[DummySource]] will be removed.
-
- *              [[BridgeModule]] -> Sink
- *                      /|
- *                     /
- *           [[akka.stream.impl.PublisherSource]]
- *
- *
- */
-trait DummyModule extends Module
-
-/**
- *
- *    [[DummySource]]-> [[BridgeModule]] -> Sink
- *                        /|
- *                       /
- *       out of band input message Source
- *
- * @param attributes
- * @param shape
- * @tparam Out
- */
-class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
-  extends SourceModule[Out, Unit](shape) with DummyModule {
-
-  // A dummy source only closes an open port; creating a publisher is unsupported.
-  override def create(context: MaterializationContext): (Publisher[Out], Unit) =
-    throw new UnsupportedOperationException()
-
-  override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] =
-    new DummySource[Out](attributes, shape)
-
-  override def withAttributes(attr: Attributes): Module =
-    new DummySource[Out](attr, amendShape(attr))
-}
-
-/**
- *
- *    Source-> [[BridgeModule]] -> [[DummySink]]
- *                    \
- *                     \
- *                      \|
- *                   out of band output message [[Subscriber]]
- *
- * @param attributes
- * @param shape
- */
-class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
-  extends SinkModule[IN, Unit](shape) with DummyModule {
-
-  // A dummy sink only closes an open port; creating a subscriber is unsupported.
-  override def create(context: MaterializationContext): (Subscriber[IN], Unit) =
-    throw new UnsupportedOperationException()
-
-  override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, Unit] =
-    new DummySink[IN](attributes, shape)
-
-  override def withAttributes(attr: Attributes): Module =
-    new DummySink[IN](attr, amendShape(attr))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
deleted file mode 100644
index c4c78cc..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * [[GearpumpTaskModule]] represent modules that can be materialized as 
Gearpump Tasks.
- *
- * This is specially designed for Gearpump runtime. It is not supposed to be 
used
- * for local materializer.
- *
- */
-trait GearpumpTaskModule extends Module
-
-/**
- * This is used to represent the Gearpump Data Source
- * @param source
- * @param conf
- * @param shape
- * @param attributes
- * @tparam T
- */
-final case class SourceTaskModule[T](
-    source: DataSource,
-    conf: UserConfig,
-    shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
-    attributes: Attributes = Attributes.name("SourceTaskModule"))
-  extends GearpumpTaskModule {
-
-  override def subModules: Set[Module] = Set.empty
-
-  override def withAttributes(attr: Attributes): Module =
-    copy(shape = amendShape(attr), attributes = attr)
-
-  // The copy gets a brand-new outlet.
-  override def carbonCopy: Module =
-    copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out")))
-
-  override def replaceShape(s: Shape): Module = {
-    if (s != shape) {
-      throw new UnsupportedOperationException("cannot replace the shape of SourceTaskModule")
-    }
-    this
-  }
-
-  // Renames the outlet after the new attribute name, unless the name is absent
-  // or unchanged.
-  private def amendShape(attr: Attributes): SourceShape[T] = {
-    val currentName = attributes.nameOrDefault(null)
-    val newName = attr.nameOrDefault(null)
-    val keepShape = (newName eq null) || currentName == newName
-    if (keepShape) shape else shape.copy(outlet = Outlet(newName + ".out"))
-  }
-}
-
-/**
- * This is used to represent the Gearpump Data Sink
- * @param sink
- * @param conf
- * @param shape
- * @param attributes
- * @tparam IN
- */
-final case class SinkTaskModule[IN](
-    sink: DataSink,
-    conf: UserConfig,
-    shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")),
-    attributes: Attributes = Attributes.name("SinkTaskModule"))
-  extends GearpumpTaskModule {
-
-  override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = {
-    this.copy(shape = amendShape(attr), attributes = attr)
-  }
-  // The copy gets a brand-new inlet.
-  override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
-
-  override def replaceShape(s: Shape): Module =
-    if (s == shape) this
-    else throw new UnsupportedOperationException("cannot replace the shape of SinkTaskModule")
-
-  // Renames the inlet after the new attribute name, unless the name is absent
-  // or unchanged.
-  private def amendShape(attr: Attributes): SinkShape[IN] = {
-    val thisN = attributes.nameOrDefault(null)
-    val thatN = attr.nameOrDefault(null)
-
-    if ((thatN eq null) || thisN == thatN) shape
-    // An inlet is named "<name>.in"; the ".out" suffix used before was copied
-    // from the source module.
-    else shape.copy(inlet = Inlet(thatN + ".in"))
-  }
-}
-
-/**
- * This is to represent the Gearpump Processor which has exactly one input and
- * one output.
- *
- * @param processor the Task class of the processor
- * @param conf user config of the processor
- * @param attributes attributes attached to this module
- * @tparam IN input type
- * @tparam OUT output type
- * @tparam MAT third type argument passed on to FlowModule (formerly named
- *             `Unit`, which shadowed scala.Unit inside this class)
- */
-case class ProcessorModule[IN, OUT, MAT](
-    processor: Class[_ <: Task],
-    conf: UserConfig,
-    attributes: Attributes = Attributes.name("processorModule"))
-  extends FlowModule[IN, OUT, MAT] with GearpumpTaskModule {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: ProcessorModule[IN, OUT, MAT] = {
-    new ProcessorModule[IN, OUT, MAT](processor, conf, attributes)
-  }
-
-  override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, MAT] = {
-    new ProcessorModule[IN, OUT, MAT](processor, conf, attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
deleted file mode 100644
index e57a6f6..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- * Groups the T values by the groupBy function.
- *
- * @param groupBy function computing the group of a value
- * @param attributes attributes attached to this module
- * @tparam T element type
- * @tparam Group group key type
- */
-case class GroupByModule[T, Group](
-    groupBy: T => Group,
-    attributes: Attributes = Attributes.name("groupByModule"))
-  extends FlowModule[T, T, Unit] {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: GroupByModule[T, Group] =
-    new GroupByModule[T, Group](groupBy, attributes)
-
-  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] =
-    new GroupByModule[T, Group](groupBy, attributes)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
deleted file mode 100644
index 926feb6..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Reduce Module
- *
- * @param f
- * @param attributes
- * @tparam T
- */
-case class ReduceModule[T](
-    val f: (T, T) => T, val attributes: Attributes = 
Attributes.name("reduceModule"))
-  extends FlowModule[T, T, Unit] {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, 
attributes)
-
-  override def withAttributes(attributes: Attributes): ReduceModule[T] = {
-    new ReduceModule[T](f, attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
deleted file mode 100644
index 9cc46c9..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.scaladsl
-
-import akka.stream.Attributes
-import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, 
ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, 
SourceBridgeModule, SourceTaskModule}
-import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
-import org.reactivestreams.{Publisher, Subscriber}
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-object GearSource {
-
-  /**
-   * Construct a Source which accepts out of band input messages.
-   *
-   *                   [[SourceBridgeModule]] -> Sink
-   *                          /
-   *                         /
-   *                        V
-   *                materialize to [[Subscriber]]
-   *                                   /|
-   *                                  /
-   *       upstream [[Publisher]] send out of band message
-   *
-   */
-  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
-    val source = new Source(new DummySource[IN](Attributes.name("dummy"), 
Source.shape("dummy")))
-    val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, 
OUT]())
-    source.viaMat(flow)(Keep.right)
-  }
-
-  /**
-   * Construct a Source from Gearpump [[DataSource]].
-   *
-   * [[SourceTaskModule]] -> downstream Sink
-   *
-   */
-  def from[OUT](source: DataSource): Source[OUT, Unit] = {
-    val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, 
UserConfig.empty))
-    taskSource
-  }
-
-  /**
-   * Construct a Source from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
-   *
-   * [[ProcessorModule]] -> downstream Sink
-   *
-   */
-  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, 
Unit] = {
-    val source = new Source(new DummySource[Unit](Attributes.name("dummy"), 
Source.shape("dummy")))
-    val flow = Processor.apply[Unit, OUT](processor, conf)
-    source.viaMat(flow)(Keep.right)
-  }
-}
-
-object GearSink {
-
-  /**
-   * Construct a Sink which output messages to a out of band channel.
-   *
-   *   Souce ->   [[SinkBridgeModule]]
-   *                    \
-   *                     \|
-   *         materialize to [[Publisher]]
-   *                              \
-   *                               \
-   *                                \|
-   *       send out of band message to downstream [[Subscriber]]
-   *
-   */
-  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
-    val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), 
Sink.shape("dummy")))
-    val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, 
OUT]())
-    flow.to(sink)
-  }
-
-  /**
-   * Construct a Sink from Gearpump [[DataSink]].
-   *
-   * Upstream Source -> [[SinkTaskModule]]
-   *
-   */
-  def to[IN](sink: DataSink): Sink[IN, Unit] = {
-    val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, 
UserConfig.empty))
-    taskSink
-  }
-
-  /**
-   * Construct a Sink from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
-   *
-   * Upstream Source -> [[ProcessorModule]]
-   *
-   */
-  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
-    val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), 
Sink.shape("dummy")))
-    val flow = Processor.apply[IN, Unit](processor, conf)
-    flow.to(sink)
-  }
-}
-
-/**
- *
- * GroupBy will divide the main input stream to a set of sub-streams.
- * This is a work-around to bypass the limitation of official API Flow.groupBy
- *
- *
- * For example, to do a word count, we can write code like this:
- *
- * case class KV(key: String, value: String)
- * case class Count(key: String, count: Int)
- *
- * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
- *   Count(kv.key, 1)
- * }.fold(Count(null, 0)){(base, add) =>
- *   Count(add.key, base.count + add.count)
- * }.log("count of current key")
- * .flatten()
- * .to(sink)
- *
- * map, fold will transform data on all sub-streams, If there are 10 groups,
- * then there will be 10 sub-streams, and for each sub-stream, there will be
- * a map and fold.
- *
- * flatten will collect all sub-stream into the main stream,
- *
- * sink will only operate on the main stream.
- *
- */
-object GroupBy {
-  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
-    new Flow[T, T, Unit](new GroupByModule(groupBy))
-  }
-}
-
-/**
- * Aggregate on the data.
- *
- * val flow = Reduce({(a: Int, b: Int) => a + b})
- *
- *
- */
-object Reduce {
-  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
-    new Flow[T, T, Unit](new ReduceModule(reduce))
-  }
-}
-
-/**
- * Create a Flow by providing a Gearpump Processor class and configuration
- *
- *
- */
-object Processor {
-  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, 
Out, Unit] = {
-    new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, 
conf))
-  }
-}
-
-object Implicits {
-
-  /**
-   * Help util to support reduce and groupBy
-   */
-  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
-
-    //TODO It is named as groupBy2 to avoid conflict with built-in
-    // groupBy. Eventually, we think the built-in groupBy should
-    // be replace with this implementation.
-    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
-      val stage = GroupBy.apply(groupBy)
-      source.via[T, Unit](stage)
-    }
-
-    def reduce(reduce: (T, T) => T): Source[T, Mat] = {
-      val stage = Reduce.apply(reduce)
-      source.via[T, Unit](stage)
-    }
-
-    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, 
Mat] = {
-      val stage = Processor.apply[T, R](processor, conf)
-      source.via(stage)
-    }
-  }
-
-  /**
-   * Help util to support reduce and groupBy
-   */
-  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
-    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = {
-      val stage = GroupBy.apply(groupBy)
-      flow.via(stage)
-    }
-
-    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = {
-      val stage = Reduce.apply(reduce)
-      flow.via(stage)
-    }
-
-    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, 
Mat] = {
-      val stage = Processor.apply[OUT, R](processor, conf)
-      flow.via(stage)
-    }
-  }
-
-  /**
-   * Help util to support groupByKey and sum
-   */
-  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {
-
-    /**
-     * if it is a KV Pair, we can group the KV pair by the key.
-     * @return
-     */
-    def groupByKey: Source[(K, V), Mat] = {
-      val stage = GroupBy.apply(getTupleKey[K, V])
-      source.via(stage)
-    }
-
-    /**
-     * Does sum on values
-     *
-     * Before doing this, you need to do groupByKey to group same key together
-     * , otherwise, it will do the sum no matter what current key is.
-     */
-    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
-      val stage = Reduce.apply(sumByKey[K, V](numeric))
-      source.via(stage)
-    }
-  }
-
-  /**
-   * Helper util to support groupByKey and sum
-   */
-  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
-
-    /**
-     * If it is a KV Pair, we can group the KV pair by the key.
-     *
-     */
-    def groupByKey: Flow[(K, V), (K, V), Mat] = {
-      val stage = GroupBy.apply(getTupleKey[K, V])
-      flow.via(stage)
-    }
-
-    /**
-     * do sum on values
-     *
-     * Before doing this, you need to do groupByKey to group same key together
-     * , otherwise, it will do the sum no matter what current key is.
-     *
-     */
-    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
-      val stage = Reduce.apply(sumByKey[K, V](numeric))
-      flow.via(stage)
-    }
-  }
-
-  private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
-
-  private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, 
V]) => Tuple2[K, V] =
-    (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
deleted file mode 100644
index 2eb0612..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
-
-  val sizeOfOutputs = sizeOfOutPorts
-  var index = 0
-
-  override def onNext(msg: Message): Unit = {
-    output(index, msg)
-    index += 1
-    if (index == sizeOfOutputs) {
-      index = 0
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
deleted file mode 100644
index 925bf21..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
-  override def onNext(msg: Message): Unit = {
-    context.output(msg)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
deleted file mode 100644
index 9a4e24e..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import akka.stream.gearpump.task.GraphTask.{Index, PortId}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskWrapper}
-
-class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
-  extends Task(inputTaskContext, userConf) {
-
-  private val context = inputTaskContext.asInstanceOf[TaskWrapper]
-  private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](
-    GraphTask.OUT_PROCESSORS).get)
-  private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](
-    GraphTask.IN_PROCESSORS).get)
-
-  val sizeOfOutPorts = outMapping.keys.size
-  val sizeOfInPorts = inMapping.keys.size
-
-  private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] 
= {
-    val portToProcessor = processors.zipWithIndex.map { kv =>
-      (kv._2, kv._1)
-    }.toMap
-
-    val processorToIndex = processors.sorted.zipWithIndex.toMap
-
-    val portToIndex = portToProcessor.map { kv =>
-      val (outlet, processorId) = kv
-      val index = processorToIndex(processorId)
-      (outlet, index)
-    }
-    portToIndex
-  }
-
-  def output(outletId: Int, msg: Message): Unit = {
-    context.output(outMapping(outletId), msg)
-  }
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onStop(): Unit = {}
-}
-
-object GraphTask {
-  val OUT_PROCESSORS = "akka.stream.gearpump.task.outprocessors"
-  val IN_PROCESSORS = "akka.stream.gearpump.task.inprocessors"
-
-  type PortId = Int
-  type Index = Int
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
deleted file mode 100644
index b681852..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import java.util
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor.Receive
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
-import akka.util.Timeout
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Bridge Task when data flow is from remote Gearpump Task to local 
Akka-Stream Module
- *
- *
- * upstream [[Task]] -> [[SinkBridgeTask]]
- *                         \              Remote Cluster
- * -------------------------\----------------------
- *                           \            Local JVM
- *                            \|
- *                       Akka Stream [[Subscriber]]
- *
- */
-class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
-  import taskContext.taskId
-
-  val queue = new util.LinkedList[Message]()
-  var subscriber: ActorRef = null
-
-  var request: Int = 0
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onNext(msg: Message): Unit = {
-    queue.add(msg)
-    trySendingData()
-  }
-
-  override def onStop(): Unit = {}
-
-  private def trySendingData(): Unit = {
-    if (subscriber != null) {
-      (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
-        subscriber ! msg.msg
-        request -= 1
-      }
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case RequestMessage(n) =>
-      this.subscriber = sender
-      LOG.info("the downstream has requested " + n + " messages from " + 
subscriber)
-      request += n.toInt
-      trySendingData()
-    case msg =>
-      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
-  }
-}
-
-object SinkBridgeTask {
-
-  case class RequestMessage(number: Int)
-
-  class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, 
appId: Int, processorId: ProcessorId) extends Publisher[AnyRef] with 
Subscription {
-    private val taskId = TaskId(processorId, index = 0)
-
-    private val LOG = LogUtil.getLogger(getClass)
-
-    private var actor: ActorRef = null
-    import system.dispatcher
-
-    private val task = context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map { container =>
-      // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
-      container.task
-    }
-
-    override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = {
-      this.actor = system.actorOf(Props(new ClientActor(subscriber)))
-      subscriber.onSubscribe(this)
-    }
-
-    override def cancel(): Unit = Unit
-
-    private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
-    override def request(l: Long): Unit = {
-      task.foreach { task =>
-        task.tell(RequestMessage(l.toInt), actor)
-      }
-    }
-  }
-
-  class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
-    def receive: Receive = {
-      case result: AnyRef => subscriber.onNext(result)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
deleted file mode 100644
index ccbd350..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import scala.concurrent.ExecutionContext
-
-import akka.actor.Actor.Receive
-import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, 
Complete, Error}
-import org.reactivestreams.{Subscriber, Subscription}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
-
-/**
- * Bridge Task when data flow is from local Akka-Stream Module to remote 
Gearpump Task
- *
- *
- *
- *      [[SourceBridgeTask]]   --> downstream [[Task]]
- *                 /|                Remote Cluster
- * ---------------/--------------------------------
- *               /                    Local JVM
- *    Akka Stream [[org.reactivestreams.Publisher]]
- *
- */
-class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
-  import taskContext.taskId
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onNext(msg: Message): Unit = {
-    LOG.info("AkkaStreamSource receiving message " + msg)
-  }
-
-  override def onStop(): Unit = {}
-
-  override def receiveUnManagedMessage: Receive = {
-    case Error(ex) =>
-      LOG.error("the stream has error", ex)
-    case AkkaStreamMessage(msg) =>
-      LOG.error("we have received message from akka stream source: " + msg)
-      taskContext.output(Message(msg, System.currentTimeMillis()))
-    case Complete(description) =>
-      LOG.error("the stream is completed: " + description)
-    case msg =>
-      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
-  }
-}
-
-object SourceBridgeTask {
-  case class Error(ex: java.lang.Throwable)
-
-  case class Complete(description: String)
-
-  case class AkkaStreamMessage(msg: AnyRef)
-
-  class SourceBridgeTaskClient[T <: AnyRef](ec: ExecutionContext, context: 
ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
-    val taskId = TaskId(processorId, 0)
-    var subscription: Subscription = null
-    implicit val dispatcher = ec
-
-    val task = context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map { container =>
-      // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
-      container.task
-    }
-
-    override def onError(throwable: Throwable): Unit = {
-      task.map(task => task ! Error(throwable))
-    }
-
-    override def onSubscribe(subscription: Subscription): Unit = {
-      this.subscription = subscription
-      task.map(task => subscription.request(1))
-    }
-
-    override def onComplete(): Unit = {
-      task.map(task => task ! Complete("the upstream is completed"))
-    }
-
-    override def onNext(t: T): Unit = {
-      task.map { task =>
-        task ! AkkaStreamMessage(t)
-      }
-      subscription.request(1)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
deleted file mode 100644
index 78fabbe..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class UnZip2Task(context: TaskContext, userConf: UserConfig) extends 
GraphTask(context, userConf) {
-
-  val unzip = 
userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
-
-  override def onNext(msg: Message): Unit = {
-    val message = msg.msg
-    val time = msg.timestamp
-    val pair = unzip(message)
-    val (a, b) = pair
-    output(0, Message(a.asInstanceOf[AnyRef], time))
-    output(1, Message(b.asInstanceOf[AnyRef], time))
-  }
-}
-
-object UnZip2Task {
-  class UnZipFunction(val unzip: Any => (Any, Any)) extends Serializable
-
-  val UNZIP2_FUNCTION = "akka.stream.gearpump.task.unzip2.function"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
deleted file mode 100644
index c774fc7..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.util
-
-import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, 
MaterializedValueNode, Module, Transform}
-
-class MaterializedValueOps(mat: MaterializedValueNode) {
-  def resolve[Mat](materializedValues: Map[Module, Any]): Mat = {
-    def resolveMaterialized(mat: MaterializedValueNode, materializedValues: 
Map[Module, Any]): Any = mat match {
-      case Atomic(m) => materializedValues.getOrElse(m, ())
-      case Combine(f, d1, d2) => f(resolveMaterialized(d1, 
materializedValues), resolveMaterialized(d2, materializedValues))
-      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
-      case Ignore => ()
-    }
-    resolveMaterialized(mat, materializedValues).asInstanceOf[Mat]
-  }
-}
-
-object MaterializedValueOps {
-  def apply(mat: MaterializedValueNode): MaterializedValueOps = new 
MaterializedValueOps(mat)
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
new file mode 100644
index 0000000..4384b39
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import akka.stream.Attributes
+import akka.stream.Attributes.Attribute
+
+object GearAttributes {
+
+  /**
+   * Where we want to render the module.
+   */
+  sealed trait Location
+  object Local extends Location
+  object Remote extends Location
+
+  /**
+   * Attribute carrying the requested [[Location]] of a module.
+   *
+   * @param tag Location
+   */
+  final case class LocationAttribute(tag: Location) extends Attribute
+
+  /**
+   * Attribute carrying how many parallel instances we want to use for a module.
+   *
+   * @param parallelism Int
+   */
+  final case class ParallismAttribute(parallelism: Int) extends Attribute
+
+  /**
+   * Builds an [[Attributes]] holding a single [[ParallismAttribute]].
+   *
+   * @param count number of parallel instances requested for the module
+   * @return Attributes wrapping the parallelism setting
+   */
+  def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
+
+  /**
+   * Builds an [[Attributes]] asking for the module to be rendered locally.
+   *
+   * @return Attributes wrapping `LocationAttribute(Local)`
+   */
+  def local: Attributes = Attributes(LocationAttribute(Local))
+
+  /**
+   * Builds an [[Attributes]] asking for the module to be rendered remotely.
+   *
+   * @return Attributes wrapping `LocationAttribute(Remote)`
+   */
+  def remote: Attributes = Attributes(LocationAttribute(Remote))
+
+  /**
+   * Gets the effective location setting. When several [[LocationAttribute]]s
+   * are present, the last one in `attrs.attributeList` wins.
+   *
+   * @param attrs Attributes
+   * @return the last location found, or `Local` when none is set
+   */
+  def location(attrs: Attributes): Location =
+    attrs.attributeList
+      .collect { case LocationAttribute(where) => where }
+      .lastOption
+      .getOrElse(Local)
+
+  /**
+   * Gets the effective parallelism setting. When several
+   * [[ParallismAttribute]]s are present, the last one in
+   * `attrs.attributeList` wins.
+   *
+   * @param attrs Attributes
+   * @return the last parallelism found, or 1 when none is set
+   */
+  def count(attrs: Attributes): Int =
+    attrs.attributeList
+      .collect { case ParallismAttribute(instances) => instances }
+      .lastOption
+      .getOrElse(1)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
new file mode 100644
index 0000000..07c95f8
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import akka.NotUsed
+import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, 
Cancellable, ExtendedActorSystem}
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.Attributes.Attribute
+import akka.stream.impl.Stages.SymbolicGraphStage
+import akka.stream.impl.StreamLayout.{Atomic, Combine, CopiedModule, Ignore, 
MaterializedValueNode, Module, Transform}
+import akka.stream.{ActorAttributes, ActorMaterializerSettings, Attributes, 
ClosedShape, Fusing, Graph, InPort, OutPort, SinkShape}
+import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
+import akka.stream.impl.{ExtendedActorMaterializer, StreamSupervisor}
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
+import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
+import org.apache.gearpump.akkastream.graph._
+import org.apache.gearpump.util.{Graph => GGraph}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContextExecutor, Promise}
+import scala.concurrent.duration.FiniteDuration
+
+object GearpumpMaterializer {
+
+  /** A connection from an output port of one module to an input port of another. */
+  final case class Edge(from: OutPort, to: InPort)
+
+  /** Attribute wrapping a materialized-value computation node. */
+  final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode)
+    extends Attribute
+
+  /** Implicitly lifts a Boolean into a new AtomicBoolean holding that value. */
+  implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)
+
+  /**
+   * Creates a materializer for the given partitioning strategy with auto-fusing
+   * disabled, `useLocalCluster = false` and the name prefix "flow".
+   *
+   * @param strategy decides which modules are rendered remotely / locally
+   * @param context  an ActorSystem or ActorContext
+   */
+  def apply(strategy: Strategy)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    apply(ActorMaterializerSettings(system).withAutoFusing(false), strategy,
+      useLocalCluster = false, namePrefix = "flow")(context)
+  }
+
+  /**
+   * Creates a materializer, filling in defaults for everything not supplied.
+   *
+   * @param materializerSettings settings to use; when absent, the system's
+   *                             settings with auto-fusing disabled
+   * @param strategy             decides which modules are rendered remotely / locally
+   * @param useLocalCluster      whether to use built-in in-process local cluster
+   * @param namePrefix           name prefix; "flow" when absent
+   * @param context              an ActorSystem or ActorContext
+   */
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+      useLocalCluster: Boolean = true,
+      namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    val settings = materializerSettings getOrElse
+      ActorMaterializerSettings(system).withAutoFusing(false)
+    apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
+  }
+
+  /**
+   * Creates a materializer from fully specified arguments, starting a
+   * StreamSupervisor actor under `context` on the settings' dispatcher.
+   *
+   * @param materializerSettings settings handed to the materializer and supervisor
+   * @param strategy             decides which modules are rendered remotely / locally
+   * @param useLocalCluster      whether to use built-in in-process local cluster
+   * @param namePrefix           name prefix handed to the materializer
+   * @param context              an ActorSystem or ActorContext
+   */
+  def apply(materializerSettings: ActorMaterializerSettings,
+      strategy: Strategy,
+      useLocalCluster: Boolean,
+      namePrefix: String)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    // NOTE(review): the literal `false` is presumably lifted to an AtomicBoolean
+    // by `boolToAtomic` above - confirm against StreamSupervisor.props.
+    val supervisor = context.actorOf(
+      StreamSupervisor.props(materializerSettings, false).withDispatcher(
+        materializerSettings.dispatcher), StreamSupervisor.nextName())
+
+    // Forward every argument. Previously strategy, useLocalCluster and
+    // namePrefix were dropped here, so the constructor defaults always won.
+    new GearpumpMaterializer(
+      system,
+      materializerSettings,
+      supervisor,
+      strategy,
+      useLocalCluster,
+      Some(namePrefix))
+  }
+
+  /**
+   * Extracts the ActorSystem behind an ActorRefFactory.
+   *
+   * @throws IllegalArgumentException when `context` is null or is neither an
+   *                                  ExtendedActorSystem nor an ActorContext
+   */
+  private def actorSystemOf(context: ActorRefFactory): ActorSystem =
+    context match {
+      case s: ExtendedActorSystem => s
+      case c: ActorContext => c.system
+      case null =>
+        throw new IllegalArgumentException("ActorRefFactory context must be defined")
+      case _ =>
+        throw new IllegalArgumentException(
+          s"""
+            |  context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
+          """.stripMargin
+        )
+    }
+
+}
+
+/**
+ *
+ * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
+ * streaming application. If some module cannot be rendered remotely in Gearpump
+ * Cluster, then it will use local Actor materializer as fallback to materialize
+ * the module locally.
+ *
+ * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
+ * to determine which module should be rendered
+ * remotely, and which module should be rendered locally.
+ *
+ * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
+ *     to find out how we cut the runnableGraph to two parts,
+ *      and materialize them separately.
+ * @param system          ActorSystem
+ * @param settings        ActorMaterializerSettings, the base for [[effectiveSettings]]
+ * @param supervisor      ActorRef exposed as the materializer's supervisor
+ * @param strategy        Strategy
+ * @param useLocalCluster whether to use built-in in-process local cluster
+ * @param namePrefix      optional name prefix; not referenced inside this class
+ */
+class GearpumpMaterializer(override val system: ActorSystem,
+    override val settings: ActorMaterializerSettings,
+    override val supervisor: ActorRef,
+    strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+    useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
+  extends ExtendedActorMaterializer {
+
+  // One materializer per kind of sub graph, looked up by the sub graph's class.
+  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
+    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
+    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
+  )
+
+  override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+  /** Reports shutdown once the actor system's termination future has completed. */
+  override def isShutdown: Boolean = system.whenTerminated.isCompleted
+
+  /**
+   * Applies the input-buffer, dispatcher and supervision-strategy attributes
+   * found in `opAttr`, in list order, on top of the base `settings`.
+   */
+  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+    import ActorAttributes._
+    import Attributes._
+    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+      attr match {
+        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+        case _ => s
+      }
+    }
+  }
+
+  /** Not supported by this materializer; always throws. */
+  override def withNamePrefix(name: String): ExtendedActorMaterializer =
+    throw new UnsupportedOperationException()
+
+  /**
+   * Execution context backing the scheduling methods below. It used to throw
+   * UnsupportedOperationException, which made schedulePeriodically and
+   * scheduleOnce unusable; the actor system's default dispatcher is used instead.
+   */
+  override implicit def executionContext: ExecutionContextExecutor =
+    system.dispatcher
+
+  override def schedulePeriodically(initialDelay: FiniteDuration,
+      interval: FiniteDuration,
+      task: Runnable): Cancellable =
+    system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+    system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+  /**
+   * Materializes `runnableGraph` by rebuilding its module topology, cutting it
+   * into sub graphs with the configured strategy and handing every sub graph
+   * to the sub materializer registered for its class.
+   *
+   * @param runnableGraph closed graph to materialize
+   * @return the collected sink values (see the review notes in the body), cast to `Mat`
+   */
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
+    val info = Fusing.aggressive(runnableGraph).module.info
+    val graph = GGraph.empty[Module, Edge]
+
+    // Rebuild the topology in terms of the original modules: only sub modules
+    // flagged as copies are considered, and each is replaced by its `copyOf`.
+    info.subModules.foreach(module => {
+      if (module.isCopied) {
+        val original = module.asInstanceOf[CopiedModule].copyOf
+        graph.addVertex(original)
+        module.shape.outlets.zip(original.shape.outlets).foreach(out => {
+          val (cout, oout) = out
+          val cin = info.downstreams(cout)
+          val downStreamModule = info.inOwners(cin)
+          if (downStreamModule.isCopied) {
+            val downStreamOriginal = downStreamModule.asInstanceOf[CopiedModule].copyOf
+            downStreamModule.shape.inlets.zip(downStreamOriginal.shape.inlets).foreach(in => {
+              // Map the copied inlet fed by `cout` back to the original inlet
+              // at the same position and connect the two original modules.
+              if (in._1 == cin) {
+                graph.addEdge(original, Edge(oout, in._2), downStreamOriginal)
+              }
+            })
+          }
+        })
+      }
+    })
+
+    printGraph(graph)
+
+    val subGraphs = GraphPartitioner(strategy).partition(graph)
+    // Each sub materializer is given the values accumulated so far.
+    val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
+      val materializer = subMaterializers(subGraph.getClass)
+      map ++ materializer.materialize(subGraph, map)
+    }
+    // Keep only the values of sink-shaped modules that are not NotUsed.
+    val mat = matValues.flatMap(pair => {
+      val (module, any) = pair
+      any match {
+        case _: NotUsed =>
+          None
+        case _ =>
+          val rt = module.shape match {
+            case _: SinkShape[_] =>
+              Some(any)
+            case _ =>
+              None
+          }
+          rt
+      }
+    }).toList
+    // NOTE(review): `last` is called on the sub graph list and on the sorted
+    // module list; presumably both are non-empty for any closed graph - confirm.
+    val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
+    // NOTE(review): the resolved value is computed but never used - confirm
+    // whether it was meant to be the return value.
+    resolveMaterialized(matModule.materializedValueComputation, matValues)
+    // NOTE(review): `mat` is a List, so the Promise case cannot match and the
+    // list itself is what gets cast to Mat - confirm the intended result.
+    val rt = Some(mat).flatMap(any => {
+      any match {
+        case promise: Promise[_] =>
+          Some(promise.future)
+        case other =>
+          Some(other)
+      }
+    })
+    rt.orNull.asInstanceOf[Mat]
+  }
+
+  /** Prints the simple class name of every module, in topological order. */
+  private def printGraph(graph: GGraph[Module, Edge]): Unit = {
+    val iterator = graph.topologicalOrderIterator
+    while (iterator.hasNext) {
+      val module = iterator.next()
+      // scalastyle:off println
+      module match {
+        case graphStageModule: GraphStageModule =>
+          // For graph stage modules also print the wrapped stage's class name.
+          graphStageModule.stage match {
+            case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+              val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+              println(
+                s"${module.getClass.getSimpleName}(${symbolicName})"
+              )
+            case graphStage: GraphStage[_] =>
+              val name = graphStage.getClass.getSimpleName
+              println(
+                s"${module.getClass.getSimpleName}(${name})"
+              )
+            case other =>
+              println(
+                s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+              )
+          }
+        case _ =>
+          println(module.getClass.getSimpleName)
+      }
+      // scalastyle:on println
+    }
+  }
+
+  /** Delegates to `materialize(runnableGraph)`; `initialAttributes` is ignored. */
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  /** Delegates to `materialize(runnableGraph)`; `subflowFuser` is ignored. */
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef): Mat = {
+    materialize(runnableGraph)
+  }
+
+  /** Delegates to `materialize(runnableGraph)`; both extra arguments are ignored. */
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  /** Returns this materializer's own logger regardless of `logSource`. */
+  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+    logger
+  }
+
+  /** Shuts down every registered sub materializer. */
+  def shutdown: Unit = {
+    subMaterializers.values.foreach(_.shutdown)
+  }
+
+  /**
+   * Evaluates a materialized-value computation tree against the collected
+   * per-module values; a module without an entry contributes `()`.
+   */
+  private def resolveMaterialized(mat: MaterializedValueNode,
+      materializedValues: mutable.Map[Module, Any]): Any = mat match {
+    case Atomic(m) =>
+      materializedValues.getOrElse(m, ())
+    case Combine(f, d1, d2) =>
+      f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+    case Transform(f, d) =>
+      f(resolveMaterialized(d, materializedValues))
+    case Ignore =>
+      ()
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
new file mode 100644
index 0000000..8a869d2
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.{util => ju}
+
+import _root_.org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, 
MaterializedValueSourceAttribute}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+
+/**
+ * Materializer session that records the modules it visits as vertices of a
+ * Gearpump graph (see `graph`, `addVertex` and `addEdge`).
+ *
+ * @param system            ActorSystem used to obtain flow names
+ * @param topLevel          top level module, passed on to MaterializerSession
+ * @param initialAttributes attributes passed on to MaterializerSession
+ * @param namePrefix        prefix for the flow name; "flow" when absent
+ */
+class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
+    initialAttributes: Attributes, namePrefix: Option[String] = None)
+  extends MaterializerSession(topLevel, initialAttributes) {
+
+  // Takes the next flow name from FlowNames, using namePrefix or "flow" as prefix.
+  private[this] def createFlowName(): String =
+    FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()
+
+  private val flowName = createFlowName()
+  private var nextId = 0
+
+  // Builds "<flowName>-<nextId>-<attribute name>" and then increments nextId.
+  // Not referenced anywhere else inside this class.
+  private def stageName(attr: Attributes): String = {
+    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+    nextId += 1
+    name
+  }
+
+  // Graph of modules connected by port-to-port edges, filled in by this session.
+  val graph = GGraph.empty[Module, Edge]
+
+  /**
+   * Adds an edge from the publisher's module to the subscriber's module,
+   * labelled with the two ports involved.
+   */
+  def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): 
Unit = {
+    graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), 
subscriber._2)
+  }
+
+  /** Adds `module` as a vertex of the graph. */
+  def addVertex(module: Module): Unit = {
+    graph.addVertex(module)
+  }
+
+  /**
+   * Visits every sub module of `module`, collects one value per sub module and
+   * resolves the module's materialized value computation against those values.
+   *
+   * @param module           module whose sub modules are visited
+   * @param parentAttributes attributes merged with the module's own attributes
+   * @return the resolved materialized value of `module`
+   */
+  override def materializeModule(module: Module, parentAttributes: 
Attributes): Any = {
+
+    val materializedValues: ju.Map[Module, Any] = new ju.HashMap
+    val currentAttributes = mergeAttributes(parentAttributes, 
module.attributes)
+
+    // NOTE(review): this list is always empty, so the foreach further down
+    // never runs - presumably unfinished work, confirm.
+    val materializedValueSources = List.empty[MaterializedValueSource[_]]
+
+    for (submodule <- module.subModules) {
+      submodule match {
+        case atomic: AtomicModule =>
+          materializeAtomic(atomic, currentAttributes, materializedValues)
+        case copied: CopiedModule =>
+          enterScope(copied)
+          materializedValues.put(copied, materializeModule(copied, 
currentAttributes))
+          exitScope(copied)
+        // NOTE(review): a bare variable pattern matches every remaining value,
+        // so the EmptyModule case after it can never be reached - confirm order.
+        case composite =>
+          materializedValues.put(composite, materializeComposite(composite, 
currentAttributes))
+        case EmptyModule =>
+      }
+    }
+
+    val mat = resolveMaterialized(module.materializedValueComputation, 
materializedValues)
+
+    materializedValueSources.foreach { module =>
+      val matAttribute =
+        new 
MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
+      val copied = copyAtomicModule(module.module, parentAttributes
+        and Attributes(matAttribute))
+      // TODO
+      // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
+      addVertex(copied)
+      materializedValues.put(copied, Atomic(copied))
+    }
+    mat
+
+  }
+
+  /** Composite modules are handled exactly like any other module. */
+  override protected def materializeComposite(composite: Module,
+      effectiveAttributes: Attributes): Any = {
+    materializeModule(composite, effectiveAttributes)
+  }
+
+  /**
+   * Copies `atomic` with merged attributes, adds the copy as a graph vertex and
+   * records `Atomic(copy)` as the value of the original module in `matVal`.
+   *
+   * @param atomic           module to copy and register
+   * @param parentAttributes attributes merged into the copy
+   * @param matVal           map receiving the entry for `atomic`
+   */
+  protected def materializeAtomic(atomic: AtomicModule,
+      parentAttributes: Attributes,
+    matVal: ju.Map[Module, Any]): Unit = {
+
+    val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
+    val copied = copyAtomicModule(atomic, parentAttributes)
+
+    // The mapped ports are looked up but not used while assignPort is commented out.
+    for ((in, id) <- inputs.zipWithIndex) {
+      val inPort = inPortMapping(atomic, copied)(in)
+      // assignPort(in, (inPort, copied))
+    }
+
+    for ((out, id) <- outputs.zipWithIndex) {
+      val outPort = outPortMapping(atomic, copied)(out)
+      // TODO
+      // assignPort(out, (outPort, copied))
+    }
+
+    addVertex(copied)
+    matVal.put(atomic, Atomic(copied))
+  }
+
+  // Returns `module` with its attributes replaced by the merge of
+  // parentAttributes and its own attributes; the result is cast back to T.
+  private def copyAtomicModule[T <: Module](module: T, parentAttributes: 
Attributes): T = {
+    val currentAttributes = mergeAttributes(parentAttributes, 
module.attributes)
+    module.withAttributes(currentAttributes).asInstanceOf[T]
+  }
+
+  // Pairs the outlets of `from` with the outlets of `to` by position.
+  private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] 
= {
+    from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
+  }
+
+  // Pairs the inlets of `from` with the inlets of `to` by position.
+  private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
+    from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
+  }
+
+  /**
+   * Evaluates a materialized-value computation tree against `materializedValues`.
+   * An Atomic node without an entry resolves to null (java.util.Map.get), and
+   * an Ignore node resolves to the Ignore object itself.
+   */
+  protected def resolveMaterialized(matNode: MaterializedValueNode,
+      materializedValues: ju.Map[Module, Any]):
+    Any =
+    matNode match {
+      case Atomic(m) => materializedValues.get(m)
+      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+        resolveMaterialized(d2, materializedValues))
+      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+      case Ignore => Ignore
+    }
+}
+
+object GearpumpMaterializerSession {
+  /**
+   * Factory mirroring the class constructor.
+   *
+   * @param system            ActorSystem
+   * @param topLevel          top level module of the session
+   * @param initialAttributes attributes the session starts with
+   * @param namePrefix        optional name prefix, passed through unchanged
+   */
+  def apply(system: ActorSystem, topLevel: Module,
+      initialAttributes: Attributes, namePrefix: Option[String] = None):
+  GearpumpMaterializerSession =
+    new GearpumpMaterializerSession(
+      system = system,
+      topLevel = topLevel,
+      initialAttributes = initialAttributes,
+      namePrefix = namePrefix)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
new file mode 100644
index 0000000..52a45d9
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.{Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Example that runs a small filter / fold / map pipeline over a fixed list of
+ * strings through a [[GearpumpMaterializer]] created with
+ * `GraphPartitioner.AllRemoteStrategy`. Everything reaching the sink is sent
+ * to an `Echo` actor, which prints it.
+ */
+object Test extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test", akkaConf)
+    implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy)
+
+    val items = List(
+      "red hat",
+      "yellow sweater",
+      "blue jack",
+      "red apple",
+      "green plant",
+      "blue sky")
+
+    // The sink forwards every element, then "COMPLETE", to the echo actor.
+    val echoSink = Sink.actorRef(system.actorOf(Props(new Echo())), "COMPLETE")
+
+    Source(items)
+      .filter(_.startsWith("red"))
+      .fold("Items:")((acc, item) => acc + "|" + item)
+      .map("I want to order item: " + _)
+      .runWith(echoSink)
+
+    // Keep the JVM alive until the actor system terminates (at most 60 minutes).
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println("Confirm received: " + message)
+    }
+  }
+  // scalastyle:on println
+}


Reply via email to