http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
new file mode 100644
index 0000000..8fbe785
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, 
SourceBridgeModule}
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
+import 
org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
+import akka.stream.impl.StreamLayout.Module
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.util.Graph
+
+/**
+ *
+ * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ *  contain modules that can be materialized in remote Gearpump cluster.
+ *
+ * @param graph Graph
+ */
+class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object RemoteGraph {
+
+  /**
+   * * materialize LocalGraph in remote gearpump cluster
+   * @param useInProcessCluster Boolean
+   * @param system ActorSystem
+   */
+  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: 
ActorSystem)
+    extends SubGraphMaterializer {
+    private val local = if (useInProcessCluster) {
+      val cluster = EmbeddedCluster()
+      cluster.start()
+      Some(cluster)
+    } else {
+      None
+    }
+
+    private val context: ClientContext = local match {
+      case Some(l) => l.newClientContext
+      case None => ClientContext(system)
+    }
+
+    override def materialize(subGraph: SubGraph,
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      val graph = subGraph.graph
+      
+      if (graph.isEmpty) {
+        inputMatValues
+      } else {
+        doMaterialize(graph: Graph[Module, Edge], inputMatValues)
+      }
+    }
+
+    private def doMaterialize(graph: Graph[Module, Edge],
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      val materializer = new RemoteMaterializerImpl(graph, system)
+      val (app, matValues) = materializer.materialize
+
+      val appId = context.submit(app)
+      // scalastyle:off println
+      println("sleep 5 second until the application is ready on cluster")
+      // scalastyle:on println
+      Thread.sleep(5000)
+
+      def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
+        matValues.toList.flatMap { kv =>
+          val (module, processorId) = kv
+          module match {
+            case source: SourceBridgeModule[_, _] =>
+              val bridge = new 
SourceBridgeTaskClient[AnyRef](system.dispatcher,
+                context, appId, processorId)
+              Some((module, bridge))
+            case sink: SinkBridgeModule[_, _] =>
+              val bridge = new SinkBridgeTaskClient(system, context, appId, 
processorId)
+              Some((module, bridge))
+            case other =>
+              None
+          }
+        }.toMap
+      }
+
+      inputMatValues ++ resolve(matValues)
+    }
+
+    override def shutdown: Unit = {
+      context.close()
+      local.foreach(_.stop())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
new file mode 100644
index 0000000..a0395de
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import akka.stream.impl.StreamLayout.Module
+import org.apache.gearpump.util.Graph
+
+/**
+ * [[SubGraph]] is a partial DAG
+ *
+ * The idea is that by dividing [[Graph]] to several
+ * [[SubGraph]], we can materialize each [[SubGraph]] with different
+ * materializer.
+ */
+
+trait SubGraph {
+
+  /**
+   * the [[Graph]] representation of this SubGraph
+   * @return
+   */
+  def graph: Graph[Module, Edge]
+}
+
+
+/**
+ * Materializer for Sub-Graph type
+ */
+trait SubGraphMaterializer {
+  /**
+   *
+   * @param matValues Materialized Values for each module before 
materialization
+   * @return Materialized Values for each Module after the materialization.
+   */
+
+  def materialize(graph: SubGraph,
+      matValues: scala.collection.mutable.Map[Module, Any]):
+    scala.collection.mutable.Map[Module, Any]
+
+  def shutdown: Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
new file mode 100644
index 0000000..cbafcf5
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.{util => ju}
+
+import _root_.org.apache.gearpump.util.{Graph => GGraph}
+import akka.NotUsed
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
+import akka.dispatch.Dispatchers
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+import akka.stream.impl.fusing.{Map => _, _}
+import akka.stream.impl.io.{TLSActor, TlsModule}
+import akka.stream.scaladsl.{GraphDSL, Keep, ModuleExtractor, RunnableGraph}
+import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.ReduceModule
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.reactivestreams.{Publisher, Subscriber}
+
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * This materializer is functional equivalent to 
[[akka.stream.impl.ActorMaterializerImpl]]
+ *
+ * @param system System
+ * @param settings ActorMaterializerSettings
+ * @param dispatchers Dispatchers
+ * @param supervisor ActorRef
+ * @param haveShutDown AtomicBoolean
+ * @param flowNames SeqActorName
+ */
+case class LocalMaterializerImpl (
+    override val system: ActorSystem,
+    override val settings: ActorMaterializerSettings,
+    dispatchers: Dispatchers,
+    override val supervisor: ActorRef,
+    haveShutDown: AtomicBoolean,
+    flowNames: SeqActorName)
+  extends ExtendedActorMaterializer {
+
+  override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+  override def schedulePeriodically(initialDelay: FiniteDuration, interval: 
FiniteDuration,
+      task: Runnable): Cancellable =
+    system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+  override def scheduleOnce(delay: FiniteDuration, task: Runnable): 
Cancellable =
+    system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+  override def effectiveSettings(opAttr: Attributes): 
ActorMaterializerSettings = {
+    import ActorAttributes._
+    import Attributes._
+    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+      attr match {
+        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+        case l: LogLevels => s
+        case Name(_) => s
+        case other => s
+      }
+    }
+  }
+
+  override def shutdown(): Unit =
+    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
+
+  override def isShutdown: Boolean = haveShutDown.get()
+
+  override lazy val executionContext: ExecutionContextExecutor =
+    dispatchers.lookup(settings.dispatcher match {
+      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
+      case other => other
+  })
+
+
+  case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
+      subflowFuser: GraphInterpreterShell => ActorRef = null)
+    extends MaterializerSession(module, iAttributes) {
+
+    override def materializeAtomic(atomic: AtomicModule,
+        effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
+
+      def newMaterializationContext() =
+        new MaterializationContext(LocalMaterializerImpl.this, 
effectiveAttributes,
+          stageName(effectiveAttributes))
+      atomic match {
+        case sink: SinkModule[_, _] =>
+          val (sub, mat) = sink.create(newMaterializationContext())
+          assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
+          matVal.put(atomic, mat)
+        case source: SourceModule[_, _] =>
+          val (pub, mat) = source.create(newMaterializationContext())
+          assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
+          matVal.put(atomic, mat)
+        case stage: ProcessorModule[_, _, _] =>
+          val (processor, mat) = stage.createProcessor()
+          assignPort(stage.inPort, processor)
+          assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
+          matVal.put(atomic, mat)
+        case tls: TlsModule => // TODO solve this so TlsModule doesn't need 
special treatment here
+          val es = effectiveSettings(effectiveAttributes)
+          val props =
+            TLSActor.props(es, tls.sslContext, tls.sslConfig,
+              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
+          val impl = actorOf(props, stageName(effectiveAttributes), 
es.dispatcher)
+          def factory(id: Int) = new ActorPublisher[Any](impl) {
+            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+          }
+          val publishers = Vector.tabulate(2)(factory)
+          impl ! FanOut.ExposedPublishers(publishers)
+
+          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
+          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
+
+          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
+          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, 
TLSActor.TransportIn))
+
+          matVal.put(atomic, NotUsed)
+        case graph: GraphModule =>
+          matGraph(graph, effectiveAttributes, matVal)
+        case stage: GraphStageModule =>
+          val graph =
+            GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, 
stage.stage),
+              stage.shape, stage.attributes, Array(stage))
+          matGraph(graph, effectiveAttributes, matVal)
+      }
+    }
+
+    private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
+        matVal: ju.Map[Module, Any]): Unit = {
+      val calculatedSettings = effectiveSettings(effectiveAttributes)
+      val (handlers, logics) =
+        graph.assembly.materialize(effectiveAttributes, graph.matValIDs, 
matVal, registerSrc)
+
+      val shell = new GraphInterpreterShell(graph.assembly, handlers,
+        logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)
+
+      val impl =
+        if (subflowFuser != null && 
!effectiveAttributes.contains(Attributes.AsyncBoundary)) {
+          subflowFuser(shell)
+        } else {
+          val props = ActorGraphInterpreter.props(shell)
+          actorOf(props, stageName(effectiveAttributes), 
calculatedSettings.dispatcher)
+        }
+
+      for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
+        val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, 
shell, i)
+        assignPort(inlet, subscriber)
+      }
+      for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
+        val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, 
shell, i)
+        impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
+        assignPort(outlet, publisher)
+      }
+    }
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): 
Mat = {
+
+    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+      null, null).materialize().asInstanceOf[Mat]
+
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+
+    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+      null, null).materialize().asInstanceOf[Mat]
+
+  }
+
+  def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
+    var moduleInProgress: Module = EmptyModule
+    graph.vertices.foreach(module => {
+      moduleInProgress = moduleInProgress.compose(module)
+    })
+    graph.edges.foreach(value => {
+      val (node1, edge, node2) = value
+      moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
+    })
+
+    moduleInProgress
+  }
+
+  def materialize(graph: GGraph[Module, Edge],
+      inputMatValues: scala.collection.mutable.Map[Module, Any]):
+      scala.collection.mutable.Map[Module, Any] = {
+    val topLevelModule = buildToplevelModule(graph)
+    val session = LocalMaterializerSession(topLevelModule, null, null)
+    import scala.collection.JavaConverters._
+    val matV = inputMatValues.asJava
+    val materializedGraph = graph.mapVertex { module =>
+      session.materializeAtomic(module.asInstanceOf[AtomicModule], 
module.attributes, matV)
+      matV.get(module)
+    }
+    materializedGraph.edges.foreach { nodeEdgeNode =>
+      val (node1, edge, node2) = nodeEdgeNode
+      val from = edge.from
+      val to = edge.to
+      node1 match {
+        case module1: Module =>
+          node2 match {
+            case module2: Module =>
+              val publisher = 
module1.downstreams(from).asInstanceOf[Publisher[Any]]
+              val subscriber = 
module2.upstreams(to).asInstanceOf[Subscriber[Any]]
+              publisher.subscribe(subscriber)
+            case _ =>
+          }
+        case _ =>
+      }
+    }
+    val matValSources = graph.vertices.flatMap(module => {
+      val rt: Option[MaterializedValueSource[_]] = module match {
+        case graphStage: GraphStageModule =>
+          graphStage.stage match {
+            case materializedValueSource: MaterializedValueSource[_] =>
+              Some(materializedValueSource)
+            case _ =>
+              None
+          }
+        case _ =>
+          None
+      }
+      rt
+    })
+    publishToMaterializedValueSource(matValSources, inputMatValues)
+    inputMatValues
+  }
+
+  private def publishToMaterializedValueSource(modules: 
List[MaterializedValueSource[_]],
+      matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
+    modules.foreach { source =>
+      Option(source.computation).map { attr =>
+        val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
+        source.setValue(valueToPublish)
+      }
+    }
+  }
+
+  private[this] def createFlowName(): String = flowNames.next()
+
+  val flowName = createFlowName()
+  var nextId = 0
+
+  def stageName(attr: Attributes): String = {
+    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+    nextId += 1
+    name
+  }
+
+  override def withNamePrefix(name: String): LocalMaterializerImpl =
+    this.copy(flowNames = flowNames.copy(name))
+
+}
+
+object LocalMaterializerImpl {
+  case class MaterializedModule(module: Module, matValue: Any,
+      inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
+      outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
+
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      namePrefix: Option[String] = None)(implicit system: ActorSystem):
+  LocalMaterializerImpl = {
+
+    val settings = materializerSettings getOrElse 
ActorMaterializerSettings(system)
+    apply(settings, namePrefix.getOrElse("flow"))(system)
+  }
+
+  def apply(materializerSettings: ActorMaterializerSettings,
+      namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl 
= {
+    val haveShutDown = new AtomicBoolean(false)
+
+    new LocalMaterializerImpl(
+      system,
+      materializerSettings,
+      system.dispatchers,
+      system.actorOf(StreamSupervisor.props(materializerSettings,
+        haveShutDown).withDispatcher(materializerSettings.dispatcher)),
+      haveShutDown,
+      FlowNames(system).name.copy(namePrefix))
+  }
+
+  def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
+    val f = reduce.f
+    val aggregator = {(zero: Any, input: Any) =>
+      if (zero == null) {
+        input
+      } else {
+        f(zero, input)
+      }
+    }
+    Fold(null, aggregator)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
new file mode 100644
index 0000000..f3f8094
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import akka.actor.ActorSystem
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.Timers._
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, 
SimpleLinearGraphStage, SingleSource, TickSource}
+import akka.stream.impl.fusing.{Map => FMap, _}
+import akka.stream.impl.io.IncomingConnectionStage
+import akka.stream.impl.{HeadOptionStage, Stages, Throttle}
+import akka.stream.scaladsl._
+import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
+import akka.stream.stage.GraphStage
+import akka.stream.{FanInShape, FanOutShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module._
+import org.apache.gearpump.akkastream.task._
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a 
Gearpump
+ * Streaming Application.
+ *
+ * @param graph Graph
+ * @param system ActorSystem
+ */
+class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
+
+  import RemoteMaterializerImpl._
+
+  type ID = String
+  private implicit val actorSystem = system
+
+  private def uuid: String = {
+    java.util.UUID.randomUUID.toString
+  }
+
+  def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
+    val (opGraph, ids) = toOpGraph
+    val app: StreamApplication = new StreamApp("app", system, 
UserConfig.empty, opGraph)
+    val processorIds = resolveIds(app, ids)
+
+    val updatedApp = updateJunctionConfig(processorIds, app)
+    (removeIds(updatedApp), processorIds)
+  }
+
+  private def updateJunctionConfig(processorIds: Map[Module, ProcessorId],
+      app: StreamApplication): StreamApplication = {
+    val config = junctionConfig(processorIds)
+
+    val dag = app.dag.mapVertex { vertex =>
+      val processorId = vertex.id
+      val newConf = vertex.taskConf.withConfig(config(processorId))
+      vertex.copy(taskConf = newConf)
+    }
+    new StreamApplication(app.name, app.inputUserConfig, dag)
+  }
+
+  private def junctionConfig(processorIds: Map[Module, ProcessorId]):
+  Map[ProcessorId, UserConfig] = {
+    val updatedConfigs = graph.vertices.flatMap { vertex =>
+      buildShape(vertex, processorIds)
+    }.toMap
+    updatedConfigs
+  }
+
+  private def buildShape(vertex: Module, processorIds: Map[Module, 
ProcessorId]):
+  Option[(ProcessorId, UserConfig)] = {
+    def inProcessors(vertex: Module): List[ProcessorId] = {
+      vertex.shape.inlets.flatMap { inlet =>
+        graph.incomingEdgesOf(vertex).find(
+          _._2.to == inlet).map(_._1
+        ).flatMap(processorIds.get(_))
+      }.toList
+    }
+    def outProcessors(vertex: Module): List[ProcessorId] = {
+      vertex.shape.outlets.flatMap { outlet =>
+        graph.outgoingEdgesOf(vertex).find(
+          _._2.from == outlet).map(_._3
+        ).flatMap(processorIds.get(_))
+      }.toList
+    }
+    processorIds.get(vertex).map(processorId => {
+      (processorId, UserConfig.empty.
+        withValue(GraphTask.OUT_PROCESSORS, outProcessors(vertex)).
+        withValue(GraphTask.IN_PROCESSORS, inProcessors(vertex)))
+    })
+  }
+
+  private def resolveIds(app: StreamApplication, ids: Map[Module, ID]):
+  Map[Module, ProcessorId] = {
+    ids.flatMap { kv =>
+      val (module, id) = kv
+      val processorId = app.dag.vertices.find { processor =>
+        processor.taskConf.getString(id).isDefined
+      }.map(_.id)
+      processorId.map((module, _))
+    }
+  }
+
+  private def removeIds(app: StreamApplication): StreamApplication = {
+    val graph = app.dag.mapVertex { processor =>
+      val conf = removeId(processor.taskConf)
+      processor.copy(taskConf = conf)
+    }
+    new StreamApplication(app.name, app.inputUserConfig, graph)
+  }
+
+  private def removeId(conf: UserConfig): UserConfig = {
+    conf.filter { kv =>
+      kv._2 != RemoteMaterializerImpl.TRACKABLE
+    }
+  }
+
+  private def toOpGraph: (Graph[Op, OpEdge], Map[Module, ID]) = {
+    var matValues = collection.mutable.Map.empty[Module, ID]
+    val opGraph = graph.mapVertex[Op] { module =>
+      val name = uuid
+      val conf = UserConfig.empty.withString(name, 
RemoteMaterializerImpl.TRACKABLE)
+      matValues += module -> name
+      val parallelism = GearAttributes.count(module.attributes)
+      val op = module match {
+        case source: SourceTaskModule[_] =>
+          val updatedConf = conf.withConfig(source.conf)
+          DataSourceOp(source.source, parallelism, updatedConf, "source")
+        case sink: SinkTaskModule[_] =>
+          val updatedConf = conf.withConfig(sink.conf)
+          DataSinkOp(sink.sink, parallelism, updatedConf, "sink")
+        case sourceBridge: SourceBridgeModule[_, _] =>
+          ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, 
"source")
+        case processor: ProcessorModule[_, _, _] =>
+          val updatedConf = conf.withConfig(processor.conf)
+          ProcessorOp(processor.processor, parallelism, updatedConf, "source")
+        case sinkBridge: SinkBridgeModule[_, _] =>
+          ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
+        case groupBy: GroupByModule[_, _] =>
+          GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf)
+        case reduce: ReduceModule[_] =>
+          reduceOp(reduce.f, conf)
+        case graphStage: GraphStageModule =>
+          translateGraphStageWithMaterializedValue(graphStage, parallelism, 
conf)
+      }
+      if (op == null) {
+        throw new UnsupportedOperationException(
+          module.getClass.toString + " is not supported with 
RemoteMaterializer"
+        )
+      }
+      op
+    }.mapEdge[OpEdge] { (n1, edge, n2) =>
+      n2 match {
+        case master: MasterOp =>
+          Shuffle
+        case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
+          Shuffle
+        case slave: SlaveOp[_] =>
+          Direct
+      }
+    }
+    (opGraph, matValues.toMap)
+  }
+
+  private def translateGraphStageWithMaterializedValue(module: 
GraphStageModule,
+      parallelism: Int, conf: UserConfig): Op = {
+    module.stage match {
+      case tickSource: TickSource[_] =>
+        import TickSourceTask._
+        val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef]
+        val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, 
tickSource.initialDelay).
+          withValue[FiniteDuration](INTERVAL, tickSource.interval).
+          withValue(TICK, tick)
+        ProcessorOp(classOf[TickSourceTask[_]], parallelism, tiConf, 
"tickSource")
+      case graphStage: GraphStage[_] =>
+        translateGraphStage(module, parallelism, conf)
+      case headOptionStage: HeadOptionStage[_] =>
+        headOptionOp(headOptionStage, conf)
+      case pushPullGraphStageWithMaterializedValue:
+        PushPullGraphStageWithMaterializedValue[_, _, _, _] =>
+        translateSymbolic(pushPullGraphStageWithMaterializedValue, conf)
+    }
+  }
+
+  private def translateGraphStage(module: GraphStageModule,
+      parallelism: Int, conf: UserConfig): Op = {
+    module.stage match {
+      case balance: Balance[_] =>
+        ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
+      case batch: Batch[_, _] =>
+        val batchConf = conf.withValue[_ => Long](BatchTask.COST, 
batch.costFn).
+          withLong(BatchTask.MAX, batch.max).
+          withValue[(_, _) => _](BatchTask.AGGREGATE, batch.aggregate).
+          withValue[_ => _](BatchTask.SEED, batch.seed)
+        ProcessorOp(classOf[BatchTask[_, _]],
+          parallelism, batchConf, "batch")
+      case broadcast: Broadcast[_] =>
+        val name = 
ModuleExtractor.unapply(broadcast).map(_.attributes.nameOrDefault()).get
+        ProcessorOp(classOf[BroadcastTask], parallelism, conf, name)
+      case collect: Collect[_, _] =>
+        collectOp(collect.pf, conf)
+      case concat: Concat[_] =>
+        ProcessorOp(classOf[ConcatTask], parallelism, conf, "concat")
+      case delayInitial: DelayInitial[_] =>
+        val dIConf = conf.withValue[FiniteDuration](
+          DelayInitialTask.DELAY_INITIAL, delayInitial.delay)
+        ProcessorOp(classOf[DelayInitialTask[_]], parallelism, dIConf, 
"delayInitial")
+      case dropWhile: DropWhile[_] =>
+        dropWhileOp(dropWhile.p, conf)
+      case flattenMerge: FlattenMerge[_, _] =>
+        ProcessorOp(classOf[FlattenMergeTask], parallelism, conf, 
"flattenMerge")
+      case fold: Fold[_, _] =>
+        val foldConf = conf.withValue(FoldTask.ZERO, 
fold.zero.asInstanceOf[AnyRef]).
+          withValue(FoldTask.AGGREGATOR, fold.f)
+        ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
+      case groupBy: GroupBy[_, _] =>
+        GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf)
+      case groupedWithin: GroupedWithin[_] =>
+        val diConf = 
conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
+          withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
+        ProcessorOp(classOf[GroupedWithinTask[_]], parallelism, diConf, 
"groupedWithin")
+      case idleInject: IdleInject[_, _] =>
+        // TODO
+        null
+      case idleTimeoutBidi: IdleTimeoutBidi[_, _] =>
+        // TODO
+        null
+      case incomingConnectionStage: IncomingConnectionStage =>
+        // TODO
+        null
+      case interleave: Interleave[_] =>
+        val ilConf = conf.withInt(InterleaveTask.INPUT_PORTS, 
interleave.inputPorts).
+          withInt(InterleaveTask.SEGMENT_SIZE, interleave.segmentSize)
+        ProcessorOp(classOf[InterleaveTask], parallelism, ilConf, "interleave")
+        null
+      case intersperse: Intersperse[_] =>
+        // TODO
+        null
+      case limitWeighted: LimitWeighted[_] =>
+        // TODO
+        null
+      case map: FMap[_, _] =>
+        mapOp(map.f, conf)
+      case mapAsync: MapAsync[_, _] =>
+        ProcessorOp(classOf[MapAsyncTask[_, _]],
+          mapAsync.parallelism, conf.withValue(MapAsyncTask.MAPASYNC_FUNC, 
mapAsync.f), "mapAsync")
+      case mapAsyncUnordered: MapAsyncUnordered[_, _] =>
+        ProcessorOp(classOf[MapAsyncTask[_, _]],
+          mapAsyncUnordered.parallelism,
+          conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsyncUnordered.f), 
"mapAsyncUnordered")
+      case materializedValueSource: MaterializedValueSource[_] =>
+        // TODO
+        null
+      case merge: Merge[_] =>
+        val mergeConf = conf.withBoolean(MergeTask.EAGER_COMPLETE, 
merge.eagerComplete).
+          withInt(MergeTask.INPUT_PORTS, merge.inputPorts)
+        ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge")
+      case mergePreferred: MergePreferred[_] =>
+        MergeOp("mergePreferred", conf)
+      case mergeSorted: MergeSorted[_] =>
+        MergeOp("mergeSorted", conf)
+      case prefixAndTail: PrefixAndTail[_] =>
+        // TODO
+        null
+      case recover: Recover[_] =>
+        // TODO
+        null
+      case scan: Scan[_, _] =>
+        scanOp(scan.zero, scan.f, conf)
+      case simpleLinearGraphStage: SimpleLinearGraphStage[_] =>
+        translateSimpleLinearGraph(simpleLinearGraphStage, parallelism, conf)
+      case singleSource: SingleSource[_] =>
+        val singleSourceConf = conf.withValue[AnyRef](SingleSourceTask.ELEMENT,
+          singleSource.elem.asInstanceOf[AnyRef])
+        ProcessorOp(classOf[SingleSourceTask[_]], parallelism, 
singleSourceConf, "singleSource")
+      case split: Split[_] =>
+        // TODO
+        null
+      case statefulMapConcat: StatefulMapConcat[_, _] =>
+        val func = statefulMapConcat.f
+        val statefulMapConf =
+          conf.withValue[() => _ => Iterable[_]](StatefulMapConcatTask.FUNC, 
func)
+        ProcessorOp(classOf[StatefulMapConcatTask[_, _]], parallelism,
+          statefulMapConf, "statefulMapConcat")
+      case subSink: SubSink[_] =>
+        // TODO
+        null
+      case subSource: SubSource[_] =>
+        // TODO
+        null
+      case unfold: Unfold[_, _] =>
+        // TODO
+        null
+      case unfoldAsync: UnfoldAsync[_, _] =>
+        // TODO
+        null
+      case unzip: Unzip[_, _] =>
+        ProcessorOp(classOf[Unzip2Task[_, _, _]],
+          parallelism,
+          conf.withValue(
+            Unzip2Task.UNZIP2_FUNCTION, 
Unzip2Task.UnZipFunction(unzip.unzipper)
+          ), "unzip")
+      case zip: Zip[_, _] =>
+        zipWithOp(zip.zipper, conf)
+      case zipWith2: ZipWith2[_, _, _] =>
+        ProcessorOp(classOf[Zip2Task[_, _, _]],
+          parallelism,
+          conf.withValue(
+            Zip2Task.ZIP2_FUNCTION, Zip2Task.ZipFunction(zipWith2.zipper)
+          ), "zipWith2")
+    }
+  }
+
+  private def translateSimpleLinearGraph(stage: SimpleLinearGraphStage[_],
+      parallelism: Int, conf: UserConfig): Op = {
+    stage match {
+      case completion: Completion[_] =>
+        // TODO
+        null
+      case delay: Delay[_] =>
+        // TODO
+        null
+      case drop: Drop[_] =>
+        dropOp(drop.count, conf)
+      case dropWithin: DropWithin[_] =>
+        val dropWithinConf =
+          conf.withValue[FiniteDuration](DropWithinTask.TIMEOUT, 
dropWithin.timeout)
+        ProcessorOp(classOf[DropWithinTask[_]],
+          parallelism, dropWithinConf, "dropWithin")
+      case filter: Filter[_] =>
+        filterOp(filter.p, conf)
+      case idle: Idle[_] =>
+        // TODO
+        null
+      case initial: Initial[_] =>
+        // TODO
+        null
+      case log: Log[_] =>
+        logOp(log.name, log.extract, conf)
+      case reduce: Reduce[_] =>
+        reduceOp(reduce.f, conf)
+      case take: Take[_] =>
+        takeOp(take.count, conf)
+      case takeWhile: TakeWhile[_] =>
+        filterOp(takeWhile.p, conf)
+      case takeWithin: TakeWithin[_] =>
+        val takeWithinConf =
+          conf.withValue[FiniteDuration](TakeWithinTask.TIMEOUT, 
takeWithin.timeout)
+        ProcessorOp(classOf[TakeWithinTask[_]],
+          parallelism, takeWithinConf, "takeWithin")
+      case throttle: Throttle[_] =>
+        val throttleConf = conf.withInt(ThrottleTask.COST, throttle.cost).
+          withInt(ThrottleTask.MAX_BURST, throttle.maximumBurst).
+          withValue[_ => Int](ThrottleTask.COST_CALC, 
throttle.costCalculation).
+          withValue[FiniteDuration](ThrottleTask.TIME_PERIOD, throttle.per)
+        ProcessorOp(classOf[ThrottleTask[_]],
+          parallelism, throttleConf, "throttle")
+    }
+  }
+
+  private def translateSymbolic(stage: 
PushPullGraphStageWithMaterializedValue[_, _, _, _],
+      conf: UserConfig): Op = {
+    stage match {
+      case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _] =>
+        symbolicGraphStage.symbolicStage match {
+          case buffer: Stages.Buffer[_] =>
+            // ignore the buffering operation
+            identity("buffer", conf)
+        }
+    }
+  }
+
+}
+
+object RemoteMaterializerImpl {
+  final val NotApplied: Any => Any = _ => NotApplied
+
+  def collectOp[In, Out](collect: PartialFunction[In, Out], conf: UserConfig): 
Op = {
+    flatMapOp({ data: In =>
+      collect.applyOrElse(data, NotApplied) match {
+        case NotApplied => None
+        case result: Any => Option(result)
+      }
+    }, "collect", conf)
+  }
+
+  def filterOp[In](filter: In => Boolean, conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      if (filter(data)) Option(data) else None
+    }, "filter", conf)
+  }
+
+  def headOptionOp[T](headOptionStage: HeadOptionStage[T], conf: UserConfig): 
Op = {
+    val promise: Promise[Option[T]] = Promise()
+    flatMapOp({ data: T =>
+      data match {
+        case None =>
+          Some(promise.future.failed)
+        case Some(d) =>
+          promise.future.value
+      }
+    }, "headOption", conf)
+  }
+
+  def reduceOp[T](reduce: (T, T) => T, conf: UserConfig): Op = {
+    var result: Option[T] = None
+    val flatMap = { elem: T =>
+      result match {
+        case None =>
+          result = Some(elem)
+        case Some(r) =>
+          result = Some(reduce(r, elem))
+      }
+      List(result)
+    }
+    flatMapOp(flatMap, "reduce", conf)
+  }
+
+  def zipWithOp[In1, In2](zipWith: (In1, In2) => (In1, In2), conf: 
UserConfig): Op = {
+    val flatMap = { elem: (In1, In2) =>
+      val (e1, e2) = elem
+      val result: (In1, In2) = zipWith(e1, e2)
+      List(result)
+    }
+    flatMapOp(flatMap, "zipWith", conf)
+  }
+
+  def zipWithOp2[In1, In2, Out](zipWith: (In1, In2) => Out, conf: UserConfig): 
Op = {
+    val flatMap = { elem: (In1, In2) =>
+      val (e1, e2) = elem
+      val result: Out = zipWith(e1, e2)
+      List(result)
+    }
+    flatMapOp(flatMap, "zipWith", conf)
+  }
+
+  def identity(description: String, conf: UserConfig): Op = {
+    flatMapOp({ data: Any =>
+      List(data)
+    }, description, conf)
+  }
+
+  def mapOp[In, Out](map: In => Out, conf: UserConfig): Op = {
+    val flatMap = (data: In) => List(map(data))
+    flatMapOp (flatMap, conf)
+  }
+
+  def flatMapOp[In, Out](flatMap: In => Iterable[Out], conf: UserConfig): Op = 
{
+    flatMapOp(flatMap, "flatmap", conf)
+  }
+
+  def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
+      conf: UserConfig): Op = {
+    FlatMapOp(fun, description, conf)
+  }
+
+  def conflatOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
+    conf: UserConfig): Op = {
+    var agg = None: Option[Out]
+    val flatMap = {elem: In =>
+      agg = agg match {
+        case None =>
+          Some(seed(elem))
+        case Some(value) =>
+          Some(aggregate(value, elem))
+      }
+      List(agg.get)
+    }
+    flatMapOp (flatMap, "conflat", conf)
+  }
+
+  def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op 
= {
+    var aggregator: Out = zero
+    val map = { elem: In =>
+      aggregator = fold(aggregator, elem)
+      List(aggregator)
+    }
+    flatMapOp(map, "fold", conf)
+  }
+
+  def groupedOp(count: Int, conf: UserConfig): Op = {
+    var left = count
+    val buf = {
+      val b = Vector.newBuilder[Any]
+      b.sizeHint(count)
+      b
+    }
+
+    val flatMap: Any => Iterable[Any] = {input: Any =>
+      buf += input
+      left -= 1
+      if (left == 0) {
+        val emit = buf.result()
+        buf.clear()
+        left = count
+        Some(emit)
+      } else {
+        None
+      }
+    }
+    flatMapOp(flatMap, conf: UserConfig)
+  }
+
+  def dropOp[T](number: Long, conf: UserConfig): Op = {
+    var left = number
+    val flatMap: T => Iterable[T] = {input: T =>
+      if (left > 0) {
+        left -= 1
+        None
+      } else {
+        Some(input)
+      }
+    }
+    flatMapOp(flatMap, "drop", conf)
+  }
+
+  def dropWhileOp[In](drop: In => Boolean, conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      if (drop(data))  None else Option(data)
+    }, "dropWhile", conf)
+  }
+
+  def logOp[T](name: String, extract: T => Any, conf: UserConfig): Op = {
+    val flatMap = {elem: T =>
+      LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
+      List(elem)
+    }
+    flatMapOp(flatMap, "log", conf)
+  }
+
+  def scanOp[In, Out](zero: Out, f: (Out, In) => Out, conf: UserConfig): Op = {
+    var aggregator = zero
+    var pushedZero = false
+
+    val flatMap = {elem: In =>
+      aggregator = f(aggregator, elem)
+
+      if (pushedZero) {
+        pushedZero = true
+        List(zero, aggregator)
+      } else {
+        List(aggregator)
+      }
+    }
+    flatMapOp(flatMap, "scan", conf)
+  }
+
+  def statefulMapOp[In, Out](f: In => Iterable[Out], conf: UserConfig): Op = {
+    flatMapOp ({ data: In =>
+      f(data)
+    }, conf)
+  }
+
+  def takeOp(count: Long, conf: UserConfig): Op = {
+    var left: Long = count
+
+    val filter: Any => Iterable[Any] = {elem: Any =>
+      left -= 1
+      if (left > 0) Some(elem)
+      else if (left == 0) Some(elem)
+      else None
+    }
+    flatMapOp(filter, "take", conf)
+  }
+
+  /**
+   * We use this attribute to track module to Processor
+   *
+   */
+  val TRACKABLE = "track how module is fused to processor"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
new file mode 100644
index 0000000..35d0e88
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ *
+ *   [[IN]] -> [[BridgeModule]] -> [[OUT]]
+ *                   /
+ *                  /
+ *       out of band data input or output channel [[MAT]]
+ *
+ *
+ * [[BridgeModule]] is used as a bridge between different materializers.
+ * Different [[akka.stream.Materializer]]s can use out of band channel to
+ * exchange messages.
+ *
+ * For example:
+ *
+ *                              Remote Materializer
+ *                         -----------------------------
+ *                         |                            |
+ *                         | BridgeModule -> RemoteSink |
+ *                         |  /                         |
+ *                         --/----------------------------
+ *   Local Materializer     /  out of band channel.
+ *   ----------------------/----
+ *   | Local              /    |
+ *   | Source ->  BridgeModule |
+ *   |                         |
+ *   ---------------------------
+ *
+ *
+ * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
+ * module during materialization.
+ *
+ * However, user can still declare it explicitly. In this case, it means we 
have a
+ * boundary Source or Sink module which accept out of band channel inputs or
+ * outputs.
+ *
+ * @tparam IN input
+ * @tparam OUT output
+ * @tparam MAT materializer
+ */
+abstract class BridgeModule[IN, OUT, MAT] extends AtomicModule {
+  val inPort = Inlet[IN]("BridgeModule.in")
+  val outPort = Outlet[OUT]("BridgeModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+
+  def attributes: Attributes
+  def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
+
+  protected def newInstance: BridgeModule[IN, OUT, MAT]
+  override def carbonCopy: Module = newInstance
+}
+
+
+/**
+ *
+ * Bridge module which accept out of band channel Input
+ * [[org.reactivestreams.Publisher]][IN].
+ *
+ *
+ *         [[SourceBridgeModule]] -> [[OUT]]
+ *                   /|
+ *                  /
+ *       out of band data input [[org.reactivestreams.Publisher]][IN]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from out of band 
[[org.reactivestreams.Publisher]]
+ * @tparam OUT out put data type to next module.
+ */
+class SourceBridgeModule[IN, OUT](val attributes: Attributes =
+    Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, 
Subscriber[IN]] {
+  override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] =
+    new SourceBridgeModule[IN, OUT](attributes)
+
+  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, 
Subscriber[IN]] = {
+    new SourceBridgeModule( attributes)
+  }
+}
+
+/**
+ *
+ * Bridge module which accept out of band channel Output
+ * [[org.reactivestreams.Subscriber]][OUT].
+ *
+ *
+ *   [[IN]] -> [[BridgeModule]]
+ *                    \
+ *                     \
+ *                      \|
+ *       out of band data output [[org.reactivestreams.Subscriber]][OUT]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from previous module
+ * @tparam OUT out put data type to out of band subscriber
+ */
+class SinkBridgeModule[IN, OUT](val attributes: Attributes =
+    Attributes.name("sinkBridgeModule")) extends BridgeModule[IN, OUT, 
Publisher[OUT]] {
+  override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] =
+    new SinkBridgeModule[IN, OUT](attributes)
+
+  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, 
Publisher[OUT]] = {
+    new SinkBridgeModule[IN, OUT](attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
new file mode 100644
index 0000000..2c430d5
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import akka.stream.impl.{SinkModule, SourceModule}
+import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ * [[DummyModule]] is a set of special module to help construct a 
RunnableGraph,
+ * so that all ports are closed.
+ *
+ * In runtime, [[DummyModule]] should be ignored during materialization.
+ *
+ * For example, if you have a [[BridgeModule]] which only accept the input
+ * message from out of band channel, then you can use DummySource to fake
+ * a Message Source Like this.
+ *
+ * [[DummySource]] -> [[BridgeModule]] -> Sink
+ *                      /|
+ *                     /
+ *       out of band input message [[Publisher]]
+ *
+ *  After materialization, [[DummySource]] will be removed.
+
+ *              [[BridgeModule]] -> Sink
+ *                      /|
+ *                     /
+ *           [[akka.stream.impl.PublisherSource]]
+ *
+ *
+ */
+trait DummyModule extends AtomicModule
+
+
+/**
+ *
+ *    [[DummySource]]-> [[BridgeModule]] -> Sink
+ *                        /|
+ *                       /
+ *       out of band input message Source
+ *
+ * @param attributes Attributes
+ * @param shape SourceShape[Out]
+ * @tparam Out Output
+ */
+class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
+  extends SourceModule[Out, Unit](shape) with DummyModule {
+
+  override def create(context: MaterializationContext): (Publisher[Out], Unit) 
= {
+    throw new UnsupportedOperationException()
+  }
+
+  override protected def newInstance(shape: SourceShape[Out]): 
SourceModule[Out, Unit] = {
+    new DummySource[Out](attributes, shape)
+  }
+
+  override def withAttributes(attr: Attributes): Module = {
+    new DummySource(attr, amendShape(attr))
+  }
+}
+
+
+/**
+ *
+ *    Source-> [[BridgeModule]] -> [[DummySink]]
+ *                    \
+ *                     \
+ *                      \|
+ *                   out of band output message [[Subscriber]]
+ *
+ * @param attributes Attributes
+ * @param shape SinkShape[IN]
+ */
+class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
+  extends SinkModule[IN, Unit](shape) with DummyModule {
+  override def create(context: MaterializationContext): (Subscriber[IN], Unit) 
= {
+    throw new UnsupportedOperationException()
+  }
+
+  override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, 
Unit] = {
+    new DummySink[IN](attributes, shape)
+  }
+
+  override def withAttributes(attr: Attributes): Module = {
+    new DummySink[IN](attr, amendShape(attr))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
new file mode 100644
index 0000000..7555244
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import akka.stream._
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * [[GearpumpTaskModule]] represent modules that can be materialized as 
Gearpump Tasks.
+ *
+ * This is specially designed for Gearpump runtime. It is not supposed to be 
used
+ * for local materializer.
+ * 
+ */
+trait GearpumpTaskModule extends AtomicModule
+
+/**
+ * This is used to represent the Gearpump Data Source
+ * @param source DataSource
+ * @param conf UserConfig
+ * @param shape SourceShape[T}
+ * @param attributes Attributes
+ * @tparam T type
+ */
+final case class SourceTaskModule[T](
+    source: DataSource,
+    conf: UserConfig,
+    shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
+    attributes: Attributes = Attributes.name("SourceTaskModule"))
+  extends GearpumpTaskModule {
+
+  override def withAttributes(attr: Attributes): Module =
+    this.copy(shape = amendShape(attr), attributes = attr)
+  override def carbonCopy: Module =
+    this.copy(shape = SourceShape( Outlet[T]("SourceTaskModule.out")))
+
+  override def replaceShape(s: Shape): Module =
+    if (s == shape) this
+    else throw new UnsupportedOperationException("cannot replace the shape of 
SourceTaskModule")
+
+  private def amendShape(attr: Attributes): SourceShape[T] = {
+    val thisN = attributes.nameOrDefault(null)
+    val thatN = attr.nameOrDefault(null)
+
+    if ((thatN eq null) || thisN == thatN) shape
+    else shape.copy(out = Outlet(thatN + ".out"))
+  }
+}
+
+/**
+ * This is used to represent the Gearpump Data Sink
+ * @param sink DataSink
+ * @param conf UserConfig
+ * @param shape SinkShape[IN]
+ * @param attributes Attributes
+ * @tparam IN type
+ */
+final case class SinkTaskModule[IN](
+    sink: DataSink,
+    conf: UserConfig,
+    shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")),
+    attributes: Attributes = Attributes.name("SinkTaskModule"))
+  extends GearpumpTaskModule {
+
+  override def withAttributes(attr: Attributes): Module =
+    this.copy(shape = amendShape(attr), attributes = attr)
+  override def carbonCopy: Module =
+    this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
+
+  override def replaceShape(s: Shape): Module =
+    if (s == shape) this
+    else throw new UnsupportedOperationException("cannot replace the shape of 
SinkTaskModule")
+
+  private def amendShape(attr: Attributes): SinkShape[IN] = {
+    val thisN = attributes.nameOrDefault(null)
+    val thatN = attr.nameOrDefault(null)
+
+    if ((thatN eq null) || thisN == thatN) shape
+    else shape.copy(in = Inlet(thatN + ".out"))
+  }
+}
+
+/**
+ * This is to represent the Gearpump Processor which has exact one input and 
one output
+ * @param processor Class[_ <: Task]
+ * @param conf UserConfig
+ * @param attributes Attributes
+ * @tparam IN type
+ * @tparam OUT type
+ * @tparam Unit void
+ */
+case class ProcessorModule[IN, OUT, Unit](
+    processor: Class[_ <: Task],
+    conf: UserConfig,
+    attributes: Attributes = Attributes.name("processorModule"))
+  extends AtomicModule with GearpumpTaskModule {
+  val inPort = Inlet[IN]("ProcessorModule.in")
+  val outPort = Outlet[IN]("ProcessorModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+  
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: ProcessorModule[IN, OUT, Unit] =
+    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
+
+  override def withAttributes(attributes: Attributes): ProcessorModule[IN, 
OUT, Unit] = {
+    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
new file mode 100644
index 0000000..4465886
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
+
+/**
+ *
+ * Group the T value groupBy function
+ *
+ * @param groupBy T => Group
+ * @param attributes Attributes
+ * @tparam T type
+ * @tparam Group type
+ */
+case class GroupByModule[T, Group](groupBy: T => Group,
+    attributes: Attributes = Attributes.name("groupByModule"))
+  extends AtomicModule {
+  val inPort = Inlet[T]("GroupByModule.in")
+  val outPort = Outlet[T]("GroupByModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: GroupByModule[T, Group] =
+    new GroupByModule[T, Group](groupBy, attributes)
+
+  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] 
= {
+    new GroupByModule[T, Group](groupBy, attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
new file mode 100644
index 0000000..295556f
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
+
+/**
+ *
+ * Reduce Module
+ *
+ * @param f (T,T) => T
+ * @param attributes Attributes
+ * @tparam T type
+ */
+case class ReduceModule[T](f: (T, T) => T, attributes: Attributes =
+Attributes.name("reduceModule")) extends AtomicModule {
+  val inPort = Inlet[T]("GroupByModule.in")
+  val outPort = Outlet[T]("GroupByModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, 
attributes)
+
+  override def withAttributes(attributes: Attributes): ReduceModule[T] = {
+    new ReduceModule[T](f, attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
new file mode 100644
index 0000000..85b1d5e
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.scaladsl
+
+import akka.stream.Attributes
+import org.apache.gearpump.akkastream.module._
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+import org.reactivestreams.{Publisher, Subscriber}
+
+
+object GearSource{
+
+  /**
+   * Construct a Source which accepts out of band input messages.
+   *
+   *                   [[SourceBridgeModule]] -> Sink
+   *                          /
+   *                         /
+   *                        V
+   *                materialize to [[Subscriber]]
+   *                                   /|
+   *                                  /
+   *       upstream [[Publisher]] send out of band message
+   *
+   */
+  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
+    val source = new Source(new DummySource[IN](Attributes.name("dummy"), 
Source.shape("dummy")))
+    val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, 
OUT]())
+    source.viaMat(flow)(Keep.right)
+  }
+
+  /**
+   * Construct a Source from Gearpump [[DataSource]].
+   *
+   *    [[SourceTaskModule]] -> downstream Sink
+   *
+   */
+  def from[OUT](source: DataSource): Source[OUT, Unit] = {
+    val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, 
UserConfig.empty))
+    taskSource
+  }
+
+  /**
+   * Construct a Source from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
+   *
+   *    [[ProcessorModule]] -> downstream Sink
+   *
+   */
+  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, 
Unit] = {
+    val source = new Source(new DummySource[Unit](Attributes.name("dummy"), 
Source.shape("dummy")))
+    val flow = Processor.apply[Unit, OUT](processor, conf)
+    source.viaMat(flow)(Keep.right)
+  }
+}
+
+object GearSink {
+
+  /**
+   * Construct a Sink which output messages to a out of band channel.
+   *
+   *   Souce ->   [[SinkBridgeModule]]
+   *                    \
+   *                     \|
+   *         materialize to [[Publisher]]
+   *                              \
+   *                               \
+   *                                \|
+   *       send out of band message to downstream [[Subscriber]]
+   *
+   */
+  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
+    val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), 
Sink.shape("dummy")))
+    val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, 
OUT]())
+    flow.to(sink)
+  }
+
+  /**
+   * Construct a Sink from Gearpump [[DataSink]].
+   *
+   *    Upstream Source -> [[SinkTaskModule]]
+   *
+   */
+  def to[IN](sink: DataSink): Sink[IN, Unit] = {
+    val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, 
UserConfig.empty))
+    taskSink
+  }
+
+  /**
+   * Construct a Sink from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
+   *
+   *    Upstream Source -> [[ProcessorModule]]
+   *
+   */
+  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
+    val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), 
Sink.shape("dummy")))
+    val flow = Processor.apply[IN, Unit](processor, conf)
+    flow.to(sink)
+  }
+}
+
+/**
+ *
+ * GroupBy will divide the main input stream to a set of sub-streams.
+ * This is a work-around to bypass the limitation of official API Flow.groupBy
+ *
+ *
+ * For example, to do a word count, we can write code like this:
+ *
+ * case class KV(key: String, value: String)
+ * case class Count(key: String, count: Int)
+ *
+ * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
+ *   Count(kv.key, 1)
+ * }.fold(Count(null, 0)) {(base, add) =>
+ *   Count(add.key, base.count + add.count)
+ * }.log("count of current key")
+ * .flatten()
+ * .to(sink)
+ *
+ * map, fold will transform data on all sub-streams, If there are 10 groups,
+ * then there will be 10 sub-streams, and for each sub-stream, there will be
+ * a map and fold.
+ *
+ * flatten will collect all sub-stream into the main stream,
+ *
+ * sink will only operate on the main stream.
+ *
+ */
+object GroupBy{
+  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
+    new Flow[T, T, Unit](new GroupByModule(groupBy))
+  }
+}
+
+/**
+ * Aggregate on the data.
+ *
+ * val flow = Reduce({(a: Int, b: Int) => a + b})
+ *
+ *
+ */
+object Reduce{
+  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
+    new Flow[T, T, Unit](new ReduceModule(reduce))
+  }
+}
+
+
+/**
+ * Create a Flow by providing a Gearpump Processor class and configuration
+ *
+ *
+ */
+object Processor{
+  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, 
Out, Unit] = {
+    new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, 
conf))
+  }
+}
+
+object Implicits {
+
+  /**
+   * Help util to support reduce and groupBy
+   */
+  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
+
+    // TODO It is named as groupBy2 to avoid conflict with built-in
+    // groupBy. Eventually, we think the built-in groupBy should
+    // be replace with this implementation.
+    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
+      val stage = GroupBy.apply(groupBy)
+      source.via[T, Unit](stage)
+    }
+
+
+    def reduce(reduce: (T, T) => T): Source[T, Mat] = {
+      val stage = Reduce.apply(reduce)
+      source.via[T, Unit](stage)
+    }
+
+    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, 
Mat] = {
+      val stage = Processor.apply[T, R](processor, conf)
+      source.via(stage)
+    }
+  }
+
+  /**
+   * Help util to support reduce and groupBy
+   */
+  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
+    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = {
+      val stage = GroupBy.apply(groupBy)
+      flow.via(stage)
+    }
+
+    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = {
+      val stage = Reduce.apply(reduce)
+      flow.via(stage)
+    }
+
+    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, 
Mat] = {
+      val stage = Processor.apply[OUT, R](processor, conf)
+      flow.via(stage)
+    }
+  }
+
+  /**
+   * Help util to support groupByKey and sum
+   */
+  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {
+
+    /**
+     * if it is a KV Pair, we can group the KV pair by the key.
+     * @return
+     */
+    def groupByKey: Source[(K, V), Mat] = {
+      val stage = GroupBy.apply(getTupleKey[K, V])
+      source.via(stage)
+    }
+
+    /**
+     * do sum on values
+     *
+     * Before doing this, you need to do groupByKey to group same key together
+     * , otherwise, it will do the sum no matter what current key is.
+     *
+     * @param numeric Numeric[V]
+     * @return
+     */
+    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
+      val stage = Reduce.apply(sumByKey[K, V](numeric))
+      source.via(stage)
+    }
+  }
+
+  /**
+   * Help util to support groupByKey and sum
+   */
+  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
+
+    /**
+     * if it is a KV Pair, we can group the KV pair by the key.
+     * @return
+     */
+    def groupByKey: Flow[(K, V), (K, V), Mat] = {
+      val stage = GroupBy.apply(getTupleKey[K, V])
+      flow.via(stage)
+    }
+
+    /**
+     * do sum on values
+     *
+     * Before doing this, you need to do groupByKey to group same key together
+     * , otherwise, it will do the sum no matter what current key is.
+     *
+     * @param numeric Numeric[V]
+     * @return
+     */
+    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
+      val stage = Reduce.apply(sumByKey[K, V](numeric))
+      flow.via(stage)
+    }
+  }
+
+  private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, 
V]) => Tuple2[K, V] =
+    (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
new file mode 100644
index 0000000..5139117
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class BalanceTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfOutputs = sizeOfOutPorts
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
new file mode 100644
index 0000000..582327b
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val max = userConf.getLong(BatchTask.MAX)
+  val costFunc = userConf.getValue[In => Long](BatchTask.COST)
+  val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
+  val seed = userConf.getValue[In => Out](BatchTask.SEED)
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    context.output(msg)
+  }
+}
+
+object BatchTask {
+  val AGGREGATE = "AGGREGATE"
+  val COST = "COST"
+  val MAX = "MAX"
+  val SEED = "SEED"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
new file mode 100644
index 0000000..9f1194f
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class BroadcastTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
new file mode 100644
index 0000000..241fa76
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class ConcatTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfOutputs = sizeOfOutPorts
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
new file mode 100644
index 0000000..d6c347a
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+case object DelayInitialTime
+
+class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val delayInitial = 
userConf.getValue[FiniteDuration](DelayInitialTask.DELAY_INITIAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var delayInitialActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(delayInitial)(
+      self ! Message(DelayInitialTime, System.currentTimeMillis())
+    )
+  }
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DelayInitialTime =>
+        delayInitialActive = false
+      case _ =>
+        delayInitialActive match {
+          case true =>
+          case false =>
+            context.output(msg)
+        }
+    }
+  }
+}
+
+object DelayInitialTask {
+  val DELAY_INITIAL = "DELAY_INITIAL"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
new file mode 100644
index 0000000..9da26b1
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object DropWithinTimeout
+
+class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val timeout = userConf.getValue[FiniteDuration](DropWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var timeoutActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        timeoutActive = false
+      case _ =>
+
+    }
+    timeoutActive match {
+      case true =>
+      case false =>
+        context.output(msg)
+    }
+  }
+}
+
+object DropWithinTask {
+  val TIMEOUT = "TIMEOUT"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
new file mode 100644
index 0000000..512164d
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfOutputs = sizeOfOutPorts
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
new file mode 100644
index 0000000..e2f02d8
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val zero = userConf.getValue[Out](FoldTask.ZERO)
+  val aggregator = userConf.getValue[(Out, In) => Out](FoldTask.AGGREGATOR)
+  var aggregated: Out = _
+  implicit val ec = context.system.dispatcher
+
+  override def onStart(instant: Instant): Unit = {
+    zero.foreach(value => {
+      aggregated = value
+    })
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    aggregator.foreach(func => {
+      aggregated = func(aggregated, data)
+      LOG.info(s"aggregated = $aggregated")
+      val msg = new Message(aggregated, time)
+      context.output(msg)
+    })
+  }
+}
+
+object FoldTask {
+  val ZERO = "ZERO"
+  val AGGREGATOR = "AGGREGATOR"
+}


Reply via email to