http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
index c03fce2..fe86951 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 8fbe785..99ebe17 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -79,7 +79,7 @@ object RemoteGraph {
       val materializer = new RemoteMaterializerImpl(graph, system)
       val (app, matValues) = materializer.materialize
 
-      val appId = context.submit(app)
+      val appId = context.submit(app).appId
       // scalastyle:off println
       println("sleep 5 second until the application is ready on cluster")
       // scalastyle:on println

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
index a0395de..a74143e 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
index cbafcf5..477f4d3 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,18 +21,16 @@ package org.apache.gearpump.akkastream.materializer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.{util => ju}
 
-import _root_.org.apache.gearpump.util.{Graph => GGraph}
-import akka.NotUsed
+import org.apache.gearpump.util.{Graph => GGraph}
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
 import akka.dispatch.Dispatchers
 import akka.event.{Logging, LoggingAdapter}
 import akka.stream.impl.StreamLayout._
 import akka.stream.impl._
 import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
+import akka.stream.impl.fusing.{ActorGraphInterpreter, Fold, GraphInterpreterShell, GraphModule, GraphStageModule}
 import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
-import akka.stream.impl.fusing.{Map => _, _}
-import akka.stream.impl.io.{TLSActor, TlsModule}
-import akka.stream.scaladsl.{GraphDSL, Keep, ModuleExtractor, RunnableGraph}
+import akka.stream.scaladsl.ModuleExtractor
 import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
 import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
 import org.apache.gearpump.akkastream.module.ReduceModule
@@ -121,25 +119,27 @@ case class LocalMaterializerImpl (
           assignPort(stage.inPort, processor)
           assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
           matVal.put(atomic, mat)
-        case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
-          val es = effectiveSettings(effectiveAttributes)
-          val props =
-            TLSActor.props(es, tls.sslContext, tls.sslConfig,
-              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
-          val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
-          def factory(id: Int) = new ActorPublisher[Any](impl) {
-            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-          }
-          val publishers = Vector.tabulate(2)(factory)
-          impl ! FanOut.ExposedPublishers(publishers)
-
-          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
-          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
-
-          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
-          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
-
-          matVal.put(atomic, NotUsed)
+        // FIXME
+        //        case tls: TlsModule =>
+        // TODO solve this so TlsModule doesn't need special treatment here
+        //          val es = effectiveSettings(effectiveAttributes)
+        //          val props =
+        //            TLSActor.props(es, tls.sslContext, tls.sslConfig,
+        //              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
+        //          val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
+        //          def factory(id: Int) = new ActorPublisher[Any](impl) {
+        //            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+        //          }
+        //          val publishers = Vector.tabulate(2)(factory)
+        //          impl ! FanOut.ExposedPublishers(publishers)
+        //
+        //          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
+        //          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
+        //
+        //          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
+        //          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
+        //
+        //          matVal.put(atomic, NotUsed)
         case graph: GraphModule =>
           matGraph(graph, effectiveAttributes, matVal)
         case stage: GraphStageModule =>
@@ -187,6 +187,11 @@ case class LocalMaterializerImpl (
   }
 
   override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
       subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
 
     LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
@@ -194,6 +199,15 @@ case class LocalMaterializerImpl (
 
   }
 
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+    logger
+  }
+
   def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
     var moduleInProgress: Module = EmptyModule
     graph.vertices.foreach(module => {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index 936ac29..e065c90 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,22 +20,24 @@ package org.apache.gearpump.akkastream.materializer
 
 import akka.actor.ActorSystem
 import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.Timers._
+import akka.stream.impl.Timers.{Completion, DelayInitial, Idle, IdleInject, IdleTimeoutBidi, Initial}
+import akka.stream.impl.fusing.{Batch, Collect, Delay, Drop, DropWhile, DropWithin, Filter, FlattenMerge, Fold, GraphStageModule, GroupBy, GroupedWithin, Intersperse, LimitWeighted, Log, MapAsync, MapAsyncUnordered, PrefixAndTail, Recover, Reduce, Scan, Split, StatefulMapConcat, SubSink, SubSource, Take, TakeWhile, TakeWithin, Map => FMap}
 import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource}
-import akka.stream.impl.fusing.{Map => FMap, _}
 import akka.stream.impl.io.IncomingConnectionStage
-import akka.stream.impl.{HeadOptionStage, Stages, Throttle}
-import akka.stream.scaladsl._
+import akka.stream.impl.{HeadOptionStage, Stages, Throttle, Unfold, UnfoldAsync}
+import akka.stream.scaladsl.{Balance, Broadcast, Concat, Interleave, Merge, MergePreferred, MergeSorted, ModuleExtractor, Unzip, Zip, ZipWith2}
 import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
 import akka.stream.stage.GraphStage
 import org.apache.gearpump.akkastream.GearAttributes
 import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
-import org.apache.gearpump.akkastream.module._
-import org.apache.gearpump.akkastream.task._
+import org.apache.gearpump.akkastream.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
+import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTask, ConcatTask, DelayInitialTask, DropWithinTask, FlattenMergeTask, FoldTask, GraphTask, GroupedWithinTask, InterleaveTask, MapAsyncTask, MergeTask, SingleSourceTask, SinkBridgeTask, SourceBridgeTask, StatefulMapConcatTask, TakeWithinTask, ThrottleTask, TickSourceTask, Zip2Task}
+import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK}
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
+import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.CountWindow
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
@@ -162,7 +164,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
         case sinkBridge: SinkBridgeModule[_, _] =>
           ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
         case groupBy: GroupByModule[_, _] =>
-          GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf)
+          GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindow.apply(1).accumulating),
+            parallelism, "groupBy", conf)
         case reduce: ReduceModule[_] =>
           reduceOp(reduce.f, conf)
         case graphStage: GraphStageModule =>
