http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala deleted file mode 100644 index 342cd87..0000000 --- a/core/src/main/scala/io/gearpump/util/Constants.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.util.concurrent.TimeUnit - -import io.gearpump.partitioner._ - -object Constants { - val MASTER_WATCHER = "masterwatcher" - val SINGLETON_MANAGER = "singleton" - - val MASTER_CONFIG = "gearpump-master" - val WORKER_CONFIG = "gearpump-worker" - val UI_CONFIG = "gearpump-ui" - val LINUX_CONFIG = "gearpump-linux" // linux or Mac - - val MASTER = "master" - val WORKER = "worker" - - val GEARPUMP_WORKER_SLOTS = "gearpump.worker.slots" - val GEARPUMP_EXECUTOR_PROCESS_LAUNCHER = "gearpump.worker.executor-process-launcher" - val GEARPUMP_SCHEDULING_SCHEDULER = "gearpump.scheduling.scheduler-class" - val GEARPUMP_SCHEDULING_REQUEST = "gearpump.scheduling.requests" - val GEARPUMP_TRANSPORT_SERIALIZER = "gearpump.transport.serializer" - val GEARPUMP_SERIALIZER_POOL = "gearpump.serialization-framework" - val GEARPUMP_SERIALIZERS = "gearpump.serializers" - val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher" - val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters" - val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout" - val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS = - "gearpump.worker.executor-share-same-jvm-as-worker" - - val GEARPUMP_HOME = "gearpump.home" - val GEARPUMP_FULL_SCALA_VERSION = "gearpump.binary-version-with-scala-version" - val GEARPUMP_HOSTNAME = "gearpump.hostname" - val GEARPUMP_APPMASTER_ARGS = "gearpump.appmaster.vmargs" - val GEARPUMP_APPMASTER_EXTRA_CLASSPATH = "gearpump.appmaster.extraClasspath" - val GEARPUMP_EXECUTOR_ARGS = "gearpump.executor.vmargs" - val GEARPUMP_EXECUTOR_EXTRA_CLASSPATH = "gearpump.executor.extraClasspath" - val GEARPUMP_LOG_DAEMON_DIR = "gearpump.log.daemon.dir" - val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir" - val HADOOP_CONF = "hadoopConf" - - // Id used to identity Master JVM process in low level resource manager like YARN. - // In YARN, it means the container Id. - val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID = - "gearpump.master-resource-manager-container-id" - - // Id used to identity Worker JVM process in low level resource manager like YARN. - // In YARN, it means the container Id. - val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID = - "gearpump.worker-resource-manager-container-id" - - // true or false - val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm" - val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port" - - // Whether to turn on GC log, true or false - val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc" - - // The time out for Future, like ask. - // !Important! This global timeout setting will also impact the UI - // responsive time if set to too big. Please make sure you have - // enough justification to change this global setting, otherwise - // please use your local timeout setting instead. - val FUTURE_TIMEOUT = akka.util.Timeout(15, TimeUnit.SECONDS) - - val GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS = "gearpump.start-executor-system-timeout-ms" - - val APPMASTER_DEFAULT_EXECUTOR_ID = -1 - - val NETTY_BUFFER_SIZE = "gearpump.netty.buffer-size" - val NETTY_MAX_RETRIES = "gearpump.netty.max-retries" - val NETTY_BASE_SLEEP_MS = "gearpump.netty.base-sleep-ms" - val NETTY_MAX_SLEEP_MS = "gearpump.netty.max-sleep-ms" - val NETTY_MESSAGE_BATCH_SIZE = "gearpump.netty.message-batch-size" - val NETTY_FLUSH_CHECK_INTERVAL = "gearpump.netty.flush-check-interval" - val NETTY_TCP_HOSTNAME = "akka.remote.netty.tcp.hostname" - val NETTY_DISPATCHER = "gearpump.netty.dispatcher" - - val GEARPUMP_USERNAME = "gearpump.username" - val GEARPUMP_APPLICATION_ID = "gearpump.applicationId" - val GEARPUMP_MASTER_STARTTIME = "gearpump.master.starttime" - val GEARPUMP_EXECUTOR_ID = "gearpump.executorId" - // Application jar property - val GEARPUMP_APP_JAR = "gearpump.app.jar" - val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix" - - // Where the jar is stored at. It can be a HDFS, or a local disk. - val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath" - - // Uses java property -Dgearpump.config.file=xxx.conf to set customized configuration - // Otherwise application.conf in classpath will be loaded - val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file" - - // Metrics related - val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled" - val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate" - val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms" - val GEARPUMP_METRIC_GRAPHITE_HOST = "gearpump.metrics.graphite.host" - val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port" - val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter" - - // Retains at max @RETAIN_HISTORY_HOURS history data - val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = "gearpump.metrics.retainHistoryData.hours" - - // Time interval between two history data points. - val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = "gearpump.metrics.retainHistoryData.intervalMs" - - // Retains at max @RETAIN_LATEST_SECONDS recent data points - val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = "gearpump.metrics.retainRecentData.seconds" - - // time interval between two recent data points. - val GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS = "gearpump.metrics.retainRecentData.intervalMs" - - // AppMaster will max wait this time until it declare the resource cannot be allocated, - // and shutdown itself - val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = "gearpump.resource-allocation-timeout-seconds" - - // Service related - val GEARPUMP_SERVICE_HTTP = "gearpump.services.http" - val GEARPUMP_SERVICE_HOST = "gearpump.services.host" - val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path" - val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise" - - // The partitioners provided by Gearpump - val BUILTIN_PARTITIONERS = Array( - classOf[BroadcastPartitioner], - classOf[CoLocationPartitioner], - classOf[HashPartitioner], - classOf[ShuffleGroupingPartitioner], - classOf[ShufflePartitioner]) - - // Security related - val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file" - val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal" - - val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query" - val GEARPUMP_METRICS_AGGREGATORS = "gearpump.metrics.akka.metrics-aggregator-class" - - val GEARPUMP_UI_SECURITY = "gearpump.ui-security" - val GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED = "gearpump.ui-security.authentication-enabled" - val GEARPUMP_UI_AUTHENTICATOR_CLASS = "gearpump.ui-security.authenticator" - // OAuth Authentication Factory for UI server. - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED = "gearpump.ui-security.oauth2-authenticator-enabled" - val GEARPUMP_UI_OAUTH2_AUTHENTICATORS = "gearpump.ui-security.oauth2-authenticators" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS = "class" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK = "callback" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID = "clientid" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET = "clientsecret" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE = "default-userrole" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE = "code" - val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN = "accesstoken" - - val PREFER_IPV4 = "java.net.preferIPv4Stack" - - val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num" - - val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration" -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/FileUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/FileUtils.scala b/core/src/main/scala/io/gearpump/util/FileUtils.scala deleted file mode 100644 index 1561587..0000000 --- a/core/src/main/scala/io/gearpump/util/FileUtils.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.{File, IOException} -import java.nio.charset.Charset - -import io.gearpump.google.common.io.Files - -object FileUtils { - private val UTF8 = Charset.forName("UTF-8") - - def write(file: File, str: String): Unit = { - Files.write(str, file, UTF8) - } - - def read(file: File): String = { - Files.asCharSource(file, UTF8).read() - } - - def writeByteArrayToFile(file: File, bytes: Array[Byte]): Unit = { - Files.write(bytes, file) - } - - def readFileToByteArray(file: File): Array[Byte] = { - Files.toByteArray(file) - } - - /** recursively making all parent directories including itself */ - def forceMkdir(directory: File): Unit = { - if (directory.exists() && directory.isFile) { - throw new IOException(s"Failed to create directory ${directory.toString}, it already exist") - } - Files.createParentDirs(directory) - val result = directory.mkdir() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Graph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Graph.scala b/core/src/main/scala/io/gearpump/util/Graph.scala deleted file mode 100644 index 8c34329..0000000 --- a/core/src/main/scala/io/gearpump/util/Graph.scala +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util -import scala.annotation.tailrec -import scala.collection.mutable -import scala.language.implicitConversions - -/** - * Generic mutable Graph libraries. - */ -class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable { - - private val _vertices = mutable.Set.empty[N] - private val _edges = mutable.Set.empty[(N, E, N)] - - // This is used to ensure the output of this Graph is always stable - // Like method vertices(), or edges() - private var _indexs = Map.empty[Any, Int] - private var _nextIndex = 0 - private def nextId: Int = { - val result = _nextIndex - _nextIndex += 1 - result - } - - private def init(): Unit = { - Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_)) - Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_)) - } - - init() - - /** - * Add a vertex - * Current Graph is changed. - */ - def addVertex(vertex: N): Unit = { - val result = _vertices.add(vertex) - if (result) { - _indexs += vertex -> nextId - } - } - - /** - * Add a edge - * Current Graph is changed. - */ - def addEdge(edge: (N, E, N)): Unit = { - val result = _edges.add(edge) - if (result) { - _indexs += edge -> nextId - } - } - - /** - * return all vertices. - * The result is stable - */ - def vertices: List[N] = { - // Sorts the vertex so that we can keep the order for mapVertex - _vertices.toList.sortBy(_indexs(_)) - } - - /** - * out degree - */ - def outDegreeOf(node: N): Int = { - edges.count(_._1 == node) - } - - /** - * in degree - */ - def inDegreeOf(node: N): Int = { - edges.count(_._3 == node) - } - - /** - * out going edges. - */ - def outgoingEdgesOf(node: N): List[(N, E, N)] = { - edges.filter(_._1 == node) - } - - /** - * incoming edges. - */ - def incomingEdgesOf(node: N): List[(N, E, N)] = { - edges.filter(_._3 == node) - } - - /** - * Remove vertex - * Current Graph is changed. - */ - def removeVertex(node: N): Unit = { - _vertices.remove(node) - _indexs -= node - val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node) - toBeRemoved.foreach(removeEdge(_)) - } - - /** - * Remove edge - * Current Graph is changed. - */ - private def removeEdge(edge: (N, E, N)): Unit = { - _indexs -= edge - _edges.remove(edge) - } - - /** - * add edge - * Current Graph is changed. - */ - def addEdge(node1: N, edge: E, node2: N): Unit = { - addVertex(node1) - addVertex(node2) - addEdge((node1, edge, node2)) - } - - /** - * Map a graph to a new graph, with vertex converted to a new type - * Current Graph is not changed. - */ - def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = { - val vertexes = vertices.map(node => (node, fun(node))) - - val vertexMap: Map[N, NewNode] = vertexes.toMap - - val newEdges = edges.map { edge => - (vertexMap(edge._1), edge._2, vertexMap(edge._3)) - } - new Graph(vertexes.map(_._2), newEdges) - } - - /** - * Map a graph to a new graph, with edge converted to new type - * Current graph is not changed. - */ - def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = { - val newEdges = edges.map { edge => - (edge._1, fun(edge._1, edge._2, edge._3), edge._3) - } - new Graph(vertices, newEdges) - } - - /** - * edges connected to node - */ - def edgesOf(node: N): List[(N, E, N)] = { - (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_)) - } - - /** - * all edges - */ - def edges: List[(N, E, N)] = { - _edges.toList.sortBy(_indexs(_)) - } - - /** - * Add another graph - * Current graph is changed. - */ - def addGraph(other: Graph[N, E]): Graph[N, E] = { - (vertices ++ other.vertices).foreach(addVertex(_)) - (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3)) - this - } - - /** - * clone the graph - */ - def copy: Graph[N, E] = { - new Graph(vertices, edges) - } - - /** - * check empty - */ - def isEmpty: Boolean = { - val vertexCount = vertices.size - val edgeCount = edges.length - if (vertexCount + edgeCount == 0) { - true - } else { - false - } - } - - /** - * sub-graph which contains current node and all neighbour - * nodes and edges. - * - */ - def subGraph(node: N): Graph[N, E] = { - val newGraph = Graph.empty[N, E] - for (edge <- edgesOf(node)) { - newGraph.addEdge(edge._1, edge._2, edge._3) - } - newGraph - } - - /** - * replace vertex, the current Graph is mutated. - */ - def replaceVertex(node: N, newNode: N): Graph[N, E] = { - for (edge <- incomingEdgesOf(node)) { - addEdge(edge._1, edge._2, newNode) - } - - for (edge <- outgoingEdgesOf(node)) { - addEdge(newNode, edge._2, edge._3) - } - removeVertex(node) - this - } - - private def removeZeroInDegree: List[N] = { - val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_)) - toBeRemoved.foreach(removeVertex(_)) - toBeRemoved - } - - /** - * Return an iterator of vertex in topological order - * The node returned by Iterator is stable sorted. - */ - def topologicalOrderIterator: Iterator[N] = { - val newGraph = copy - var output = List.empty[N] - - while (!newGraph.isEmpty) { - output ++= newGraph.removeZeroInDegree - } - output.iterator - } - - /** - * Return all circles in graph. - * - * The reference of this algorithm is: - * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm - */ - private def findCircles: mutable.MutableList[mutable.MutableList[N]] = { - val inStack = mutable.Map.empty[N, Boolean] - val stack = mutable.Stack[N]() - val indexMap = mutable.Map.empty[N, Int] - val lowLink = mutable.Map.empty[N, Int] - var index = 0 - - val circles = mutable.MutableList.empty[mutable.MutableList[N]] - - def tarjan(node: N): Unit = { - indexMap(node) = index - lowLink(node) = index - index += 1 - inStack(node) = true - stack.push(node) - - outgoingEdgesOf(node).foreach { - edge => { - if (!indexMap.contains(edge._3)) { - tarjan(edge._3) - if (lowLink.get(edge._3).get < lowLink.get(node).get) { - lowLink(node) = lowLink(edge._3) - } - } else { - if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) { - lowLink(node) = indexMap(edge._3) - } - } - } - } - - if (indexMap.get(node).get == lowLink.get(node).get) { - val circle = mutable.MutableList.empty[N] - var n = node - do { - n = stack.pop() - inStack(n) = false - circle += n - } while (n != node) - circles += circle - } - } - - vertices.foreach { - node => { - if (!indexMap.contains(node)) tarjan(node) - } - } - - circles - } - - /** - * Return an iterator of vertex in topological order of graph with circles - * The node returned by Iterator is stable sorted. - * - * The reference of this algorithm is: - * http://www.drdobbs.com/database/topological-sorting/184410262 - */ - def topologicalOrderWithCirclesIterator: Iterator[N] = { - val circles = findCircles - val newGraph = Graph.empty[mutable.MutableList[N], E] - circles.foreach { - circle => { - newGraph.addVertex(circle) - } - } - - for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield { - for (node1 <- circle1; node2 <- circle2) yield { - var edges = outgoingEdgesOf(node1) - for (edge <- edges; if edge._3 == node2) yield { - newGraph.addEdge(circle1, edge._2, circle2) - } - - edges = outgoingEdgesOf(node2) - for (edge <- edges; if edge._3 == node1) yield { - newGraph.addEdge(circle2, edge._2, circle1) - } - } - } - - val topo = newGraph.topologicalOrderIterator - topo.flatMap(_.sortBy(_indexs(_)).iterator) - } - - /** - * check whether there is a loop - */ - def hasCycle(): Boolean = { - @tailrec - def detectCycle(graph: Graph[N, E]): Boolean = { - if (graph.edges.isEmpty) { - false - } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) { - true - } else { - graph.removeZeroInDegree - detectCycle(graph) - } - } - - detectCycle(copy) - } - - /** - * Check whether there are two edges connecting two nodes. - */ - def hasDuplicatedEdge(): Boolean = { - edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1) - } - - /** - * Generate a level map for each vertex withholding: - * {{{ - * if vertex A -> B, then level(A) -> level(B) - * }}} - */ - def vertexHierarchyLevelMap(): Map[N, Int] = { - val newGraph = copy - var output = Map.empty[N, Int] - var level = 0 - while (!newGraph.isEmpty) { - output ++= newGraph.removeZeroInDegree.map((_, level)).toMap - level += 1 - } - output - } - - override def toString: String = { - Map("vertices" -> vertices.mkString(","), - "edges" -> edges.mkString(",")).toString() - } -} - -object Graph { - - /** - * Example: - * - * {{{ - * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11) - * Will create a graph with: - * nodes: - * 1, 4, 7, 8, 55, 11 - * edge: - * 2: (1->4) - * 5: (4->7) - * 9: (8->55) - * }}} - */ - def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = { - val graph = empty[N, E] - elems.foreach { path => - path.updategraph(graph) - } - graph - } - - def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = { - new Graph(vertices, edges) - } - - def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = { - Some((graph.vertices, graph.edges)) - } - - def empty[N, E]: Graph[N, E] = { - new Graph(List.empty[N], List.empty[(N, E, N)]) - } - - class Path[N, + E](path: List[Either[N, E]]) { - - def ~[Edge >: E](edge: Edge): Path[N, Edge] = { - new Path(path :+ Right(edge)) - } - - def ~>[Node >: N](node: Node): Path[Node, E] = { - new Path(path :+ Left(node)) - } - - def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = { - this ~ edge ~> node - } - - private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, Edge]): Unit = { - val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None) - path.foldLeft(nodeEdgePair) { (pair, either) => - val (lastNode, lastEdge) = pair - either match { - case Left(node) => - graph.addVertex(node) - if (lastNode.isDefined) { - graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[Edge]), node) - } - (Some(node), None) - case Right(edge) => - (lastNode, Some(edge)) - } - } - } - } - - object Path { - implicit def anyToPath[N, E](any: N): Path[N, E] = Node(any) - } - - implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) { - - override def ~[Edge](edge: Edge): Path[N, Edge] = { - new Path(List(Left(self), Right(edge))) - } - - override def ~>[Node >: N](node: Node): Path[Node, E] = { - new NodeList(List(self, node)) - } - - override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = { - this ~ edge ~> node - } - } - - class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) { - override def ~[Edge](edge: Edge): Path[N, Edge] = { - new Path(nodes.map(Left(_)) :+ Right(edge)) - } - - override def ~>[Node >: N](node: Node): Path[Node, E] = { - new NodeList(nodes :+ node) - } - - override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = { - this ~ edge ~> node - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala deleted file mode 100644 index 7552444..0000000 --- a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.util -import scala.collection.mutable.ListBuffer - -import akka.actor.Actor -import com.typesafe.config.Config -import org.slf4j.Logger - -import io.gearpump.TimeStamp -import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption} -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} -import io.gearpump.metrics.Metrics._ -import io.gearpump.metrics.MetricsAggregator -import io.gearpump.util.Constants._ -import io.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator} - -/** - * - * Metrics service to serve history metrics data - * - * For simplicity, HistoryMetricsService will maintain 72 hours coarse-grained data - * for last 72 hours, and fine-grained data for past 5 min. - * - * For the coarse-grained data of past 72 hours, one or two sample point will be stored - * for each hour. - * - * For fine-grained data in last 5 min, there will be 1 sample point per 15 seconds. - */ -class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends Actor { - private val LOG: Logger = LogUtil.getLogger(getClass, name = name) - private var metricsStore = Map.empty[String, HistoryMetricsStore] - private val systemConfig = context.system.settings.config - - def receive: Receive = metricHandler orElse commandHandler - def metricHandler: Receive = { - case ReportMetrics => - sender ! DemandMoreMetrics(self) - case metrics: MetricType => - val name = metrics.name - if (metricsStore.contains(name)) { - metricsStore(name).add(metrics) - } else { - val store = HistoryMetricsStore(name, metrics, config) - metricsStore += name -> store - store.add(metrics) - } - } - - private def toRegularExpression(input: String): String = { - "^" + input.flatMap { - case '*' => ".*" - case '?' => "." - case char if "()[]$^.{}|\\".contains(char) => "\\" + char - case other => s"$other" - } + ".*$" - } - - private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption) - : List[HistoryMetricsItem] = { - - val result = new ListBuffer[HistoryMetricsItem] - - val regex = toRegularExpression(pathPattern).r.pattern - - val iter = metricsStore.iterator - while (iter.hasNext) { - val (name, store) = iter.next() - - val matcher = regex.matcher(name) - if (matcher.matches()) { - readOption match { - case ReadOption.ReadLatest => - result.append(store.readLatest: _*) - case ReadOption.ReadRecent => - result.append(store.readRecent: _*) - case ReadOption.ReadHistory => - result.append(store.readHistory: _*) - case _ => - // Skip all other options. - } - } - } - result.toList - } - - val dummyAggregator = new DummyMetricsAggregator - private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, MetricsAggregator] - - import scala.collection.JavaConverters._ - private val validAggregators: Set[String] = { - val rootConfig = systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped - rootConfig.keySet().asScala.toSet - } - - def commandHandler: Receive = { - // Path accept syntax ? *, ? will match one char, * will match at least one char - case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) => - - val aggregator = { - if (aggregatorClazz == null || aggregatorClazz.isEmpty) { - dummyAggregator - } else if (aggregators.contains(aggregatorClazz)) { - aggregators(aggregatorClazz) - } else if (validAggregators.contains(aggregatorClazz)) { - val clazz = Class.forName(aggregatorClazz) - val constructor = clazz.getConstructor(classOf[Config]) - val aggregator = constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator] - aggregators += aggregatorClazz -> aggregator - aggregator - } else { - LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, " + - s"we will drop all messages. Please see config at ${GEARPUMP_METRICS_AGGREGATORS}") - val skipAll = new SkipAllAggregator - aggregators += aggregatorClazz -> new SkipAllAggregator - skipAll - } - } - - val metrics = fetchMetricsHistory(inputPath, readOption).iterator - sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, metrics)) - } -} - -object HistoryMetricsService { - - trait MetricsStore { - def add(inputMetrics: MetricType): Unit - - def read: List[HistoryMetricsItem] - - /** - * read latest inserted records - * @return - */ - def readLatest: List[HistoryMetricsItem] - } - - trait HistoryMetricsStore { - def add(inputMetrics: MetricType): Unit - - /** - * read latest inserted records - * @return - */ - def readLatest: List[HistoryMetricsItem] - - def readRecent: List[HistoryMetricsItem] - - def readHistory: List[HistoryMetricsItem] - } - - class DummyHistoryMetricsStore extends HistoryMetricsStore { - - val empty = List.empty[HistoryMetricsItem] - - override def add(inputMetrics: MetricType): Unit = Unit - - override def readRecent: List[HistoryMetricsItem] = empty - - /** - * read latest inserted records - * @return - */ - override def readLatest: List[HistoryMetricsItem] = empty - - override def readHistory: List[HistoryMetricsItem] = empty - } - - object HistoryMetricsStore { - def apply(name: String, metric: MetricType, config: HistoryMetricsConfig) - : HistoryMetricsStore = { - metric match { - case histogram: Histogram => new HistogramMetricsStore(config) - case meter: Meter => new MeterMetricsStore(config) - case counter: Counter => new CounterMetricsStore(config) - case gauge: Gauge => new GaugeMetricsStore(config) - case _ => new DummyHistoryMetricsStore // other metrics are not supported - } - } - } - - /** - * Metrics store to store history data points - * For each time point, we will store single data point. - * - * @param retainCount how many data points to retain, old data will be removed - * @param retainIntervalMs time interval between two data points. - */ - class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) extends MetricsStore { - - private val queue = new util.ArrayDeque[HistoryMetricsItem]() - private var latest = List.empty[HistoryMetricsItem] - - // End of the time window we are tracking - private var endTime = 0L - - override def add(inputMetrics: MetricType): Unit = { - add(inputMetrics, System.currentTimeMillis()) - } - - def add(inputMetrics: MetricType, now: TimeStamp): Unit = { - - val metrics = HistoryMetricsItem(now, inputMetrics) - latest = List(metrics) - - if (now >= endTime) { - queue.addFirst(metrics) - endTime = (now / retainIntervalMs + 1) * retainIntervalMs - - // Removes old data - if (queue.size() > retainCount) { - queue.removeLast() - } - } - } - - def read: List[HistoryMetricsItem] = { - val result = new ListBuffer[HistoryMetricsItem] - import scala.collection.JavaConverters._ - queue.iterator().asScala.foreach(result.prepend(_)) - result.toList - } - - override def readLatest: List[HistoryMetricsItem] = { - latest - } - } - - /** - * Config for how long to keep history metrics data. - * - * @param retainHistoryDataHours Retain at max @RETAIN_HISTORY_HOURS history data(unit hour) - * @param retainHistoryDataIntervalMs time interval between two history data points.(unit: ms) - * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS - * recent data points(unit: seconds) - * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS recent - * data points(unit: ms) - */ - case class HistoryMetricsConfig( - retainHistoryDataHours: Int, - retainHistoryDataIntervalMs: Int, - retainRecentDataSeconds: Int, - retainRecentDataIntervalMs: Int) - - object HistoryMetricsConfig { - def apply(config: Config): HistoryMetricsConfig = { - val historyHour = config.getInt(GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS) - val historyInterval = config.getInt(GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS) - - val recentSeconds = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_SECONDS) - val recentInterval = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS) - HistoryMetricsConfig(historyHour, historyInterval, recentSeconds, recentInterval) - } - } - - class HistogramMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { - - private val history = new SingleValueMetricsStore( - config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, - config.retainHistoryDataIntervalMs) - - private val recent = new SingleValueMetricsStore( - config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, - config.retainRecentDataIntervalMs) - - override def add(inputMetrics: MetricType): Unit = { - recent.add(inputMetrics) - history.add(inputMetrics) - } - - override def readRecent: List[HistoryMetricsItem] = { - recent.read - } - - override def readHistory: List[HistoryMetricsItem] = { - history.read - } - - override def readLatest: List[HistoryMetricsItem] = { - recent.readLatest - } - } - - class MeterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { - - private val history = new SingleValueMetricsStore( - config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, - config.retainHistoryDataIntervalMs) - - private val recent = new SingleValueMetricsStore( - config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, - config.retainRecentDataIntervalMs) - - override def add(inputMetrics: MetricType): Unit = { - recent.add(inputMetrics) - history.add(inputMetrics) - } - - override def readRecent: List[HistoryMetricsItem] = { - recent.read - } - - override def readHistory: List[HistoryMetricsItem] = { - history.read - } - - override def readLatest: List[HistoryMetricsItem] = { - recent.readLatest - } - } - - class CounterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { - - private val history = new SingleValueMetricsStore( - config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, - config.retainHistoryDataIntervalMs) - - private val recent = new SingleValueMetricsStore( - config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, - config.retainRecentDataIntervalMs) - - override def add(inputMetrics: MetricType): Unit = { - history.add(inputMetrics) - recent.add(inputMetrics) - } - - override def readRecent: List[HistoryMetricsItem] = { - recent.read - } - - override def readHistory: List[HistoryMetricsItem] = { - history.read - } - - override def readLatest: List[HistoryMetricsItem] = { - recent.readLatest - } - } - - class GaugeMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { - - private val compartor = (left: HistoryMetricsItem, right: HistoryMetricsItem) => - left.value.asInstanceOf[Gauge].value > right.value.asInstanceOf[Gauge].value - - private val history = new SingleValueMetricsStore( - config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, - config.retainHistoryDataIntervalMs) - - private val recent = new SingleValueMetricsStore( - config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs, - config.retainRecentDataIntervalMs) - - override def add(inputMetrics: MetricType): Unit = { - recent.add(inputMetrics) - history.add(inputMetrics) - } - - override def readRecent: List[HistoryMetricsItem] = { - recent.read - } - - override def readHistory: List[HistoryMetricsItem] = { - history.read - } - - override def readLatest: List[HistoryMetricsItem] = { - recent.readLatest - } - } - - class DummyMetricsAggregator extends MetricsAggregator { - def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) - : List[HistoryMetricsItem] = { - inputs.toList - } - } - - class SkipAllAggregator extends MetricsAggregator { - private val empty = List.empty[HistoryMetricsItem] - def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) - : List[HistoryMetricsItem] = { - empty - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/LogUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/LogUtil.scala b/core/src/main/scala/io/gearpump/util/LogUtil.scala deleted file mode 100644 index 1669129..0000000 --- a/core/src/main/scala/io/gearpump/util/LogUtil.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.File -import java.net.InetAddress -import java.util.Properties -import scala.util.Try - -import com.typesafe.config.Config -import org.apache.log4j.PropertyConfigurator -import org.slf4j.{Logger, LoggerFactory} - -object LogUtil { - object ProcessType extends Enumeration { - type ProcessType = Value - val MASTER, WORKER, LOCAL, APPLICATION, UI = Value - } - - def getLogger[T]( - clazz: Class[T], context: String = null, master: Any = null, worker: Any = null, - executor: Any = null, task: Any = null, app: Any = null, name: String = null): Logger = { - var env = "" - - if (null != context) { - env += context - } - if (null != master) { - env += "master" + master - } - if (null != worker) { - env += "worker" + worker - } - - if (null != app) { - env += "app" + app - } - - if (null != executor) { - env += "exec" + executor - } - if (null != task) { - env += task - } - if (null != name) { - env += name - } - - if (!env.isEmpty) { - LoggerFactory.getLogger(clazz.getSimpleName + "@" + env) - } else { - LoggerFactory.getLogger(clazz.getSimpleName) - } - } - - /** Custom the log file locations by reading config from system properties */ - def loadConfiguration(config: Config, processType: ProcessType.ProcessType): Unit = { - // Set log file name - val propName = s"gearpump.${processType.toString.toLowerCase}.log.file" - val props = loadConfiguration - - props.setProperty("gearpump.log.file", "${" + propName + "}") - - props.setProperty("JVM_NAME", jvmName) - - processType match { - case ProcessType.APPLICATION => - props.setProperty("log4j.rootAppender", "${gearpump.application.logger}") - props.setProperty("gearpump.application.log.rootdir", - applicationLogDir(config).getAbsolutePath) - case _ => - props.setProperty("log4j.rootAppender", "${gearpump.root.logger}") - props.setProperty("gearpump.log.dir", daemonLogDir(config).getAbsolutePath) - } - - PropertyConfigurator.configure(props) - } - - def daemonLogDir(config: Config): File = { - val dir = config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR) - new File(dir) - } - - def verboseLogToConsole(): Unit = { - val props = loadConfiguration - props.setProperty("log4j.rootLogger", "DEBUG,console") - PropertyConfigurator.configure(props) - } - - def loadConfiguration: Properties = { - val props = new Properties() - val log4jConfStream = getClass().getClassLoader.getResourceAsStream("log4j.properties") - if (log4jConfStream != null) { - props.load(log4jConfStream) - } - log4jConfStream.close() - props - } - - private def jvmName: String = { - val hostname = Try(InetAddress.getLocalHost.getHostName).getOrElse("local") - java.lang.management.ManagementFactory.getRuntimeMXBean().getName() - } - - def applicationLogDir(config: Config): File = { - val appLogDir = config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR) - new File(appLogDir) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala deleted file mode 100644 index 0b843f3..0000000 --- a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.{Closeable, Flushable} -import scala.sys.process.ProcessLogger - -import org.slf4j.LoggerFactory - -/** Redirect the console output to parent process */ -class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable with ConsoleOutput { - private val LOG = LoggerFactory.getLogger("redirect") - - // We only capture the first 1K chars - private final val LENGTH = 1000 - private var _error: String = "" - private var _output: String = "" - - def error: String = _error - def output: String = _output - - def out(s: => String): Unit = { - if (_output.length <= LENGTH) { - _output += "\n" + s - } - LOG.info(s) - } - def err(s: => String): Unit = { - if (_error.length <= LENGTH) { - _error += "\n" + s - } - LOG.error(s) - } - def buffer[T](f: => T): T = f - def close(): Unit = Unit - def flush(): Unit = Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala deleted file mode 100644 index f6c7a2b..0000000 --- a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -/** - * Check equal using reference-equal. - */ -trait ReferenceEqual extends AnyRef { - - override def equals(other: Any): Boolean = { - this.eq(other.asInstanceOf[AnyRef]) - } - - override def hashCode(): Int = { - super.hashCode() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/RestartPolicy.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala deleted file mode 100644 index 245cb1b..0000000 --- a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import scala.concurrent.duration.Duration - -import akka.actor.ChildRestartStats - -/** - * When one executor or task fails, Gearpump will try to start. However, if it fails after - * multiple retries, then we abort. - * - * @param maxNrOfRetries The number of times is allowed to be restarted, negative value means no - * limit, if the limit is exceeded the policy will not allow to restart - * @param withinTimeRange Duration of the time window for maxNrOfRetries. - * Duration.Inf means no window - */ -class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { - private val status = new ChildRestartStats(null, 0, 0L) - private val retriesWindow = (Some(maxNrOfRetries), Some(withinTimeRange.toMillis.toInt)) - - def allowRestart: Boolean = { - status.requestRestartPermission(retriesWindow) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/RichProcess.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/RichProcess.scala b/core/src/main/scala/io/gearpump/util/RichProcess.scala deleted file mode 100644 index ab5611f..0000000 --- a/core/src/main/scala/io/gearpump/util/RichProcess.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import scala.sys.process.Process - -trait ConsoleOutput { - def output: String - def error: String -} - -/** Extends Process by providing a additional logger: ConsoleOutput interface. */ -class RichProcess(process: Process, _logger: ConsoleOutput) extends Process { - def exitValue(): scala.Int = process.exitValue() - def destroy(): scala.Unit = process.destroy() - def logger: ConsoleOutput = _logger -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala deleted file mode 100644 index 64b920c..0000000 --- a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.util.concurrent.TimeUnit -import scala.concurrent.duration._ - -import akka.actor.{Actor, ActorRef} -import akka.pattern.ask - -/** A helper util to send a message to remote actor and notify callback when timeout */ -trait TimeOutScheduler { - this: Actor => - import context.dispatcher - - def sendMsgWithTimeOutCallBack( - target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = { - val result = target.ask(msg)(FiniteDuration(milliSeconds, TimeUnit.MILLISECONDS)) - result onSuccess { - case msg => - self ! msg - } - result onFailure { - case _ => timeOutHandler - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Util.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Util.scala b/core/src/main/scala/io/gearpump/util/Util.scala deleted file mode 100644 index 8ed9bb3..0000000 --- a/core/src/main/scala/io/gearpump/util/Util.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} -import java.net.{ServerSocket, URI} -import scala.concurrent.forkjoin.ThreadLocalRandom -import scala.sys.process.Process -import scala.util.{Failure, Success, Try} - -import com.typesafe.config.{Config, ConfigFactory} - -import io.gearpump.cluster.AppJar -import io.gearpump.jarstore.JarStoreService -import io.gearpump.transport.HostPort - -object Util { - val LOG = LogUtil.getLogger(getClass) - private val defaultUri = new URI("file:///") - private val appNamePattern = "^[a-zA-Z_][a-zA-Z0-9_]+$".r.pattern - - def validApplicationName(appName: String): Boolean = { - appNamePattern.matcher(appName).matches() - } - - def getCurrentClassPath: Array[String] = { - val classpath = System.getProperty("java.class.path") - val classpathList = classpath.split(File.pathSeparator) - classpathList - } - - def version: String = { - val home = System.getProperty(Constants.GEARPUMP_HOME) - val version = Try { - val versionFile = new FileInputStream(new File(home, "VERSION")) - val reader = new BufferedReader(new InputStreamReader(versionFile)) - val version = reader.readLine().replace("version:=", "") - versionFile.close() - version - } - version match { - case Success(version) => - version - case Failure(ex) => - LOG.error("failed to read VERSION file, " + ex.getMessage) - "Unknown-Version" - } - } - - def startProcess(options: Array[String], classPath: Array[String], mainClass: String, - arguments: Array[String]): RichProcess = { - val java = System.getProperty("java.home") + "/bin/java" - val command = List(java) ++ options ++ - List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments - LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " + - s"\n ${options.mkString(" ")}") - val logger = new ProcessLogRedirector() - val process = Process(command).run(logger) - new RichProcess(process, logger) - } - - /** - * hostList format: host1:port1,host2:port2,host3:port3... - */ - def parseHostList(hostList: String): List[HostPort] = { - val masters = hostList.trim.split(",").map { address => - val hostAndPort = address.split(":") - HostPort(hostAndPort(0), hostAndPort(1).toInt) - } - masters.toList - } - - def resolvePath(path: String): String = { - val uri = new URI(path) - if (uri.getScheme == null && uri.getFragment == null) { - val absolutePath = new File(path).getCanonicalPath.replaceAll("\\\\", "/") - "file://" + absolutePath - } else { - path - } - } - - def isLocalPath(path: String): Boolean = { - val uri = new URI(path) - val scheme = uri.getScheme - val authority = uri.getAuthority - if (scheme == null && authority == null) { - true - } else if (scheme == defaultUri.getScheme) { - true - } else { - false - } - } - - def randInt(): Int = { - Math.abs(ThreadLocalRandom.current.nextInt()) - } - - def findFreePort(): Try[Int] = { - Try { - val socket = new ServerSocket(0) - socket.setReuseAddress(true) - val port = socket.getLocalPort() - socket.close - port - } - } - - def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = { - val remotePath = jarStoreService.copyFromLocal(jarFile) - AppJar(jarFile.getName, remotePath) - } - - /** - * This util can be used to filter out configuration from specific origin - * - * For example, if you want to filter out configuration from reference.conf - * Then you can use like this: - * - * filterOutOrigin(config, "reference.conf") - */ - import scala.collection.JavaConverters._ - def filterOutOrigin(config: Config, originFile: String): Config = { - config.entrySet().asScala.foldLeft(ConfigFactory.empty()) { (config, entry) => - val key = entry.getKey - val value = entry.getValue - val origin = value.origin() - if (origin.resource() == originFile) { - config - } else { - config.withValue(key, value) - } - } - } - - case class JvmSetting(vmargs: Array[String], classPath: Array[String]) - - case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting) - - /** Get an effective AppJvmSettings from Config */ - def resolveJvmSetting(conf: Config): AppJvmSettings = { - - import io.gearpump.util.Constants._ - - val appMasterVMArgs = Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+") - .filter(_.nonEmpty)).toOption - val executorVMArgs = Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+") - .filter(_.nonEmpty)).toOption - - val appMasterClassPath = Try( - conf.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) - .split("[;:]").filter(_.nonEmpty)).toOption - - val executorClassPath = Try( - conf.getString(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) - .split(File.pathSeparator).filter(_.nonEmpty)).toOption - - AppJvmSettings( - JvmSetting(appMasterVMArgs.getOrElse(Array.empty[String]), - appMasterClassPath.getOrElse(Array.empty[String])), - JvmSetting(executorVMArgs - .getOrElse(Array.empty[String]), executorClassPath.getOrElse(Array.empty[String]))) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala new file mode 100644 index 0000000..871ebe1 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/Message.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump + +/** + * Each message contains a immutable timestamp. + * + * For example, if you take a picture, the time you take the picture is the + * message's timestamp. + * @param msg Accept any type except Null, Nothing and Unit + */ +case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp) + +object Message { + val noTimeStamp: TimeStamp = 0L +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala new file mode 100644 index 0000000..91c2675 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster + +import scala.reflect.ClassTag + +import akka.actor.{Actor, ActorRef, ActorSystem} +import com.typesafe.config.{Config, ConfigFactory} + +import org.apache.gearpump.cluster.appmaster.WorkerInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.jarstore.FilePath + +/** + * This contains all information to run an application + * + * @param name The name of this application + * @param appMaster The class name of AppMaster Actor + * @param userConfig user configuration. + * @param clusterConfig User provided cluster config, it overrides gear.conf when starting + * new applications. In most cases, you should not need to change it. If you do + * really need to change it, please use ClusterConfigSource(filePath) to + * construct the object, while filePath points to the .conf file. + */ +case class AppDescription( + name: String, appMaster: String, userConfig: UserConfig, + clusterConfig: Config = ConfigFactory.empty()) + +/** + * Each job, streaming or not streaming, need to provide an Application class. + * The master uses this class to start AppMaster. + */ +trait Application { + + /** Name of this application, must be unique in the system */ + def name: String + + /** Custom user configuration */ + def userConfig(implicit system: ActorSystem): UserConfig + + /** + * AppMaster class, must have a constructor like this: + * this(appContext: AppMasterContext, app: AppDescription) + */ + def appMaster: Class[_ <: ApplicationMaster] +} + +object Application { + def apply[T <: ApplicationMaster]( + name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = { + new DefaultApplication(name, userConfig, + tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]]) + } + + class DefaultApplication( + override val name: String, inputUserConfig: UserConfig, + val appMaster: Class[_ <: ApplicationMaster]) extends Application { + override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig + } + + def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem) + : AppDescription = { + val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config) + AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys) + } +} + +/** + * Used for verification. All AppMaster must extend this interface + */ +abstract class ApplicationMaster extends Actor + +/** + * This contains context information when starting an AppMaster + * + * @param appId application instance id assigned, it is unique in the cluster + * @param username The username who submitted this application + * @param resource Resouce allocated to start this AppMaster daemon. AppMaster are allowed to + * request more resource from Master. + * @param appJar application Jar. If the jar is already in classpath, then it can be None. + * @param masterProxy The proxy to master actor, it bridges the messages between appmaster + * and master + * @param registerData AppMaster are required to send this data to Master by when doing + * RegisterAppMaster. + */ +case class AppMasterContext( + appId: Int, + username: String, + resource: Resource, + workerInfo: WorkerInfo, + appJar: Option[AppJar], + masterProxy: ActorRef, + registerData: AppMasterRegisterData) + +/** + * Jar file container in the cluster + * + * @param name A meaningful name to represent this jar + * @param filePath Where the jar file is stored. + */ +case class AppJar(name: String, filePath: FilePath) + +/** + * Serves as the context to start an Executor JVM. + */ +// TODO: ExecutorContext doesn't belong to this package in logic. +case class ExecutorContext( + executorId: Int, worker: WorkerInfo, appId: Int, appName: String, + appMaster: ActorRef, resource: Resource) + +/** + * JVM configurations to start an Executor JVM. + * + * @param classPath When executor is created by a worker JVM, executor automatically inherits + * parent worker's classpath. Sometimes, you still want to add some extra + * classpath, you can do this by specify classPath option. + * @param jvmArguments java arguments like -Dxx=yy + * @param mainClass Executor main class name like org.apache.gearpump.xx.AppMaster + * @param arguments Executor command line arguments + * @param jar application jar + * @param executorAkkaConfig Akka config used to initialize the actor system of this executor. + * It uses org.apache.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE + * to pass the config to executor process + */ +// TODO: ExecutorContext doesn't belong to this package in logic. +case class ExecutorJVMConfig( + classPath: Array[String], jvmArguments: Array[String], mainClass: String, + arguments: Array[String], jar: Option[AppJar], username: String, + executorAkkaConfig: Config = ConfigFactory.empty()) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala new file mode 100644 index 0000000..332e770 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster + +import java.io.File + +import com.typesafe.config._ + +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{Constants, FileUtils, LogUtil, Util} + +/** + * + * All Gearpump application should use this class to load configurations. + * + * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also + * resolve config from file gear.conf and geardefault.conf. + * + * Overriding order: + * {{{ + * System Properties + * > Custom configuration file (by using system property -Dgearpump.config.file) > + * > gear.conf + * > geardefault.conf + * > reference.conf + * }}} + */ + +object ClusterConfig { + /** + * alias for default + * default is a reserved word for java + */ + def defaultConfig: Config = { + default(APPLICATION) + } + + /** + * default application for user. + * Usually used when user want to start an client application. + */ + def default(configFile: String = APPLICATION): Config = { + load(configFile).default + } + + /** + * configuration for master node + */ + def master(configFile: String = null): Config = { + load(configFile).master + } + + /* + * configuration for worker node + */ + def worker(configFile: String = null): Config = { + load(configFile).worker + } + + /** + * configuration for UI server + */ + def ui(configFile: String = null): Config = { + load(configFile).ui + } + + /** + * try to load system property gearpump.config.file, or use configFile + */ + private def load(configFile: String): Configs = { + val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE)) + file match { + case Some(path) => + LOG.info("loading config file " + path + "..........") + load(ClusterConfigSource(path)) + case None => + LOG.info("loading config file application.conf...") + load(ClusterConfigSource(configFile)) + } + } + + val APPLICATION = "application.conf" + val LOG = LogUtil.getLogger(getClass) + + def saveConfig(conf: Config, file: File): Unit = { + val serialized = conf.root().render() + FileUtils.write(file, serialized) + } + + def render(config: Config, concise: Boolean = false): String = { + if (concise) { + config.root().render(ConfigRenderOptions.concise().setFormatted(true)) + } else { + config.root().render(ConfigRenderOptions.defaults()) + } + } + + /** filter JVM reserved keys and akka default reference.conf */ + def filterOutDefaultConfig(input: Config): Config = { + val updated = filterOutJvmReservedKeys(input) + Util.filterOutOrigin(updated, "reference.conf") + } + + private[gearpump] def load(source: ClusterConfigSource): Configs = { + + val systemProperties = getSystemProperties + + val user = source.getConfig + + val gear = ConfigFactory.parseResourcesAnySyntax("gear.conf", + ConfigParseOptions.defaults.setAllowMissing(true)) + + val gearDefault = ConfigFactory.parseResourcesAnySyntax("geardefault.conf", + ConfigParseOptions.defaults.setAllowMissing(true)) + + val all = systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault) + + val linux = all.getConfig(LINUX_CONFIG) + + var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG). + withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG) + + if (!akka.util.Helpers.isWindows) { + + // Change the akka.scheduler.tick-duration to 1 ms for Linux or Mac + basic = linux.withFallback(basic) + } + + val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic)) + val worker = replaceHost(all.getConfig(WORKER_CONFIG).withFallback(basic)) + val ui = replaceHost(all.getConfig(UI_CONFIG).withFallback(basic)) + val app = replaceHost(basic) + + new Configs(master, worker, ui, app) + } + + private def replaceHost(config: Config): Config = { + val hostName = config.getString(Constants.GEARPUMP_HOSTNAME) + config.withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(hostName)) + } + + val JVM_RESERVED_PROPERTIES = List( + "os", "java", "sun", "boot", "user", "prog", "path", "line", "awt", "file" + ) + + private def getSystemProperties: Config = { + // Excludes default java system properties + JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) => + config.withoutPath(property) + } + } + + class ConfigValidationException(msg: String) extends Exception(msg: String) + + private def filterOutJvmReservedKeys(input: Config): Config = { + val filterJvmReservedKeys = JVM_RESERVED_PROPERTIES.foldLeft(input) { (config, key) => + config.withoutPath(key) + } + filterJvmReservedKeys + } + + protected class Configs( + val master: Config, val worker: Config, val ui: Config, val default: Config) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala new file mode 100644 index 0000000..920ceae --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster + +import java.io.File +import scala.language.implicitConversions + +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} + +/** + * Data Source of ClusterConfig + * + * Please use ClusterConfigSource.apply(filePath) to construct this object + */ +sealed trait ClusterConfigSource extends Serializable { + def getConfig: Config +} + +object ClusterConfigSource { + + /** + * Construct ClusterConfigSource from resource name or file path + */ + def apply(filePath: String): ClusterConfigSource = { + + if (null == filePath) { + new ClusterConfigSourceImpl(ConfigFactory.empty()) + } else { + var config = ConfigFactory.parseFileAnySyntax(new File(filePath), + ConfigParseOptions.defaults.setAllowMissing(true)) + + if (null == config || config.isEmpty) { + config = ConfigFactory.parseResourcesAnySyntax(filePath, + ConfigParseOptions.defaults.setAllowMissing(true)) + } + new ClusterConfigSourceImpl(config) + } + } + + implicit def FilePathToClusterConfigSource(filePath: String): ClusterConfigSource = { + apply(filePath) + } + + private class ClusterConfigSourceImpl(config: Config) extends ClusterConfigSource { + override def getConfig: Config = config + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala new file mode 100644 index 0000000..e6ab13b --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster + +import org.apache.gearpump.cluster.worker.{WorkerSummary, WorkerId} + +import scala.util.Try + +import akka.actor.ActorRef +import com.typesafe.config.Config + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus +import org.apache.gearpump.cluster.master.{MasterNode, MasterSummary} +import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import org.apache.gearpump.metrics.Metrics.MetricType + +object ClientToMaster { + case object AddMaster + case class AddWorker(count: Int) + case class RemoveMaster(masterContainerId: String) + case class RemoveWorker(workerContainerId: String) + + /** Command result of AddMaster, RemoveMaster, and etc... */ + case class CommandResult(success: Boolean, exception: String = null) { + override def toString: String = { + val tag = getClass.getSimpleName + if (success) { + s"$tag(success)" + } else { + s"$tag(failure, $exception)" + } + } + } + + /** Submit an application to master */ + case class SubmitApplication( + appDescription: AppDescription, appJar: Option[AppJar], + username: String = System.getProperty("user.name")) + + case class RestartApplication(appId: Int) + case class ShutdownApplication(appId: Int) + + /** Client send ResolveAppId to Master to resolves AppMaster actor path by providing appId */ + case class ResolveAppId(appId: Int) + + /** Client send ResolveWorkerId to master to get the Actor path of worker. */ + case class ResolveWorkerId(workerId: WorkerId) + + /** Get an active Jar store to upload job jars, like wordcount.jar */ + case object GetJarStoreServer + + /** Service address of JarStore */ + case class JarStoreServerAddress(url: String) + + /** Query AppMaster config by providing appId */ + case class QueryAppMasterConfig(appId: Int) + + /** Query worker config */ + case class QueryWorkerConfig(workerId: WorkerId) + + /** Query master config */ + case object QueryMasterConfig + + /** Options for read the metrics from the cluster */ + object ReadOption { + type ReadOption = String + + val Key: String = "readOption" + + /** Read the latest record of the metrics, only return 1 record for one metric name (id) */ + val ReadLatest: ReadOption = "readLatest" + + /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */ + val ReadRecent = "readRecent" + + /** + * Read the history metrics, typically it contains metrics for 48 hours + * + * NOTE: Each hour only contain one or two data points. + */ + val ReadHistory = "readHistory" + } + + /** Query history metrics from master or app master. */ + case class QueryHistoryMetrics( + path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest, + aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String]) + + /** + * If there are message loss, the clock would pause for a while. This message is used to + * pin-point which task has stalling clock value, and usually it means something wrong on + * that machine. + */ + case class GetStallingTasks(appId: Int) + + /** + * Request app master for a short list of cluster app that administrators should be aware of. + */ + case class GetLastFailure(appId: Int) +} + +object MasterToClient { + + /** Result of SubmitApplication */ + // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception) + case class SubmitApplicationResult(appId: Try[Int]) + + case class SubmitApplicationResultValue(appId: Int) + + case class ShutdownApplicationResult(appId: Try[Int]) + case class ReplayApplicationResult(appId: Try[Int]) + + /** Return Actor ref of app master */ + case class ResolveAppIdResult(appMaster: Try[ActorRef]) + + /** Return Actor ref of worker */ + case class ResolveWorkerIdResult(worker: Try[ActorRef]) + + case class AppMasterConfig(config: Config) + + case class WorkerConfig(config: Config) + + case class MasterConfig(config: Config) + + case class HistoryMetricsItem(time: TimeStamp, value: MetricType) + + /** + * History metrics returned from master, worker, or app master. + * + * All metric items are organized like a tree, path is used to navigate through the tree. + * For example, when querying with path == "executor0.task1.throughput*", the metrics + * provider picks metrics whose source matches the path. + * + * @param path The path client provided. The returned metrics are the result query of this path. + * @param metrics The detailed metrics. + */ + case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem]) + + /** Return the last error of this streaming application job */ + case class LastFailure(time: TimeStamp, error: String) +} + +trait AppMasterRegisterData + +object AppMasterToMaster { + + /** + * Register an AppMaster by providing a ActorRef, and registerData + * @param registerData The registerData is provided by Master when starting the app master. + * App master should return the registerData back to master. + * Typically registerData hold some context information for this app Master. + */ + + case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData) + + case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable) + + case class RequestResource(appId: Int, request: ResourceRequest) + + /** + * Each application job can save some data in the distributed cluster storage on master nodes. + * + * @param appId App Id of the client application who send the request. + * @param key Key name + * @param value Value to store on distributed cluster storage on master nodes + */ + case class SaveAppData(appId: Int, key: String, value: Any) + + /** The application specific data is successfully stored */ + case object AppDataSaved + + /** Fail to store the application data */ + case object SaveAppDataFailed + + /** Fetch the application specific data that stored previously */ + case class GetAppData(appId: Int, key: String) + + /** The KV data returned for query GetAppData */ + case class GetAppDataResult(key: String, value: Any) + + /** + * AppMasterSummary returned to REST API query. Streaming and Non-streaming + * have very different application info. AppMasterSummary is the common interface. + */ + trait AppMasterSummary { + def appType: String + def appId: Int + def appName: String + def actorPath: String + def status: AppMasterStatus + def startTime: TimeStamp + def uptime: TimeStamp + def user: String + } + + /** Represents a generic application that is not a streaming job */ + case class GeneralAppMasterSummary( + appId: Int, + appType: String = "general", + appName: String = null, + actorPath: String = null, + status: AppMasterStatus = MasterToAppMaster.AppMasterActive, + startTime: TimeStamp = 0L, + uptime: TimeStamp = 0L, + user: String = null) + extends AppMasterSummary + + /** Fetches the list of workers from Master */ + case object GetAllWorkers + + /** Get worker data of workerId */ + case class GetWorkerData(workerId: WorkerId) + + /** Response to GetWorkerData */ + case class WorkerData(workerDescription: WorkerSummary) + + /** Get Master data */ + case object GetMasterData + + /** Response to GetMasterData */ + case class MasterData(masterDescription: MasterSummary) +} + +object MasterToAppMaster { + + /** Resource allocated for application xx */ + case class ResourceAllocated(allocations: Array[ResourceAllocation]) + + /** Master confirm reception of RegisterAppMaster message */ + case class AppMasterRegistered(appId: Int) + + /** Shutdown the application job */ + case object ShutdownAppMaster + + type AppMasterStatus = String + val AppMasterActive: AppMasterStatus = "active" + val AppMasterInActive: AppMasterStatus = "inactive" + val AppMasterNonExist: AppMasterStatus = "nonexist" + + sealed trait StreamingType + case class AppMasterData( + status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null, + workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, + finishTime: TimeStamp = 0, user: String = null) + + case class AppMasterDataRequest(appId: Int, detail: Boolean = false) + + case class AppMastersData(appMasters: List[AppMasterData]) + case object AppMastersDataRequest + case class AppMasterDataDetailRequest(appId: Int) + case class AppMasterMetricsRequest(appId: Int) extends StreamingType + + case class ReplayFromTimestampWindowTrailingEdge(appId: Int) + + case class WorkerList(workers: List[WorkerId]) +} + +object AppMasterToWorker { + case class LaunchExecutor( + appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig) + + case class ShutdownExecutor(appId: Int, executorId: Int, reason: String) + case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource) +} + +object WorkerToAppMaster { + case class ExecutorLaunchRejected(reason: String = null, ex: Throwable = null) + case class ShutdownExecutorSucceed(appId: Int, executorId: Int) + case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null) +} +
