http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala index c03fce2..fe86951 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala index 8fbe785..99ebe17 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -79,7 +79,7 @@ object RemoteGraph { val materializer = new RemoteMaterializerImpl(graph, system) val (app, matValues) = materializer.materialize - val appId = context.submit(app) + val appId = context.submit(app).appId // scalastyle:off println println("sleep 5 second until the application is ready on cluster") // scalastyle:on println http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala index a0395de..a74143e 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala index cbafcf5..477f4d3 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,18 +21,16 @@ package org.apache.gearpump.akkastream.materializer import java.util.concurrent.atomic.AtomicBoolean import java.{util => ju} -import _root_.org.apache.gearpump.util.{Graph => GGraph} -import akka.NotUsed +import org.apache.gearpump.util.{Graph => GGraph} import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill} import akka.dispatch.Dispatchers import akka.event.{Logging, LoggingAdapter} import akka.stream.impl.StreamLayout._ import akka.stream.impl._ import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly +import akka.stream.impl.fusing.{ActorGraphInterpreter, Fold, GraphInterpreterShell, GraphModule, GraphStageModule} import akka.stream.impl.fusing.GraphStages.MaterializedValueSource -import akka.stream.impl.fusing.{Map => _, _} -import akka.stream.impl.io.{TLSActor, TlsModule} -import akka.stream.scaladsl.{GraphDSL, Keep, ModuleExtractor, RunnableGraph} +import akka.stream.scaladsl.ModuleExtractor import akka.stream.{ClosedShape, Graph => AkkaGraph, _} import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge import org.apache.gearpump.akkastream.module.ReduceModule @@ -121,25 +119,27 @@ case class LocalMaterializerImpl ( assignPort(stage.inPort, processor) assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]]) matVal.put(atomic, mat) - case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here - val es = effectiveSettings(effectiveAttributes) - val props = - TLSActor.props(es, tls.sslContext, tls.sslConfig, - tls.firstSession, tls.role, tls.closing, tls.hostInfo) - val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) - def factory(id: Int) = new ActorPublisher[Any](impl) { - override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) - } - val publishers = Vector.tabulate(2)(factory) - impl ! FanOut.ExposedPublishers(publishers) - - assignPort(tls.plainOut, publishers(TLSActor.UserOut)) - assignPort(tls.cipherOut, publishers(TLSActor.TransportOut)) - - assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn)) - assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn)) - - matVal.put(atomic, NotUsed) + // FIXME + // case tls: TlsModule => + // TODO solve this so TlsModule doesn't need special treatment here + // val es = effectiveSettings(effectiveAttributes) + // val props = + // TLSActor.props(es, tls.sslContext, tls.sslConfig, + // tls.firstSession, tls.role, tls.closing, tls.hostInfo) + // val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) + // def factory(id: Int) = new ActorPublisher[Any](impl) { + // override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) + // } + // val publishers = Vector.tabulate(2)(factory) + // impl ! FanOut.ExposedPublishers(publishers) + // + // assignPort(tls.plainOut, publishers(TLSActor.UserOut)) + // assignPort(tls.cipherOut, publishers(TLSActor.TransportOut)) + // + // assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn)) + // assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn)) + // + // matVal.put(atomic, NotUsed) case graph: GraphModule => matGraph(graph, effectiveAttributes, matVal) case stage: GraphStageModule => @@ -187,6 +187,11 @@ case class LocalMaterializerImpl ( } override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat], + initialAttributes: Attributes): Mat = { + materialize(runnableGraph) + } + + override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat], subflowFuser: GraphInterpreterShell => ActorRef): Mat = { LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get, @@ -194,6 +199,15 @@ case class LocalMaterializerImpl ( } + override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat], + subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = { + materialize(runnableGraph) + } + + override def makeLogger(logSource: Class[_]): LoggingAdapter = { + logger + } + def buildToplevelModule(graph: GGraph[Module, Edge]): Module = { var moduleInProgress: Module = EmptyModule graph.vertices.foreach(module => { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala index 936ac29..e065c90 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,22 +20,24 @@ package org.apache.gearpump.akkastream.materializer import akka.actor.ActorSystem import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.Timers._ +import akka.stream.impl.Timers.{Completion, DelayInitial, Idle, IdleInject, IdleTimeoutBidi, Initial} +import akka.stream.impl.fusing.{Batch, Collect, Delay, Drop, DropWhile, DropWithin, Filter, FlattenMerge, Fold, GraphStageModule, GroupBy, GroupedWithin, Intersperse, LimitWeighted, Log, MapAsync, MapAsyncUnordered, PrefixAndTail, Recover, Reduce, Scan, Split, StatefulMapConcat, SubSink, SubSource, Take, TakeWhile, TakeWithin, Map => FMap} import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource} -import akka.stream.impl.fusing.{Map => FMap, _} import akka.stream.impl.io.IncomingConnectionStage -import akka.stream.impl.{HeadOptionStage, Stages, Throttle} -import akka.stream.scaladsl._ +import akka.stream.impl.{HeadOptionStage, Stages, Throttle, Unfold, UnfoldAsync} +import akka.stream.scaladsl.{Balance, Broadcast, Concat, Interleave, Merge, MergePreferred, MergeSorted, ModuleExtractor, Unzip, Zip, ZipWith2} import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue import akka.stream.stage.GraphStage import org.apache.gearpump.akkastream.GearAttributes import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge -import org.apache.gearpump.akkastream.module._ -import org.apache.gearpump.akkastream.task._ +import org.apache.gearpump.akkastream.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} +import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTask, ConcatTask, DelayInitialTask, DropWithinTask, FlattenMergeTask, FoldTask, GraphTask, GroupedWithinTask, InterleaveTask, MapAsyncTask, MergeTask, SingleSourceTask, SinkBridgeTask, SourceBridgeTask, StatefulMapConcatTask, TakeWithinTask, ThrottleTask, TickSourceTask, Zip2Task} +import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK} import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.StreamApp -import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper +import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle} +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.CountWindow import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} @@ -162,7 +164,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { case sinkBridge: SinkBridgeModule[_, _] => ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") case groupBy: GroupByModule[_, _] => - GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf) + GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindow.apply(1).accumulating), + parallelism, "groupBy", conf) case reduce: ReduceModule[_] => reduceOp(reduce.f, conf) case graphStage: GraphStageModule => @@ -192,7 +195,6 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { parallelism: Int, conf: UserConfig): Op = { module.stage match { case tickSource: TickSource[_] => - import TickSourceTask._ val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef] val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, tickSource.initialDelay). withValue[FiniteDuration](INTERVAL, tickSource.interval). @@ -322,9 +324,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { // TODO null case unzip: Unzip[_, _] => -// ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism, -// conf.withValue( -// Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip") + // ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism, + // conf.withValue( + // Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip") // TODO null case zip: Zip[_, _] => @@ -388,9 +390,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { private def translateSymbolic(stage: PushPullGraphStageWithMaterializedValue[_, _, _, _], conf: UserConfig): Op = { stage match { - case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _] => - symbolicGraphStage.symbolicStage match { - case buffer: Stages.Buffer[_] => + case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _] + if symbolicGraphStage.symbolicStage.attributes.equals( + Stages.DefaultAttributes.buffer) => { // ignore the buffering operation identity("buffer", conf) } @@ -478,7 +480,7 @@ object RemoteMaterializerImpl { def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String, conf: UserConfig): Op = { - ChainableOp(new FlatMapFunction[In, Out](fun, description), conf) + ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf) } def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala index 35d0e88..5b8c71b 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala index 2c430d5..ea76bb0 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala index 7555244..dfbbee9 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala index 4465886..b06dd0e 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala index 295556f..462d967 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala index 80619ef..8e43c16 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala index 5139117..43f07c4 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala index 582327b..5c2485b 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala index 9f1194f..292468d 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala index 241fa76..b77b9bd 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala index d6c347a..7c335dc 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala index 9da26b1..0c54829 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala index 512164d..14ff537 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala index e2f02d8..d982ebd 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala index 8e7a2df..3310ab9 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala index 29d9c91..eaf2b3f 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala index 837de6b..741ec43 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala index 387116d..daa1afc 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala index 2b1cd33..ad18f72 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala @@ -7,12 +7,12 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * SeG the License for the specific language governing permissions and + * See the License for the specific language governing permissions and * limitations under the License. */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala index 1ff9ccd..458bb4e 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala index 05011e9..1b9c4e3 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala index b0eda19..054b483 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala index bf2c14f..a0674bc 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala index ef43fbe..9559d8f 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala index 4e09bf2..3c7ad87 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala index b3850ca..d99d2db 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala index 7dd91fc..005d018 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala index a35b133..7e0c082 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala index c9fe67d..6ad90df 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java index 5b2a890..f52afc7 100644 --- a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java +++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java @@ -17,11 +17,12 @@ */ package org.apache.gearpump.cluster.utils; -import org.apache.commons.io.IOUtils; +import com.google.common.io.CharStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStreamReader; public class SystemOperation { @@ -44,8 +45,8 @@ public class SystemOperation { Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start(); try { process.waitFor(); - String output = IOUtils.toString(process.getInputStream()); - String errorOutput = IOUtils.toString(process.getErrorStream()); + String output = CharStreams.toString(new InputStreamReader(process.getInputStream())); + String errorOutput = CharStreams.toString(new InputStreamReader(process.getErrorStream())); LOG.debug("Shell Output: " + output); if (errorOutput.length() != 0) { LOG.error("Shell Error Output: " + errorOutput); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala index 74a4047..bf291cf 100644 --- a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala +++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala @@ -43,7 +43,7 @@ class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher } override def createProcess( - appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String], + appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String], classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { val cgroupCommand = if (executorId != APP_MASTER) { cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala index 84dec70..ea738d6 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.redis import java.nio.charset.Charset object RedisMessage { - private def toBytes(strings: List[String]): List[Array[Byte]] = strings.map(string => string.getBytes(Charset.forName("UTF8"))) @@ -48,11 +47,10 @@ object RedisMessage { * @param latitude * @param member */ - case class GEOADD(key: Array[Byte], longitude: Double, - latitude: Double, member: Array[Byte]) { - def this(key: String, longitude: Double, - latitude: Double, member: String) = + case class GEOADD(key: Array[Byte], longitude: Double, latitude: Double, member: Array[Byte]) { + def this(key: String, longitude: Double, latitude: Double, member: String) = { this(toBytes(key), longitude, latitude, toBytes(member)) + } } } @@ -66,7 +64,9 @@ object RedisMessage { * @param field */ case class HDEL(key: Array[Byte], field: Array[Byte]) { - def this(key: String, field: String) = this(toBytes(key), toBytes(field)) + def this(key: String, field: String) = { + this(toBytes(key), toBytes(field)) + } } /** @@ -77,8 +77,9 @@ object RedisMessage { * @param increment */ case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) { - def this(key: String, field: String, increment: Long) = + def this(key: String, field: String, increment: Long) = { this(toBytes(key), toBytes(field), increment) + } } /** @@ -89,8 +90,9 @@ object RedisMessage { * @param increment */ case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) { - def this(key: String, field: String, increment: Float) = + def this(key: String, field: String, increment: Float) = { this(toBytes(key), toBytes(field), increment) + } } @@ -102,8 +104,9 @@ object RedisMessage { * @param value */ case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { - def this(key: String, field: String, value: String) = + def this(key: String, field: String, value: String) = { this(toBytes(key), toBytes(field), toBytes(value)) + } } /** @@ -114,8 +117,9 @@ object RedisMessage { * @param value */ case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { - def this(key: String, field: String, value: String) = + def this(key: String, field: String, value: String) = { this(toBytes(key), toBytes(field), toBytes(value)) + } } } @@ -142,8 +146,9 @@ object RedisMessage { * @param value */ case class LPUSH(key: Array[Byte], value: Array[Byte]) { - - def this(key: String, value: String) = this(key, toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -153,7 +158,9 @@ object RedisMessage { * @param value */ case class LPUSHX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -164,7 +171,9 @@ object RedisMessage { * @param value */ case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) { - def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value)) + def this(key: String, index: Long, value: String) = { + this(toBytes(key), index, toBytes(value)) + } } /** @@ -174,8 +183,9 @@ object RedisMessage { * @param value */ case class RPUSH(key: Array[Byte], value: Array[Byte]) { - - def this(key: String, value: String) = this(key, toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -185,7 +195,9 @@ object RedisMessage { * @param value */ case class RPUSHX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } } @@ -198,8 +210,9 @@ object RedisMessage { * @param message */ case class DEL(message: Array[Byte]) { - - def this(message: String) = this(toBytes(message)) + def this(message: String) = { + this(toBytes(message)) + } } /** @@ -208,7 +221,9 @@ object RedisMessage { * @param key */ case class EXPIRE(key: Array[Byte], seconds: Int) { - def this(key: String, seconds: Int) = this(toBytes(key), seconds) + def this(key: String, seconds: Int) = { + this(toBytes(key), seconds) + } } /** @@ -218,7 +233,9 @@ object RedisMessage { * @param timestamp */ case class EXPIREAT(key: Array[Byte], timestamp: Long) { - def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + def this(key: String, timestamp: Long) = { + this(toBytes(key), timestamp) + } } /** @@ -230,9 +247,11 @@ object RedisMessage { * @param database * @param timeout */ - case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) { - def this(host: String, port: Int, key: String, database: Int, timeout: Int) = + case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], + database: Int, timeout: Int) { + def this(host: String, port: Int, key: String, database: Int, timeout: Int) = { this(toBytes(host), port, toBytes(key), database, timeout) + } } /** @@ -242,7 +261,9 @@ object RedisMessage { * @param db */ case class MOVE(key: Array[Byte], db: Int) { - def this(key: String, db: Int) = this(toBytes(key), db) + def this(key: String, db: Int) = { + this(toBytes(key), db) + } } /** @@ -251,7 +272,9 @@ object RedisMessage { * @param key */ case class PERSIST(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -261,7 +284,9 @@ object RedisMessage { * @param milliseconds */ case class PEXPIRE(key: Array[Byte], milliseconds: Long) { - def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + def this(key: String, milliseconds: Long) = { + this(toBytes(key), milliseconds) + } } /** @@ -271,7 +296,9 @@ object RedisMessage { * @param timestamp */ case class PEXPIREAT(key: Array[Byte], timestamp: Long) { - def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + def this(key: String, milliseconds: Long) = { + this(toBytes(key), milliseconds) + } } /** @@ -281,7 +308,9 @@ object RedisMessage { * @param newKey */ case class RENAME(key: Array[Byte], newKey: Array[Byte]) { - def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + def this(key: String, newKey: String) = { + this(toBytes(key), toBytes(newKey)) + } } /** @@ -291,7 +320,9 @@ object RedisMessage { * @param newKey */ case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) { - def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + def this(key: String, newKey: String) = { + this(toBytes(key), toBytes(newKey)) + } } } @@ -306,8 +337,9 @@ object RedisMessage { * @param members */ case class SADD(key: Array[Byte], members: Array[Byte]) { - - def this(key: String, members: String) = this(key, toBytes(members)) + def this(key: String, members: String) = { + this(toBytes(key), toBytes(members)) + } } @@ -319,8 +351,9 @@ object RedisMessage { * @param member */ case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) { - def this(source: String, destination: String, member: String) = + def this(source: String, destination: String, member: String) = { this(toBytes(source), toBytes(destination), toBytes(member)) + } } @@ -331,8 +364,9 @@ object RedisMessage { * @param member */ case class SREM(key: Array[Byte], member: Array[Byte]) { - - def this(key: String, member: String) = this(key, toBytes(member)) + def this(key: String, member: String) = { + this(toBytes(key), toBytes(member)) + } } } @@ -346,7 +380,9 @@ object RedisMessage { * @param value */ case class APPEND(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -355,7 +391,9 @@ object RedisMessage { * @param key */ case class DECR(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -365,7 +403,9 @@ object RedisMessage { * @param decrement */ case class DECRBY(key: Array[Byte], decrement: Int) { - def this(key: String, decrement: Int) = this(toBytes(key), decrement) + def this(key: String, decrement: Int) = { + this(toBytes(key), decrement) + } } /** @@ -374,7 +414,9 @@ object RedisMessage { * @param key */ case class INCR(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -384,7 +426,9 @@ object RedisMessage { * @param increment */ case class INCRBY(key: Array[Byte], increment: Int) { - def this(key: String, increment: Int) = this(toBytes(key), increment) + def this(key: String, increment: Int) = { + this(toBytes(key), increment) + } } /** @@ -394,7 +438,9 @@ object RedisMessage { * @param increment */ case class INCRBYFLOAT(key: Array[Byte], increment: Double) { - def this(key: String, increment: Number) = this(toBytes(key), increment) + def this(key: String, increment: Double) = { + this(toBytes(key), increment) + } } @@ -405,7 +451,9 @@ object RedisMessage { * @param value */ case class SET(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -416,7 +464,9 @@ object RedisMessage { * @param value */ case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) { - def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value)) + def this(key: String, offset: Long, value: String) = { + this(toBytes(key), offset, toBytes(value)) + } } /** @@ -427,7 +477,9 @@ object RedisMessage { * @param value */ case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) { - def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value)) + def this(key: String, seconds: Int, value: String) = { + this(toBytes(key), seconds, toBytes(value)) + } } /** @@ -437,7 +489,9 @@ object RedisMessage { * @param value */ case class SETNX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -448,9 +502,9 @@ object RedisMessage { * @param value */ case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) { - def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value)) + def this(key: String, offset: Int, value: String) = { + this(toBytes(key), offset, toBytes(value)) + } } - } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala index 3f75949..36a9fe3 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -32,20 +32,20 @@ import redis.clients.jedis.Jedis import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT} /** - * Save message in Redis Instance - * - * @param host - * @param port - * @param timeout - * @param database - * @param password - */ + * Save message in Redis Instance + * + * @param host + * @param port + * @param timeout + * @param database + * @param password + */ class RedisSink( - host: String = DEFAULT_HOST, - port: Int = DEFAULT_PORT, - timeout: Int = DEFAULT_TIMEOUT, - database: Int = DEFAULT_DATABASE, - password: String = "") extends DataSink { + host: String = DEFAULT_HOST, + port: Int = DEFAULT_PORT, + timeout: Int = DEFAULT_TIMEOUT, + database: Int = DEFAULT_DATABASE, + password: String = "") extends DataSink { private val LOG = LogUtil.getLogger(getClass) @transient private lazy val client = new Jedis(host, port, timeout) @@ -59,7 +59,6 @@ class RedisSink( } override def write(message: Message): Unit = { - message.msg match { // GEO case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index 544a4eb..df1de06 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -163,7 +163,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) val app = StreamApplication(name, processorGraph, config) LOG.info(s"jar file uploaded to $uploadedJarLocation") - val appId = clientContext.submit(app, uploadedJarLocation, workerNum) + val appId = clientContext.submit(app, uploadedJarLocation, workerNum).appId applications += name -> appId topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation) LOG.info(s"Storm Application $appId submitted") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala index aaa0a99..4969314 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.partitioner import org.apache.gearpump.Message import org.apache.gearpump.experiments.storm.topology.GearpumpTuple -import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner} /** * Partitioner bound to a target Storm component http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala index 777acab..e3f1339 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.util import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology -import org.apache.gearpump.partitioner.Partitioner +import org.apache.gearpump.streaming.partitioner.Partitioner import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala index 5513423..5fc631b 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala @@ -27,7 +27,7 @@ import org.scalatest.{Matchers, PropSpec} import org.apache.gearpump.Message import org.apache.gearpump.experiments.storm.topology.GearpumpTuple -import org.apache.gearpump.partitioner.Partitioner +import org.apache.gearpump.streaming.partitioner.Partitioner class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala index 33c3e97..95c95c7 100644 --- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala @@ -19,6 +19,7 @@ package org.apache.gearpump.experiments.yarn object Constants { + val CONTAINER_USER = "gearpump.yarn.user" val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name" val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command" val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala index 711506a..fb482c4 100644 --- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala @@ -48,7 +48,7 @@ abstract class AbstractCommand extends Command { : String = { val exe = config.getString(java) - s"$exe -cp ${classPath.mkString(":")}:" + + s"$exe -noverify -cp ${classPath.mkString(":")}:" + "$CLASSPATH " + properties.mkString(" ") + s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala index 2475728..267d588 100644 --- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala @@ -26,6 +26,7 @@ import com.typesafe.config.{Config, ConfigValueFactory} import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import org.apache.gearpump.experiments.yarn.Constants +import org.apache.gearpump.experiments.yarn.Constants._ import org.apache.gearpump.experiments.yarn.appmaster.AppMasterCommand import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig} import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource} @@ -184,6 +185,10 @@ object LaunchCluster extends AkkaApp with ArgumentsParser { if (parsed.getBoolean(VERBOSE)) { LogUtil.verboseLogToConsole() } + if (inputAkkaConf.hasPath(CONTAINER_USER)) { + val userName = inputAkkaConf.getString(CONTAINER_USER) + System.setProperty("HADOOP_USER_NAME", userName) + } val yarnConfig = new YarnConfig() val fs = new FileSystem(yarnConfig) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala deleted file mode 100644 index c4c5a65..0000000 --- a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.experiments.yarn.appmaster - -import com.typesafe.config.ConfigFactory -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.transport.HostPort - -class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - val config = ConfigFactory.parseString( - - """ - | - |gearpump { - | yarn { - | client { - | package -path = "/user/gearpump/gearpump.zip" - | } - | - | applicationmaster { - | ## Memory of YarnAppMaster - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | queue = "default" - | } - | - | master { - | ## Memory of master daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | } - | - | worker { - | ## memory of worker daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | containers = "4" - | ## This also contains all memory for child executors. - | memory = "4096" - | vcores = "1" - | } - | services { - | enabled = true - | } - | } - |} - """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG) - - "MasterCommand" should "create correct command line" in { - val version = "gearpump-0.1" - val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080)) - - // scalastyle:off line.size.limit - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> org.apache.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - assert(master.get == expected) - } - - "WorkerCommand" should "create correct command line" in { - val version = "gearpump-0.1" - val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine") - // scalastyle:off line.size.limit - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine org.apache.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - assert(worker.get == expected) - } - - "AppMasterCommand" should "create correct command line" in { - val version = "gearpump-0.1" - val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3")) - // scalastyle:off line.size.limit - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - assert(appmaster.get == expected) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala index c0a4ee2..3f2f9cb 100644 --- a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala +++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala @@ -186,7 +186,7 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) // scalastyle:off line.size.limit - val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -noverify -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" // scalastyle:on line.size.limit verify(yarnClient).submit("gearpump", appId, expectedCommand, Resource.newInstance(512, 1), "default", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index f85c43b..4b41ba1 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -31,22 +31,22 @@ import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.security.UserGroupInformation -class HBaseSink(userconfig: UserConfig, tableName: String, +class HBaseSink(userConfig: UserConfig, tableName: String, val conn: (UserConfig, Configuration) => Connection, @transient var configuration: Configuration) extends DataSink { - lazy val connection = conn(userconfig, configuration) + lazy val connection = conn(userConfig, configuration) lazy val table = connection.getTable(TableName.valueOf(tableName)) override def open(context: TaskContext): Unit = {} - def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = { - this(userconfig, tableName, HBaseSink.getConnection, configuration) + def this(userConfig: UserConfig, tableName: String, configuration: Configuration) = { + this(userConfig, tableName, HBaseSink.getConnection, configuration) } - def this(userconfig: UserConfig, tableName: String) = { - this(userconfig, tableName, HBaseConfiguration.create()) + def this(userConfig: UserConfig, tableName: String) = { + this(userConfig, tableName, HBaseConfiguration.create()) } def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { @@ -120,14 +120,14 @@ object HBaseSink { val COLUMN_NAME = "hbase.table.column.name" val HBASE_USER = "hbase.user" - def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) + def apply[T](userConfig: UserConfig, tableName: String, configuration: Configuration) : HBaseSink = { - new HBaseSink(userconfig, tableName, configuration) + new HBaseSink(userConfig, tableName, configuration) } - def apply[T](userconfig: UserConfig, tableName: String) + def apply[T](userConfig: UserConfig, tableName: String) : HBaseSink = { - new HBaseSink(userconfig, tableName) + new HBaseSink(userConfig, tableName) } private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala index 2417763..22efa89 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala @@ -18,13 +18,10 @@ package org.apache.gearpump.external.hbase.dsl import scala.language.implicitConversions - import org.apache.hadoop.conf.Configuration - import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.external.hbase.HBaseSink -import org.apache.gearpump.streaming.dsl.Stream -import org.apache.gearpump.streaming.dsl.Stream.Sink +import org.apache.gearpump.streaming.dsl.scalaapi.Stream /** Create a HBase DSL Sink */ class HBaseDSLSink[T](stream: Stream[T]) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala index f1bb26a..996ae0b 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala @@ -21,7 +21,7 @@ import java.util.Properties import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl -import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp} import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory @@ -44,7 +44,7 @@ object KafkaDSL { parallelism: Int = 1, config: UserConfig = UserConfig.empty, description: String = "KafkaSource" - ): dsl.Stream[T] = { + ): Stream[T] = { app.source[T](new KafkaSource(topics, properties), parallelism, config, description) } @@ -66,19 +66,19 @@ object KafkaDSL { properties: Properties, parallelism: Int = 1, config: UserConfig = UserConfig.empty, - description: String = "KafkaSource"): dsl.Stream[T] = { + description: String = "KafkaSource"): Stream[T] = { val source = new KafkaSource(topics, properties) source.setCheckpointStore(checkpointStoreFactory) app.source[T](source, parallelism, config, description) } import scala.language.implicitConversions - implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = { + implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = { new KafkaDSL[T](stream) } } -class KafkaDSL[T](stream: dsl.Stream[T]) { +class KafkaDSL[T](stream: Stream[T]) { /** * Sinks data to Kafka @@ -94,7 +94,7 @@ class KafkaDSL[T](stream: dsl.Stream[T]) { properties: Properties, parallelism: Int = 1, userConfig: UserConfig = UserConfig.empty, - description: String = "KafkaSink"): dsl.Stream[T] = { + description: String = "KafkaSink"): Stream[T] = { stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore ---------------------------------------------------------------------- diff --git a/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore new file mode 100644 index 0000000..2809247 --- /dev/null +++ b/gearpump-hadoop/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.gearpump.jarstore.dfs.DFSJarStore \ No newline at end of file