@@ -192,7 +195,6 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
       parallelism: Int, conf: UserConfig): Op = {
     module.stage match {
       case tickSource: TickSource[_] =>
-        import TickSourceTask._
         val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef]
         val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, tickSource.initialDelay).
           withValue[FiniteDuration](INTERVAL, tickSource.interval).
@@ -322,9 +324,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
         // TODO
         null
       case unzip: Unzip[_, _] =>
-//        ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
-//          conf.withValue(
-//            Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
+        // ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
+        //   conf.withValue(
+        //     Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
         // TODO
         null
       case zip: Zip[_, _] =>
@@ -388,9 +390,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
   private def translateSymbolic(stage: PushPullGraphStageWithMaterializedValue[_, _, _, _],
       conf: UserConfig): Op = {
     stage match {
-      case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _] =>
-        symbolicGraphStage.symbolicStage match {
-          case buffer: Stages.Buffer[_] =>
+      case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _]
+        if symbolicGraphStage.symbolicStage.attributes.equals(
+          Stages.DefaultAttributes.buffer) => {
             // ignore the buffering operation
             identity("buffer", conf)
         }
@@ -478,7 +480,7 @@ object RemoteMaterializerImpl {
 
   def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
       conf: UserConfig): Op = {
-    ChainableOp(new FlatMapFunction[In, Out](fun, description), conf)
+    ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
   }
 
   def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
index 35d0e88..5b8c71b 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
index 2c430d5..ea76bb0 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
index 7555244..dfbbee9 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
index 4465886..b06dd0e 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
index 295556f..462d967 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
index 80619ef..8e43c16 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
index 5139117..43f07c4 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
index 582327b..5c2485b 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
index 9f1194f..292468d 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
index 241fa76..b77b9bd 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
index d6c347a..7c335dc 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
index 9da26b1..0c54829 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
index 512164d..14ff537 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
index e2f02d8..d982ebd 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
index 8e7a2df..3310ab9 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
index 29d9c91..eaf2b3f 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
index 837de6b..741ec43 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
index 387116d..daa1afc 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
index 2b1cd33..ad18f72 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -7,12 +7,12 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * SeG the License for the specific language governing permissions and
+ * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
index 1ff9ccd..458bb4e 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
index 05011e9..1b9c4e3 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
index b0eda19..054b483 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
index bf2c14f..a0674bc 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
index ef43fbe..9559d8f 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
index 4e09bf2..3c7ad87 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
index b3850ca..d99d2db 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 7dd91fc..005d018 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
index a35b133..7e0c082 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
index c9fe67d..6ad90df 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
 
b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
index 5b2a890..f52afc7 100644
--- 
a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
+++ 
b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
@@ -17,11 +17,12 @@
  */
 package org.apache.gearpump.cluster.utils;
 
-import org.apache.commons.io.IOUtils;
+import com.google.common.io.CharStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStreamReader;
 
 public class SystemOperation {
 
@@ -44,8 +45,8 @@ public class SystemOperation {
     Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start();
     try {
       process.waitFor();
-      String output = IOUtils.toString(process.getInputStream());
-      String errorOutput = IOUtils.toString(process.getErrorStream());
+      String output = CharStreams.toString(new InputStreamReader(process.getInputStream()));
+      String errorOutput = CharStreams.toString(new InputStreamReader(process.getErrorStream()));
       LOG.debug("Shell Output: " + output);
       if (errorOutput.length() != 0) {
         LOG.error("Shell Error Output: " + errorOutput);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git 
a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
 
b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
index 74a4047..bf291cf 100644
--- 
a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
+++ 
b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
@@ -43,7 +43,7 @@ class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher
   }
 
   override def createProcess(
-    appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String],
+      appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String],
     classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
     val cgroupCommand = if (executorId != APP_MASTER) {
       cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git 
a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala 
b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
index 84dec70..ea738d6 100644
--- 
a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
+++ 
b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.redis
 import java.nio.charset.Charset
 
 object RedisMessage {
-
   private def toBytes(strings: List[String]): List[Array[Byte]] =
     strings.map(string => string.getBytes(Charset.forName("UTF8")))
 
@@ -48,11 +47,10 @@ object RedisMessage {
      * @param latitude
      * @param member
      */
-    case class GEOADD(key: Array[Byte], longitude: Double,
-                      latitude: Double, member: Array[Byte]) {
-      def this(key: String, longitude: Double,
-               latitude: Double, member: String) =
+    case class GEOADD(key: Array[Byte], longitude: Double, latitude: Double, member: Array[Byte]) {
+      def this(key: String, longitude: Double, latitude: Double, member: String) = {
         this(toBytes(key), longitude, latitude, toBytes(member))
+      }
     }
 
   }
@@ -66,7 +64,9 @@ object RedisMessage {
      * @param field
      */
     case class HDEL(key: Array[Byte], field: Array[Byte]) {
-      def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+      def this(key: String, field: String) = {
+        this(toBytes(key), toBytes(field))
+      }
     }
 
     /**
@@ -77,8 +77,9 @@ object RedisMessage {
      * @param increment
      */
     case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
-      def this(key: String, field: String, increment: Long) =
+      def this(key: String, field: String, increment: Long) = {
         this(toBytes(key), toBytes(field), increment)
+      }
     }
 
     /**
@@ -89,8 +90,9 @@ object RedisMessage {
      * @param increment
      */
     case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
-      def this(key: String, field: String, increment: Float) =
+      def this(key: String, field: String, increment: Float) = {
         this(toBytes(key), toBytes(field), increment)
+      }
     }
 
 
@@ -102,8 +104,9 @@ object RedisMessage {
      * @param value
      */
     case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
-      def this(key: String, field: String, value: String) =
+      def this(key: String, field: String, value: String) = {
         this(toBytes(key), toBytes(field), toBytes(value))
+      }
     }
 
     /**
@@ -114,8 +117,9 @@ object RedisMessage {
      * @param value
      */
     case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
-      def this(key: String, field: String, value: String) =
+      def this(key: String, field: String, value: String) = {
         this(toBytes(key), toBytes(field), toBytes(value))
+      }
     }
 
   }
@@ -142,8 +146,9 @@ object RedisMessage {
      * @param value
      */
     case class LPUSH(key: Array[Byte], value: Array[Byte]) {
-
-      def this(key: String, value: String) = this(key, toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -153,7 +158,9 @@ object RedisMessage {
      * @param value
      */
     case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -164,7 +171,9 @@ object RedisMessage {
      * @param value
      */
     case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
-      def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+      def this(key: String, index: Long, value: String) = {
+        this(toBytes(key), index, toBytes(value))
+      }
     }
 
     /**
@@ -174,8 +183,9 @@ object RedisMessage {
      * @param value
      */
     case class RPUSH(key: Array[Byte], value: Array[Byte]) {
-
-      def this(key: String, value: String) = this(key, toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -185,7 +195,9 @@ object RedisMessage {
      * @param value
      */
     case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
   }
@@ -198,8 +210,9 @@ object RedisMessage {
      * @param message
      */
     case class DEL(message: Array[Byte]) {
-
-      def this(message: String) = this(toBytes(message))
+      def this(message: String) = {
+        this(toBytes(message))
+      }
     }
 
     /**
@@ -208,7 +221,9 @@ object RedisMessage {
      * @param key
      */
     case class EXPIRE(key: Array[Byte], seconds: Int) {
-      def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+      def this(key: String, seconds: Int) = {
+        this(toBytes(key), seconds)
+      }
     }
 
     /**
@@ -218,7 +233,9 @@ object RedisMessage {
      * @param timestamp
      */
     case class EXPIREAT(key: Array[Byte], timestamp: Long) {
-      def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+      def this(key: String, timestamp: Long) = {
+        this(toBytes(key), timestamp)
+      }
     }
 
     /**
@@ -230,9 +247,11 @@ object RedisMessage {
      * @param database
      * @param timeout
      */
-    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
-      def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte],
+        database: Int, timeout: Int) {
+      def this(host: String, port: Int, key: String, database: Int, timeout: Int) = {
         this(toBytes(host), port, toBytes(key), database, timeout)
+      }
     }
 
     /**
@@ -242,7 +261,9 @@ object RedisMessage {
      * @param db
      */
     case class MOVE(key: Array[Byte], db: Int) {
-      def this(key: String, db: Int) = this(toBytes(key), db)
+      def this(key: String, db: Int) = {
+        this(toBytes(key), db)
+      }
     }
 
     /**
@@ -251,7 +272,9 @@ object RedisMessage {
      * @param key
      */
     case class PERSIST(key: Array[Byte]) {
-      def this(key: String) = this(toBytes(key))
+      def this(key: String) = {
+        this(toBytes(key))
+      }
     }
 
     /**
@@ -261,7 +284,9 @@ object RedisMessage {
      * @param milliseconds
      */
     case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
-      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+      def this(key: String, milliseconds: Long) = {
+        this(toBytes(key), milliseconds)
+      }
     }
 
     /**
@@ -271,7 +296,9 @@ object RedisMessage {
      * @param timestamp
      */
     case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
-      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+      def this(key: String, milliseconds: Long) = {
+        this(toBytes(key), milliseconds)
+      }
     }
 
     /**
@@ -281,7 +308,9 @@ object RedisMessage {
      * @param newKey
      */
     case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
-      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+      def this(key: String, newKey: String) = {
+        this(toBytes(key), toBytes(newKey))
+      }
     }
 
     /**
@@ -291,7 +320,9 @@ object RedisMessage {
      * @param newKey
      */
     case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
-      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+      def this(key: String, newKey: String) = {
+        this(toBytes(key), toBytes(newKey))
+      }
     }
 
   }
@@ -306,8 +337,9 @@ object RedisMessage {
      * @param members
      */
     case class SADD(key: Array[Byte], members: Array[Byte]) {
-
-      def this(key: String, members: String) = this(key, toBytes(members))
+      def this(key: String, members: String) = {
+        this(toBytes(key), toBytes(members))
+      }
     }
 
 
@@ -319,8 +351,9 @@ object RedisMessage {
      * @param member
      */
     case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
-      def this(source: String, destination: String, member: String) =
+      def this(source: String, destination: String, member: String) = {
         this(toBytes(source), toBytes(destination), toBytes(member))
+      }
     }
 
 
@@ -331,8 +364,9 @@ object RedisMessage {
      * @param member
      */
     case class SREM(key: Array[Byte], member: Array[Byte]) {
-
-      def this(key: String, member: String) = this(key, toBytes(member))
+      def this(key: String, member: String) = {
+        this(toBytes(key), toBytes(member))
+      }
     }
 
   }
@@ -346,7 +380,9 @@ object RedisMessage {
      * @param value
      */
     case class APPEND(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -355,7 +391,9 @@ object RedisMessage {
      * @param key
      */
     case class DECR(key: Array[Byte]) {
-      def this(key: String) = this(toBytes(key))
+      def this(key: String) = {
+        this(toBytes(key))
+      }
     }
 
     /**
@@ -365,7 +403,9 @@ object RedisMessage {
      * @param decrement
      */
     case class DECRBY(key: Array[Byte], decrement: Int) {
-      def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+      def this(key: String, decrement: Int) = {
+        this(toBytes(key), decrement)
+      }
     }
 
     /**
@@ -374,7 +414,9 @@ object RedisMessage {
      * @param key
      */
     case class INCR(key: Array[Byte]) {
-      def this(key: String) = this(toBytes(key))
+      def this(key: String) = {
+        this(toBytes(key))
+      }
     }
 
     /**
@@ -384,7 +426,9 @@ object RedisMessage {
      * @param increment
      */
     case class INCRBY(key: Array[Byte], increment: Int) {
-      def this(key: String, increment: Int) = this(toBytes(key), increment)
+      def this(key: String, increment: Int) = {
+        this(toBytes(key), increment)
+      }
     }
 
     /**
@@ -394,7 +438,9 @@ object RedisMessage {
      * @param increment
      */
     case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
-      def this(key: String, increment: Number) = this(toBytes(key), increment)
+      def this(key: String, increment: Double) = {
+        this(toBytes(key), increment)
+      }
     }
 
 
@@ -405,7 +451,9 @@ object RedisMessage {
      * @param value
      */
     case class SET(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -416,7 +464,9 @@ object RedisMessage {
      * @param value
      */
     case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
-      def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+      def this(key: String, offset: Long, value: String) = {
+        this(toBytes(key), offset, toBytes(value))
+      }
     }
 
     /**
@@ -427,7 +477,9 @@ object RedisMessage {
      * @param value
      */
     case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
-      def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+      def this(key: String, seconds: Int, value: String) = {
+        this(toBytes(key), seconds, toBytes(value))
+      }
     }
 
     /**
@@ -437,7 +489,9 @@ object RedisMessage {
      * @param value
      */
     case class SETNX(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -448,9 +502,9 @@ object RedisMessage {
      * @param value
      */
     case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
-      def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+      def this(key: String, offset: Int, value: String) = {
+        this(toBytes(key), offset, toBytes(value))
+      }
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git 
a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala 
b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
index 3f75949..36a9fe3 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -32,20 +32,20 @@ import redis.clients.jedis.Jedis
 import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
 
 /**
-  * Save message in Redis Instance
-  *
-  * @param host
-  * @param port
-  * @param timeout
-  * @param database
-  * @param password
-  */
+ * Save message in Redis Instance
+ *
+ * @param host
+ * @param port
+ * @param timeout
+ * @param database
+ * @param password
+ */
 class RedisSink(
-                    host: String = DEFAULT_HOST,
-                    port: Int = DEFAULT_PORT,
-                    timeout: Int = DEFAULT_TIMEOUT,
-                    database: Int = DEFAULT_DATABASE,
-                    password: String = "") extends DataSink {
+    host: String = DEFAULT_HOST,
+    port: Int = DEFAULT_PORT,
+    timeout: Int = DEFAULT_TIMEOUT,
+    database: Int = DEFAULT_DATABASE,
+    password: String = "") extends DataSink {
 
   private val LOG = LogUtil.getLogger(getClass)
   @transient private lazy val client = new Jedis(host, port, timeout)
@@ -59,7 +59,6 @@ class RedisSink(
   }
 
   override def write(message: Message): Unit = {
-
     message.msg match {
       // GEO
       case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 544a4eb..df1de06 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -163,7 +163,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
       .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
     val app = StreamApplication(name, processorGraph, config)
     LOG.info(s"jar file uploaded to $uploadedJarLocation")
-    val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
+    val appId = clientContext.submit(app, uploadedJarLocation, workerNum).appId
     applications += name -> appId
     topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
     LOG.info(s"Storm Application $appId submitted")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
index aaa0a99..4969314 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.partitioner
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
-import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner}
 
 /**
  * Partitioner bound to a target Storm component

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
index 777acab..e3f1339 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.util
 
 import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
-import org.apache.gearpump.partitioner.Partitioner
+import org.apache.gearpump.streaming.partitioner.Partitioner
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
index 5513423..5fc631b 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -27,7 +27,7 @@ import org.scalatest.{Matchers, PropSpec}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
-import org.apache.gearpump.partitioner.Partitioner
+import org.apache.gearpump.streaming.partitioner.Partitioner
 
 class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
index 33c3e97..95c95c7 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
@@ -19,6 +19,7 @@
 package org.apache.gearpump.experiments.yarn
 
 object Constants {
+  val CONTAINER_USER = "gearpump.yarn.user"
   val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name"
   val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command"
   val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
index 711506a..fb482c4 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
@@ -48,7 +48,7 @@ abstract class AbstractCommand extends Command {
     : String = {
     val exe = config.getString(java)
 
-    s"$exe -cp ${classPath.mkString(":")}:" +
+    s"$exe -noverify -cp ${classPath.mkString(":")}:" +
       "$CLASSPATH " + properties.mkString(" ") +
       s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
index 2475728..267d588 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
@@ -26,6 +26,7 @@ import com.typesafe.config.{Config, ConfigValueFactory}
 import org.apache.gearpump.cluster.ClusterConfig
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import org.apache.gearpump.experiments.yarn.Constants
+import org.apache.gearpump.experiments.yarn.Constants._
 import org.apache.gearpump.experiments.yarn.appmaster.AppMasterCommand
 import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
 import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
@@ -184,6 +185,10 @@ object LaunchCluster extends AkkaApp with ArgumentsParser {
     if (parsed.getBoolean(VERBOSE)) {
       LogUtil.verboseLogToConsole()
     }
+    if (inputAkkaConf.hasPath(CONTAINER_USER)) {
+      val userName = inputAkkaConf.getString(CONTAINER_USER)
+      System.setProperty("HADOOP_USER_NAME", userName)
+    }
 
     val yarnConfig = new YarnConfig()
     val fs = new FileSystem(yarnConfig)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
deleted file mode 100644
index c4c5a65..0000000
--- a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.experiments.yarn.appmaster
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.transport.HostPort
-
-class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  val config = ConfigFactory.parseString(
-
-    """
-      |
-      |gearpump {
-      |  yarn {
-      |    client {
-      |      package -path = "/user/gearpump/gearpump.zip"
-      |    }
-      |
-      |    applicationmaster {
-      |      ## Memory of YarnAppMaster
-      |        command = "$JAVA_HOME/bin/java -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |      queue = "default"
-      |    }
-      |
-      |    master {
-      |      ## Memory of master daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |    }
-      |
-      |    worker {
-      |      ## memory of worker daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      containers = "4"
-      |      ## This also contains all memory for child executors.
-      |      memory = "4096"
-      |      vcores = "1"
-      |    }
-      |    services {
-      |      enabled = true
-      |    }
-      |  }
-      |}
-    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
-  "MasterCommand" should "create correct command line" in {
-    val version = "gearpump-0.1"
-    val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
-
-    // scalastyle:off line.size.limit
-    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> org.apache.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    assert(master.get == expected)
-  }
-
-  "WorkerCommand" should "create correct command line" in {
-    val version = "gearpump-0.1"
-    val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine")
-    // scalastyle:off line.size.limit
-    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine org.apache.gearpump.cluster.main.Worker  2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    assert(worker.get == expected)
-  }
-
-  "AppMasterCommand" should "create correct command line" in {
-    val version = "gearpump-0.1"
-    val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3"))
-    // scalastyle:off line.size.limit
-    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster  arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    assert(appmaster.get == expected)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
index c0a4ee2..3f2f9cb 100644
--- a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
@@ -186,7 +186,7 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
     // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
     // scalastyle:off line.size.limit
-    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -noverify -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
     // scalastyle:on line.size.limit
     verify(yarnClient).submit("gearpump", appId, expectedCommand,
       Resource.newInstance(512, 1), "default",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index f85c43b..4b41ba1 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -31,22 +31,22 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.hadoop.security.UserGroupInformation
 
-class HBaseSink(userconfig: UserConfig, tableName: String,
+class HBaseSink(userConfig: UserConfig, tableName: String,
     val conn: (UserConfig, Configuration)
     => Connection, @transient var configuration: Configuration)
   extends DataSink {
 
-  lazy val connection = conn(userconfig, configuration)
+  lazy val connection = conn(userConfig, configuration)
   lazy val table = connection.getTable(TableName.valueOf(tableName))
 
   override def open(context: TaskContext): Unit = {}
 
-  def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = {
-    this(userconfig, tableName, HBaseSink.getConnection, configuration)
+  def this(userConfig: UserConfig, tableName: String, configuration: Configuration) = {
+    this(userConfig, tableName, HBaseSink.getConnection, configuration)
   }
 
-  def this(userconfig: UserConfig, tableName: String) = {
-    this(userconfig, tableName, HBaseConfiguration.create())
+  def this(userConfig: UserConfig, tableName: String) = {
+    this(userConfig, tableName, HBaseConfiguration.create())
   }
 
   def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = {
@@ -120,14 +120,14 @@ object HBaseSink {
   val COLUMN_NAME = "hbase.table.column.name"
   val HBASE_USER = "hbase.user"
 
-  def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration)
+  def apply[T](userConfig: UserConfig, tableName: String, configuration: Configuration)
   : HBaseSink = {
-    new HBaseSink(userconfig, tableName, configuration)
+    new HBaseSink(userConfig, tableName, configuration)
   }
 
-  def apply[T](userconfig: UserConfig, tableName: String)
+  def apply[T](userConfig: UserConfig, tableName: String)
   : HBaseSink = {
-    new HBaseSink(userconfig, tableName)
+    new HBaseSink(userConfig, tableName)
   }
 
   private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 2417763..22efa89 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -18,13 +18,10 @@
 package org.apache.gearpump.external.hbase.dsl
 
 import scala.language.implicitConversions
-
 import org.apache.hadoop.conf.Configuration
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.external.hbase.HBaseSink
-import org.apache.gearpump.streaming.dsl.Stream
-import org.apache.gearpump.streaming.dsl.Stream.Sink
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
 
 /** Create a HBase DSL Sink */
 class HBaseDSLSink[T](stream: Stream[T]) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
index f1bb26a..996ae0b 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}
 import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
@@ -44,7 +44,7 @@ object KafkaDSL {
       parallelism: Int = 1,
       config: UserConfig = UserConfig.empty,
       description: String = "KafkaSource"
-      ): dsl.Stream[T] = {
+      ): Stream[T] = {
     app.source[T](new KafkaSource(topics, properties), parallelism, config, description)
   }
 
@@ -66,19 +66,19 @@ object KafkaDSL {
       properties: Properties,
       parallelism: Int = 1,
       config: UserConfig = UserConfig.empty,
-      description: String = "KafkaSource"): dsl.Stream[T] = {
+      description: String = "KafkaSource"): Stream[T] = {
     val source = new KafkaSource(topics, properties)
     source.setCheckpointStore(checkpointStoreFactory)
     app.source[T](source, parallelism, config, description)
   }
 
   import scala.language.implicitConversions
-  implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = {
+  implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = {
     new KafkaDSL[T](stream)
   }
 }
 
-class KafkaDSL[T](stream: dsl.Stream[T]) {
+class KafkaDSL[T](stream: Stream[T]) {
 
   /**
    * Sinks data to Kafka
@@ -94,7 +94,7 @@ class KafkaDSL[T](stream: dsl.Stream[T]) {
       properties: Properties,
       parallelism: Int = 1,
       userConfig: UserConfig = UserConfig.empty,
-      description: String = "KafkaSink"): dsl.Stream[T] = {
+      description: String = "KafkaSink"): Stream[T] = {
     stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..2809247
--- /dev/null
+++ b/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file


Reply via email to