http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala index d4b7871..55aa73f 100644 --- a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala +++ b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.metrics -import org.apache.gearpump.codahale.metrics.{Meter => CodaHaleMeter} +import com.codahale.metrics.{Meter => CodaHaleMeter} /** See org.apache.gearpump.codahale.metrics.Meter */ class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala index 1ee3798..3737361 100644 --- a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala +++ b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import akka.actor._ import org.slf4j.Logger -import org.apache.gearpump.codahale.metrics._ +import com.codahale.metrics._ import org.apache.gearpump.metrics import org.apache.gearpump.util.LogUtil http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala index 94aa114..620dc61 100644 --- a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala +++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala @@ -20,12 +20,11 @@ package org.apache.gearpump.metrics import java.net.InetSocketAddress import java.util.concurrent.TimeUnit -import scala.concurrent.duration._ +import scala.concurrent.duration._ import akka.actor.{Actor, ActorRef} - -import org.apache.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter} -import org.apache.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter} +import com.codahale.metrics.{MetricFilter, Slf4jReporter} +import com.codahale.metrics.graphite.{Graphite, GraphiteReporter} import org.apache.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics} import org.apache.gearpump.metrics.MetricsReporterService.ReportTo import org.apache.gearpump.util.Constants._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala deleted file mode 100644 index 99cbcb6..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import org.apache.gearpump.Message - -/** Used by storm module to broadcast message to all downstream tasks */ -class BroadcastPartitioner extends MulticastPartitioner { - private var lastPartitionNum = -1 - private var partitions = Array.empty[Int] - - override def getPartitions( - msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { - if (partitionNum != lastPartitionNum) { - partitions = (0 until partitionNum).toArray - lastPartitionNum = partitionNum - } - partitions - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala deleted file mode 100644 index 5a3eec4..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import org.apache.gearpump.Message - -/** - * Will have the same parallelism with last processor - * And each task in current processor will co-locate with task of last processor - */ -class CoLocationPartitioner extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - currentPartitionId - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala deleted file mode 100644 index ee684a9..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import org.apache.gearpump.Message - -/** - * Only make sense when the message has implemented the hashCode() - * Otherwise, it will use Object.hashCode(), which will not return - * same hash code after serialization and deserialization. - */ -class HashPartitioner extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala deleted file mode 100644 index d68fa65..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import scala.reflect.ClassTag - -import org.apache.commons.lang.SerializationUtils - -import org.apache.gearpump.Message - -/** - * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task - * of upstream processor A send to several tasks of downstream processor B. - */ -sealed trait Partitioner extends Serializable - -/** - * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does - * ONE-task {@literal ->} ONE-task mapping. - */ -trait UnicastPartitioner extends Partitioner { - - /** - * Gets the SINGLE downstream processor task index to send message to. - * - * @param msg Message you want to send - * @param partitionNum How many tasks does the downstream processor have. - * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. - * - * @return ONE task index of downstream processor. - */ - def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int - - def getPartition(msg: Message, partitionNum: Int): Int = { - getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) - } -} - -trait MulticastPartitioner extends Partitioner { - - /** - * Gets a list of downstream processor task indexes to send message to. - * - * @param upstreamTaskIndex Current sender task's task index. - * - */ - def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] - - def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { - getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) - } -} - -sealed trait PartitionerFactory { - - def name: String - - def partitioner: Partitioner -} - -/** Stores the Partitioner in an object. To use it, user need to deserialize the object */ -class PartitionerObject(private[this] val _partitioner: Partitioner) - extends PartitionerFactory with Serializable { - - override def name: String = partitioner.getClass.getName - - override def partitioner: Partitioner = { - SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] - } -} - -/** Store the partitioner in class Name, the user need to instantiate a new class */ -class PartitionerByClassName(partitionerClass: String) - extends PartitionerFactory with Serializable { - - override def name: String = partitionerClass - override def partitioner: Partitioner = { - Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] - } -} - -/** - * @param partitionerFactory How we construct a Partitioner. - */ -case class PartitionerDescription(partitionerFactory: PartitionerFactory) - -object Partitioner { - val UNKNOWN_PARTITION_ID = -1 - - def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = { - PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala deleted file mode 100644 index 55ef614..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import scala.util.Random - -import org.apache.gearpump.Message - -/** - * The idea of ShuffleGroupingPartitioner is derived from Storm. - * Messages are randomly distributed across the downstream's tasks in a way such that - * each task is guaranteed to get an equal number of messages. - */ -class ShuffleGroupingPartitioner extends UnicastPartitioner { - private val random = new Random - private var index = -1 - private var partitions = List.empty[Int] - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - index += 1 - if (partitions.isEmpty) { - partitions = 0.until(partitionNum).toList - partitions = random.shuffle(partitions) - } else if (index >= partitionNum) { - index = 0 - partitions = random.shuffle(partitions) - } - partitions(index) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala deleted file mode 100644 index 5c66d66..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import java.util.Random - -import org.apache.gearpump.Message - -/** - * Round Robin partition the data to downstream processor tasks. - */ -class ShufflePartitioner extends UnicastPartitioner { - private var seed = 0 - private var count = 0 - - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - - if (seed == 0) { - seed = newSeed() - } - - val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum - count = count + 1 - result - } - - private def newSeed(): Int = new Random().nextInt() -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala index f258c0f..cb3d2fd 100644 --- a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala +++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala @@ -19,19 +19,18 @@ package org.apache.gearpump.serializer import akka.actor.ExtendedActorSystem - -import org.apache.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy -import org.apache.gearpump.objenesis.strategy.StdInstantiatorStrategy -import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper +import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy +import com.romix.akka.serialization.kryo.{KryoBasedSerializer, KryoSerializer} import org.apache.gearpump.serializer.FastKryoSerializer.KryoSerializationException import org.apache.gearpump.util.LogUtil +import org.objenesis.strategy.StdInstantiatorStrategy class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer { private val LOG = LogUtil.getLogger(getClass) private val config = system.settings.config - private val kryoSerializer = new KryoSerializerWrapper(system) + private val kryoSerializer: KryoBasedSerializer = new KryoSerializer(system).serializer private val kryo = kryoSerializer.kryo val strategy = new DefaultInstantiatorStrategy strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy) @@ -40,7 +39,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer { override def serialize(message: Any): Array[Byte] = { try { - kryoSerializer.toBinary(message) + kryoSerializer.toBinary(message.asInstanceOf[AnyRef]) } catch { case ex: java.lang.IllegalArgumentException => val clazz = message.getClass http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala index 524089d..45a5481 100644 --- a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala +++ b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala @@ -18,10 +18,9 @@ package org.apache.gearpump.serializer +import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} import com.typesafe.config.Config import org.slf4j.Logger - -import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} import org.apache.gearpump.util.{Constants, LogUtil} class GearpumpSerialization(config: Config) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala index 09f2969..82c7fe2 100644 --- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala @@ -21,13 +21,12 @@ package org.apache.gearpump.util import org.apache.gearpump.cluster.AppMasterContext import org.apache.gearpump.cluster.worker.WorkerId -import scala.concurrent.{ExecutionContext, Future} - +import scala.concurrent.{Await, ExecutionContext, Future} import akka.actor.Actor.Receive import akka.actor._ import akka.pattern.ask import org.slf4j.Logger - +import akka.util.Timeout import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList @@ -36,6 +35,8 @@ import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSy import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import org.apache.gearpump.transport.HostPort +import scala.concurrent.duration.Duration + object ActorUtil { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -136,4 +137,13 @@ object ActorUtil { implicit val timeout = Constants.FUTURE_TIMEOUT (actor ? msg).asInstanceOf[Future[T]] } + + def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = { + askActor(actor, msg, timeout, ActorRef.noSender) + } + + def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef) + (implicit ex: ExecutionContext): T = { + Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala index dba5a1f..c98726e 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala @@ -20,8 +20,6 @@ package org.apache.gearpump.util import java.util.concurrent.TimeUnit -import org.apache.gearpump.partitioner._ - object Constants { val MASTER_WATCHER = "masterwatcher" val SINGLETON_MANAGER = "singleton" @@ -140,14 +138,6 @@ object Constants { val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path" val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise" - // The partitioners provided by Gearpump - val BUILTIN_PARTITIONERS = Array( - classOf[BroadcastPartitioner], - classOf[CoLocationPartitioner], - classOf[HashPartitioner], - classOf[ShuffleGroupingPartitioner], - classOf[ShufflePartitioner]) - // Security related val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file" val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala index e3df37b..283a64a 100644 --- a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala +++ b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.util import java.io.{File, IOException} import java.nio.charset.Charset -import org.apache.gearpump.google.common.io.Files +import com.google.common.io.Files object FileUtils { private val UTF8 = Charset.forName("UTF-8") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/Util.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala index 0faa46a..8ee0e26 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Util.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala @@ -66,8 +66,14 @@ object Util { def startProcess(options: Array[String], classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { val java = System.getProperty("java.home") + "/bin/java" - val command = List(java) ++ options ++ - List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments + + val command = List(java) ++ + // java.lang.VerifyError will be caused without "-noverify" + // TODO: investigate the cause and remove this + Array("-noverify") ++ + options ++ + List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ + arguments LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " + s"\n ${options.mkString(" ")}") val logger = new ProcessLogRedirector() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore ---------------------------------------------------------------------- diff --git a/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore new file mode 100644 index 0000000..c64d444 --- /dev/null +++ b/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.gearpump.jarstore.local.LocalJarStore \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala new file mode 100644 index 0000000..0a22245 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster + +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.pattern.ask +import akka.testkit.TestActorRef +import com.typesafe.config.ConfigValueFactory +import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers +import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList +import org.apache.gearpump.cluster.master.Master +import org.apache.gearpump.cluster.worker.Worker +import org.apache.gearpump.util.Constants + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +class MiniCluster { + private val mockMasterIP = "127.0.0.1" + + implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG. + withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP))) + + val (mockMaster, worker) = { + val master = system.actorOf(Props(classOf[Master]), "master") + val worker = system.actorOf(Props(classOf[Worker], master), "worker") + + // Wait until worker register itself to master + waitUtilWorkerIsRegistered(master) + (master, worker) + } + + def launchActor(props: Props): TestActorRef[Actor] = { + TestActorRef(props) + } + + private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = { + while (!isWorkerRegistered(master)) {} + } + + private def isWorkerRegistered(master: ActorRef): Boolean = { + import scala.concurrent.duration._ + implicit val dispatcher = system.dispatcher + + implicit val futureTimeout = Constants.FUTURE_TIMEOUT + + val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]] + + // Waits until the worker is registered. + val workers = Await.result[WorkerList](workerListFuture, 15.seconds) + workers.workers.size > 0 + } + + def shutDown(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala new file mode 100644 index 0000000..f9b0762 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.{Actor, ActorRef, Props} +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} +import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager} +import org.apache.gearpump.cluster.master.AppManager._ +import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} +import org.apache.gearpump.cluster.{TestUtil, _} +import org.apache.gearpump.util.LogUtil +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.util.Success + +class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + var kvService: TestProbe = null + var haService: TestProbe = null + var appLauncher: TestProbe = null + var appManager: ActorRef = null + private val LOG = LogUtil.getLogger(getClass) + + override def config: Config = TestUtil.DEFAULT_CONFIG + + override def beforeEach(): Unit = { + startActorSystem() + kvService = TestProbe()(getActorSystem) + appLauncher = TestProbe()(getActorSystem) + + appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, + new DummyAppMasterLauncherFactory(appLauncher)))) + kvService.expectMsgType[GetKV] + kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty))) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "AppManager" should "handle AppMaster message correctly" in { + val appMaster = TestProbe()(getActorSystem) + val appId = 1 + + val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName")) + appMaster.send(appManager, register) + appMaster.expectMsgType[AppMasterRegistered] + + appMaster.send(appManager, ActivateAppMaster(appId)) + appMaster.expectMsgType[AppMasterActivated] + } + + "DataStoreService" should "support Put and Get" in { + val appMaster = TestProbe()(getActorSystem) + appMaster.send(appManager, SaveAppData(0, "key", 1)) + kvService.expectMsgType[PutKV] + kvService.reply(PutKVSuccess) + appMaster.expectMsg(AppDataSaved) + + appMaster.send(appManager, GetAppData(0, "key")) + kvService.expectMsgType[GetKV] + kvService.reply(GetKVSuccess("key", 1)) + appMaster.expectMsg(GetAppDataResult("key", 1)) + } + + "AppManager" should "support application submission and shutdown" in { + testClientSubmission(withRecover = false) + } + + "AppManager" should "support application submission and recover if appmaster dies" in { + LOG.info("=================testing recover==============") + testClientSubmission(withRecover = true) + } + + "AppManager" should "handle client message correctly" in { + val mockClient = TestProbe()(getActorSystem) + mockClient.send(appManager, ShutdownApplication(1)) + assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure) + + mockClient.send(appManager, ResolveAppId(1)) + assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) + + mockClient.send(appManager, AppMasterDataRequest(1)) + mockClient.expectMsg(AppMasterData(AppMasterNonExist)) + } + + "AppManager" should "reject the application submission if the app name already existed" in { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + val appId = 1 + + client.send(appManager, submit) + + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + appMaster.send(appManager, RegisterAppMaster(appMaster.ref, + AppMasterRuntimeInfo(appId, app.name))) + appMaster.expectMsgType[AppMasterRegistered] + + client.send(appManager, submit) + assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) + } + + def testClientSubmission(withRecover: Boolean): Unit = { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + val appId = 1 + + client.send(appManager, submit) + + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + appMaster.send(appManager, RegisterAppMaster(appMaster.ref, + AppMasterRuntimeInfo(appId, app.name))) + kvService.expectMsgType[PutKV] + appMaster.expectMsgType[AppMasterRegistered] + + client.send(appManager, ResolveAppId(appId)) + client.expectMsg(ResolveAppIdResult(Success(appMaster.ref))) + + client.send(appManager, AppMastersDataRequest) + client.expectMsgType[AppMastersData] + + client.send(appManager, AppMasterDataRequest(appId, false)) + client.expectMsgType[AppMasterData] + + if (!withRecover) { + client.send(appManager, ShutdownApplication(appId)) + client.expectMsg(ShutdownApplicationResult(Success(appId))) + } else { + // Do recovery + getActorSystem.stop(appMaster.ref) + kvService.expectMsgType[GetKV] + val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) + kvService.reply(GetKVSuccess(APP_STATE, appState)) + appLauncher.expectMsg(LauncherStarted(appId)) + } + } +} + +class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { + override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], + username: String, master: ActorRef, client: Option[ActorRef]): Props = { + Props(new DummyAppMasterLauncher(test, appId)) + } +} + +class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor { + test.ref ! LauncherStarted(appId) + + override def receive: Receive = { + case any: Any => test.ref forward any + } +} + +case class LauncherStarted(appId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala new file mode 100644 index 0000000..d3e739f --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.Props +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.gearpump.cluster.master.InMemoryKVService +import org.apache.gearpump.cluster.master.InMemoryKVService._ +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.concurrent.duration._ + +class InMemoryKVServiceSpec + extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + + override def beforeEach(): Unit = { + startActorSystem() + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + override def config: Config = TestUtil.MASTER_CONFIG + + "KVService" should "get, put, delete correctly" in { + val system = getActorSystem + val kvService = system.actorOf(Props(new InMemoryKVService())) + val group = "group" + + val client = TestProbe()(system) + + client.send(kvService, PutKV(group, "key", 1)) + client.expectMsg(PutKVSuccess) + + client.send(kvService, PutKV(group, "key", 2)) + client.expectMsg(PutKVSuccess) + + client.send(kvService, GetKV(group, "key")) + client.expectMsg(GetKVSuccess("key", 2)) + + client.send(kvService, DeleteKVGroup(group)) + + // After DeleteGroup, it no longer accept Get and Put message for this group. + client.send(kvService, GetKV(group, "key")) + client.expectNoMsg(3.seconds) + + client.send(kvService, PutKV(group, "key", 3)) + client.expectNoMsg(3.seconds) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala new file mode 100644 index 0000000..5f0d5e4 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.client + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import akka.util.Timeout +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.client.RunningApplicationSpec.{MockAskAppMasterRequest, MockAskAppMasterResponse} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} +import scala.concurrent.ExecutionContext.Implicits.global + +class RunningApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "RunningApplication" should "be able to shutdown application" in { + val errorMsg = "mock exception" + val master = TestProbe() + val timeout = Timeout(90, TimeUnit.SECONDS) + val application = new RunningApplication(1, master.ref, timeout) + Future { + application.shutDown() + } + master.expectMsg(ShutdownApplication(1)) + master.reply(ShutdownApplicationResult(Success(1))) + + val result = Future { + intercept[Exception] { + application.shutDown() + } + } + master.expectMsg(ShutdownApplication(1)) + master.reply(ShutdownApplicationResult(Failure(new Exception(errorMsg)))) + val exception = Await.result(result, Duration.Inf) + assert(exception.getMessage.equals(errorMsg)) + } + + "RunningApplication" should "be able to ask appmaster" in { + val master = TestProbe() + val appMaster = TestProbe() + val appId = 1 + val timeout = Timeout(90, TimeUnit.SECONDS) + val request = MockAskAppMasterRequest("request") + val application = new RunningApplication(appId, master.ref, timeout) + val future = application.askAppMaster[MockAskAppMasterResponse](request) + master.expectMsg(ResolveAppId(appId)) + master.reply(ResolveAppIdResult(Success(appMaster.ref))) + appMaster.expectMsg(MockAskAppMasterRequest("request")) + appMaster.reply(MockAskAppMasterResponse("response")) + val result = Await.result(future, Duration.Inf) + assert(result.res.equals("response")) + + // ResolveAppId should not be called multiple times + val future2 = application.askAppMaster[MockAskAppMasterResponse](request) + appMaster.expectMsg(MockAskAppMasterRequest("request")) + appMaster.reply(MockAskAppMasterResponse("response")) + val result2 = Await.result(future2, Duration.Inf) + assert(result2.res.equals("response")) + } +} + +object RunningApplicationSpec { + case class MockAskAppMasterRequest(req: String) + + case class MockAskAppMasterResponse(res: String) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala new file mode 100644 index 0000000..2166976 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import java.util.Properties + +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} +import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered +import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{Constants, LogUtil, Util} +import org.scalatest._ + +import scala.concurrent.Future +import scala.util.{Success, Try} + +class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + + private val LOG = LogUtil.getLogger(getClass) + + override def config: Config = TestUtil.DEFAULT_CONFIG + + override def beforeEach(): Unit = { + startActorSystem() + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "Worker" should "register worker address to master when started." in { + + val masterReceiver = createMockMaster() + + val tempTestConf = convertTestConf(getHost, getPort) + + val options = Array( + s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}", + s"-D${PREFER_IPV4}=true" + ) ++ getMasterListOption() + + val worker = Util.startProcess(options, + getContextClassPath, + getMainClassName(Worker), + Array.empty) + + try { + masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker) + + tempTestConf.delete() + } finally { + worker.destroy() + } + } + + "Master" should "accept worker RegisterNewWorker when started" in { + val worker = TestProbe()(getActorSystem) + + val host = "127.0.0.1" + val port = Util.findFreePort().get + + val properties = new Properties() + properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port") + properties.put(s"${GEARPUMP_HOSTNAME}", s"$host") + val masterConfig = ConfigFactory.parseProperties(properties) + .withFallback(TestUtil.MASTER_CONFIG) + Future { + Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString)) + } + + val masterProxy = getActorSystem.actorOf( + MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec") + + worker.send(masterProxy, RegisterNewWorker) + worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME) + } + + "Info" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) + masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) + } + + "Kill" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + Kill.main(masterConfig, Array("-appid", "0")) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0)) + masterReceiver.reply(ShutdownApplicationResult(Success(0))) + } + + "Replay" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + Replay.main(masterConfig, Array("-appid", "0")) + } + + masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME) + masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref))) + masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME) + masterReceiver.reply(ReplayApplicationResult(Success(0))) + } + + "Local" should "be started without exception" in { + val port = Util.findFreePort().get + val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port", + s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost", + s"-D${PREFER_IPV4}=true") + + val local = Util.startProcess(options, + getContextClassPath, + getMainClassName(Local), + Array.empty) + + def retry(times: Int)(fn: => Boolean): Boolean = { + + LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..") + + val result = fn + if (result || times <= 0) { + result + } else { + Thread.sleep(1000) + retry(times - 1)(fn) + } + } + + try { + assert(retry(10)(isPortUsed("127.0.0.1", port)), + "local is not started successfully, as port is not used " + port) + } finally { + local.destroy() + } + } + + "Gear" should "support app|info|kill|shell|replay" in { + + val commands = Array("app", "info", "kill", "shell", "replay") + + assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw") + + for (command <- commands) { + assert(Try(Gear.main(Array("-noexist"))).isFailure, + "pass unknown option, throw, command: " + command) + } + + assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ") + + val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist"))) + assert(tryThis.isFailure, "unknown command, throw") + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala new file mode 100644 index 0000000..b48fc2a --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.gearpump.cluster.TestUtil +import org.scalatest.{FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class MasterWatcherSpec extends FlatSpec with Matchers { + def config: Config = TestUtil.MASTER_CONFIG + + "MasterWatcher" should "kill itself when can not get a quorum" in { + val system = ActorSystem("ForMasterWatcher", config) + + val actorWatcher = TestProbe()(system) + + val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) + actorWatcher watch masterWatcher + actorWatcher.expectTerminated(masterWatcher, 5.seconds) + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala new file mode 100644 index 0000000..8a3d7d1 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.scheduler + +import akka.actor.{ActorSystem, Props} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource +import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} +import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import org.apache.gearpump.cluster.worker.WorkerId +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.concurrent.duration._ + +class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpecLike with Matchers with BeforeAndAfterAll{ + + def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) + val appId = 0 + val workerId1: WorkerId = WorkerId(1, 0L) + val workerId2: WorkerId = WorkerId(2, 0L) + val mockAppMaster = TestProbe() + val mockWorker1 = TestProbe() + val mockWorker2 = TestProbe() + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + "The scheduler" should { + "update resource only when the worker is registered" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)) + expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " + + s"registered into master")) + } + + "drop application's resource requests when the application is removed" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + mockAppMaster.expectNoMsg(5.seconds) + } + } + + def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = { + left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId)) + } + + "The resource request with higher priority" should { + "be handled first" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY) + val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY) + + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The resource request which delivered earlier" should { + "be handled first if the priorities are the same" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The PriorityScheduler" should { + "handle the resource request with different relaxation" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER) + val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER) + + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + val request3 = ResourceRequest( + Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + + expect = ResourceAllocated(Array( + ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), + ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + // We have to manually update the resource on each worker + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The PriorityScheduler" should { + "handle the resource request with different executor number" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + // By default, the request requires only one executor + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations2.allocations.length == 1) + assert(allocations2.allocations.head.resource == Resource(20)) + + val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations3.allocations.length == 3) + assert(allocations3.allocations.forall(_.resource == Resource(8))) + + // The total available resource can not satisfy the requirements with executor number + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations4.allocations.length == 2) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + + // When new resources are available, the remaining request will be satisfied + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) + val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations5.allocations.length == 1) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala new file mode 100644 index 0000000..e0233f8 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.worker + +import akka.actor.{ActorSystem, PoisonPill, Props} +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} +import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} +import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} +import org.scalatest._ + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { + override def config: Config = TestUtil.DEFAULT_CONFIG + + val appId = 1 + val workerId: WorkerId = WorkerId(1, 0L) + val executorId = 1 + var masterProxy: TestProbe = null + var mockMaster: TestProbe = null + var client: TestProbe = null + val workerSlots = 50 + + override def beforeEach(): Unit = { + startActorSystem() + mockMaster = TestProbe()(getActorSystem) + masterProxy = TestProbe()(getActorSystem) + client = TestProbe()(getActorSystem) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "The new started worker" should { + "kill itself if no response from Master after registering" in { + val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) + mockMaster watch worker + mockMaster.expectMsg(RegisterNewWorker) + mockMaster.expectTerminated(worker, 60.seconds) + } + } + + "Worker" should { + "init its resource from the gearpump config" in { + val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots"). + withFallback(TestUtil.DEFAULT_CONFIG) + val workerSystem = ActorSystem("WorkerSystem", config) + val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) + mockMaster watch worker + mockMaster.expectMsg(RegisterNewWorker) + + worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots))) + + worker.tell( + UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref) + mockMaster.expectTerminated(worker, 5.seconds) + workerSystem.terminate() + Await.result(workerSystem.whenTerminated, Duration.Inf) + } + } + + "Worker" should { + "update its remaining resource when launching and shutting down executors" in { + val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref)) + masterProxy.expectMsg(RegisterNewWorker) + + worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) + + val executorName = ActorUtil.actorNameForExecutor(appId, executorId) + // This is an actor path which the ActorSystemBooter will report back to, + // not needed in this test + val reportBack = "dummy" + val executionContext = ExecutorJVMConfig(Array.empty[String], + getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), + classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, + username = "user") + + // Test LaunchExecutor + worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), + mockMaster.ref) + mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine")) + + worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95))) + + worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98))) + + // Test terminationWatch + worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) + client.expectMsg(ShutdownExecutorSucceed(1, 1)) + + worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref) + client.expectMsg(ShutdownExecutorFailed( + s"Can not find executor ${executorId + 1} for app $appId")) + + mockMaster.ref ! PoisonPill + masterProxy.expectMsg(RegisterWorker(workerId)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala index c99a031..39b6261 100644 --- a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala @@ -22,9 +22,9 @@ import java.io.File import java.util.concurrent.TimeUnit import akka.actor.ActorSystem -import com.typesafe.config.{ConfigValueFactory, ConfigValue} +import com.google.common.io.Files +import com.typesafe.config.ConfigValueFactory import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.google.common.io.Files import org.apache.gearpump.jarstore.local.LocalJarStore import org.apache.gearpump.util.{FileUtils, LogUtil} import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -39,7 +39,7 @@ class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { val host = "localhost" private val LOG = LogUtil.getLogger(getClass) - var system: ActorSystem = null + var system: ActorSystem = _ override def afterAll { if (null != system) { @@ -75,7 +75,6 @@ class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { "The file server" should { "serve the data previously stored" in { - val rootDir = Files.createTempDir() val localJarStore: JarStore = new LocalJarStore val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala index 5881640..0855553 100644 --- a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala @@ -18,12 +18,13 @@ package org.apache.gearpump.metrics +import com.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter} + import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter} class MetricsSpec extends FlatSpec with Matchers with MockitoSugar { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala deleted file mode 100644 index fcf819b..0000000 --- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import org.scalatest.{FlatSpec, Matchers} - -import org.apache.gearpump.Message - -class PartitionerSpec extends FlatSpec with Matchers { - val NUM = 10 - - "HashPartitioner" should "hash same key to same slots" in { - val partitioner = new HashPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } - - "ShufflePartitioner" should "hash same key randomly" in { - val partitioner = new ShufflePartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } - - "BroadcastPartitioner" should "return all partitions" in { - val partitioner = new BroadcastPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - val partitions = partitioner.getPartitions(msg, NUM) - - partitions should contain theSameElementsAs 0.until(NUM) - } - - - "ShuffleGroupingPartitioner" should "hash same key randomly" in { - val partitioner = new ShuffleGroupingPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala index f4bd114..0772a5e 100644 --- a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala @@ -18,20 +18,23 @@ package org.apache.gearpump.serializer -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - import akka.actor.{ActorSystem, ExtendedActorSystem} + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} import com.typesafe.config.{ConfigFactory, ConfigValueFactory} -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output} -import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} import org.apache.gearpump.serializer.SerializerSpec._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FlatSpec, Matchers} + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + + class SerializerSpec extends FlatSpec with Matchers with MockitoSugar { val config = ConfigFactory.empty.withValue("gearpump.serializers", ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName, @@ -70,7 +73,7 @@ object SerializerSpec { class ClassASerializer extends KryoSerializer[ClassA] { override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = { - output.writeString(classOf[ClassA].getName.toString) + output.writeString(classOf[ClassA].getName) } override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala index 66abc36..97b35ad 100644 --- a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala @@ -18,13 +18,13 @@ package org.apache.gearpump.util +import com.google.common.io.Files + import java.io.File import java.util import org.scalatest.FlatSpec -import org.apache.gearpump.google.common.io.Files - class FileUtilsSpec extends FlatSpec { val TXT = """ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore ---------------------------------------------------------------------- diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore deleted file mode 100644 index e173a8a..0000000 --- a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.gearpump.jarstore.local.LocalJarStore -org.apache.gearpump.jarstore.dfs.DFSJarStore \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala deleted file mode 100644 index 9e55be6..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.cluster - -import akka.actor.ActorRef - -import org.apache.gearpump.cluster.master.Master.MasterInfo -import org.apache.gearpump.cluster.scheduler.Resource -import org.apache.gearpump.cluster.worker.WorkerId - -/** - * Cluster Bootup Flow - */ -object WorkerToMaster { - - /** When an worker is started, it sends RegisterNewWorker */ - case object RegisterNewWorker - - /** When worker lose connection with master, it tries to register itself again with old Id. */ - case class RegisterWorker(workerId: WorkerId) - - /** Worker is responsible to broadcast its current status to master */ - case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource) -} - -object MasterToWorker { - - /** Master confirm the reception of RegisterNewWorker or RegisterWorker */ - case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo) - - /** Worker have not received reply from master */ - case class UpdateResourceFailed(reason: String = null, ex: Throwable = null) - - /** Master is synced with worker on resource slots managed by current worker */ - case object UpdateResourceSucceed -} \ No newline at end of file
