This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b99af9  [New Scheduler] Initial commit for the scheduler component 
(#4983)
7b99af9 is described below

commit 7b99af975eb77fa00ac71ecf3f0c27e74a3ca8b4
Author: Dominic Kim <style9...@gmail.com>
AuthorDate: Wed Nov 11 17:50:27 2020 +0900

    [New Scheduler] Initial commit for the scheduler component (#4983)
    
    * Initial commit for the scheduler component
    
    * Add a license header
    
    * Apply comments.
    
    * Move configuration checkups to above.
    
    * Add supplementary comments
---
 .../apache/openwhisk/common/TransactionId.scala    |   1 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   8 +
 .../apache/openwhisk/core/connector/Message.scala  |  30 ++
 .../apache/openwhisk/core/entity/InstanceId.scala  |  15 +
 core/scheduler/Dockerfile                          |  33 +++
 core/scheduler/build.gradle                        |  58 ++++
 core/scheduler/init.sh                             |  24 ++
 .../openwhisk/core/scheduler/Scheduler.scala       | 305 +++++++++++++++++++++
 .../openwhisk/core/scheduler/SchedulerServer.scala |  73 +++++
 settings.gradle                                    |   1 +
 10 files changed, 548 insertions(+)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index 257dd07..e8e58b6 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -232,6 +232,7 @@ object TransactionId {
 
   val systemPrefix = "sid_"
 
+  var containerCreation = TransactionId(systemPrefix + "containerCreation")
   val unknown = TransactionId(systemPrefix + "unknown")
   val testing = TransactionId(systemPrefix + "testing") // Common id for for 
unit testing
   val invoker = TransactionId(systemPrefix + "invoker") // Invoker 
startup/shutdown or GC activity
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index a050ce7..7c0ec2b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -85,6 +85,10 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit)
   val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
   val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
+
+  val schedulerHost = this(WhiskConfig.schedulerHost)
+  val schedulerRpcPort = this(WhiskConfig.schedulerRpcPort)
+  val schedulerAkkaPort = this(WhiskConfig.schedulerAkkaPort)
 }
 
 object WhiskConfig {
@@ -190,6 +194,10 @@ object WhiskConfig {
   val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
   val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
   val controllerSeedNodes = "akka.cluster.seed.nodes"
+
+  val schedulerHost = "whisk.scheduler.endpoints.host"
+  val schedulerRpcPort = "whisk.scheduler.endpoints.rpcPort"
+  val schedulerAkkaPort = "whisk.scheduler.endpoints.akkaPort"
 }
 
 object ConfigKeys {
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index c86426d..6052258 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -427,3 +427,33 @@ object EventMessage extends DefaultJsonProtocol {
 
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
+
+/**
+ * This case class is used when retrieving the snapshot of the queue status 
from the scheduler at a certain moment.
+ * This is useful to figure out the internal status when any issue happens.
+ * The following would be an example result.
+ *
+ * [
+ * ...
+ *    {
+ *       "data": "RunningData",
+ *       "fqn": "whisk.system/elasticsearch/status-alarm@0.0.2",
+ *       "invocationNamespace": "style95",
+ *       "status": "Running",
+ *       "waitingActivation": 1
+ *    },
+ * ...
+ * ]
+ */
+object StatusQuery
+case class StatusData(invocationNamespace: String, fqn: String, 
waitingActivation: Int, status: String, data: String)
+    extends Message {
+
+  override def serialize: String = StatusData.serdes.write(this).compactPrint
+
+}
+object StatusData extends DefaultJsonProtocol {
+
+  implicit val serdes =
+    jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", 
"waitingActivation", "status", "data")
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
index e80720f..0421e9b 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
@@ -57,6 +57,17 @@ case class ControllerInstanceId(asString: String) extends 
InstanceId {
   override val toJson: JsValue = ControllerInstanceId.serdes.write(this)
 }
 
+case class SchedulerInstanceId(val asString: String) extends InstanceId {
+  validate(asString)
+  override val instanceType = "scheduler"
+
+  override val source = s"$instanceType$asString"
+
+  override val toString: String = source
+
+  override val toJson: JsValue = SchedulerInstanceId.serdes.write(this)
+}
+
 object InvokerInstanceId extends DefaultJsonProtocol {
   def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson))
 
@@ -112,6 +123,10 @@ object ControllerInstanceId extends DefaultJsonProtocol {
   }
 }
 
+object SchedulerInstanceId extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat(SchedulerInstanceId.apply _, "asString")
+}
+
 trait InstanceId {
 
   // controller ids become part of a kafka topic, hence, hence allow only 
certain characters
diff --git a/core/scheduler/Dockerfile b/core/scheduler/Dockerfile
new file mode 100644
index 0000000..244d80d
--- /dev/null
+++ b/core/scheduler/Dockerfile
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM scala
+
+ENV UID=1001 \
+    NOT_ROOT_USER=owuser
+
+# Copy app jars
+ADD build/distributions/scheduler.tar /
+
+COPY init.sh /
+RUN chmod +x init.sh
+
+RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash 
${NOT_ROOT_USER}
+USER ${NOT_ROOT_USER}
+
+EXPOSE 8080
+CMD ["./init.sh", "0"]
diff --git a/core/scheduler/build.gradle b/core/scheduler/build.gradle
new file mode 100644
index 0000000..530c962
--- /dev/null
+++ b/core/scheduler/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'scala'
+apply plugin: 'application'
+apply plugin: 'eclipse'
+apply plugin: 'maven'
+apply plugin: 'org.scoverage'
+
+ext.dockerImageName = 'scheduler'
+apply from: '../../gradle/docker.gradle'
+distDocker.dependsOn ':common:scala:distDocker', 'distTar'
+
+project.archivesBaseName = "openwhisk-scheduler"
+
+ext.coverageDirs = [
+        "${buildDir}/classes/scala/scoverage",
+        
"${project(':common:scala').buildDir.absolutePath}/classes/scala/scoverage"
+]
+distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 
'scoverageClasses'
+
+// Define a separate configuration for managing the dependency on Jetty ALPN 
agent.
+configurations {
+    alpnagent
+}
+
+dependencies {
+    configurations.all {
+        resolutionStrategy.force 
"com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.1.3"
+        resolutionStrategy.force 
"com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}"
+        resolutionStrategy.force 
"com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}"
+        resolutionStrategy.force 
"com.typesafe.akka:akka-http2-support_${gradle.scala.depVersion}:${gradle.akka_http.version}"
+        resolutionStrategy.force 
"com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}"
+        resolutionStrategy.force 
"com.typesafe.akka:akka-parsing_${gradle.scala.depVersion}:${gradle.akka_http.version}"
+        resolutionStrategy.force 
"com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}"
+    }
+
+    compile "org.scala-lang:scala-library:${gradle.scala.version}"
+    compile project(':common:scala')
+
+}
+
+mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
+applicationDefaultJvmArgs = ["-Djava.security.egd=file:/dev/./urandom"]
diff --git a/core/scheduler/init.sh b/core/scheduler/init.sh
new file mode 100644
index 0000000..8d359d5
--- /dev/null
+++ b/core/scheduler/init.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+./copyJMXFiles.sh
+
+export SCHEDULER_OPTS
+SCHEDULER_OPTS="$SCHEDULER_OPTS 
-Dakka.remote.netty.tcp.bind-hostname=$(hostname -i) 
$(./transformEnvironment.sh)"
+
+exec scheduler/bin/scheduler "$@"
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
new file mode 100644
index 0000000..9fc793b
--- /dev/null
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def disable(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  /**
+   * This component is in charge of storing data to ETCD.
+   * Even if any error happens we can assume the data will be eventually 
available in the ETCD by this component.
+   */
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  /**
+   * This component is responsible for creating containers for a given action.
+   * It relies on the creationJobManager to manage the container creation job.
+   */
+  val containerManager = "" // TODO: TBD
+
+  /**
+   * This is a factory to create memory queues.
+   * In the new architecture, each action is given its own dedicated queue.
+   */
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  /**
+   * This is one of the major components which take charge of managing queues 
and coordinating requests among the scheduler, controllers, and invokers.
+   */
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = 
ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = 
None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real 
data class other than just string
+
+  def disable(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = 
loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  /**
+   * The scheduler has two ports, one for akka-remote and the other for 
akka-grpc.
+   */
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the 
scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", 
ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = 
Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new 
AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate,
 "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance 
number as they need to be uniquely identified.
+    require(args.length >= 1, "scheduler instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    val msgProvider = SpiLoader.get[MessagingProvider]
+
+    Seq(
+      ("scheduler" + instanceId.asString, "actions", 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      ("creationAck" + instanceId.asString, "creationAck", 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
+      .foreach {
+        case (topic, topicConfigurationKey, maxMessageBytes) =>
+          if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, 
maxMessageBytes).isFailure) {
+            abort(s"failure during msgProvider.ensureTopic for topic $topic")
+          }
+      }
+
+    ExecManifest.initialize(config) match {
+      case Success(_) =>
+        val schedulerEndpoints = SchedulerEndpoints(host, rpcPort, akkaPort)
+        // Create scheduler
+        val scheduler = new Scheduler(instanceId, schedulerEndpoints)
+
+        // TODO: Add Akka-grpc handler
+        val httpsConfig =
+          if (Scheduler.protocol == "https") 
Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
+
+        
BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, 
port, httpsConfig)(
+          actorSystem,
+          ActorMaterializer.create(actorSystem))
+
+      case Failure(t) =>
+        abort(s"Invalid runtimes manifest: $t")
+    }
+  }
+}
+case class SchedulerEndpoints(host: String, rpcPort: Int, akkaPort: Int) {
+  require(rpcPort != 0 || akkaPort != 0)
+  def asRpcEndpoint: String = s"$host:$rpcPort"
+  def asAkkaEndpoint: String = s"$host:$akkaPort"
+
+  def getRemoteRef(name: String)(implicit context: ActorRefFactory): 
ActorSelection = {
+    implicit val ec = context.dispatcher
+
+    val path = 
s"akka.tcp://scheduler-actor-system@${asAkkaEndpoint}/user/${name}"
+    context.actorSelection(path)
+  }
+
+  def serialize = SchedulerEndpoints.serdes.write(this).compactPrint
+}
+
+object SchedulerEndpoints extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat(SchedulerEndpoints.apply, "host", 
"rpcPort", "akkaPort")
+  def parse(endpoints: String) = Try(serdes.read(endpoints.parseJson))
+}
+
+case class SchedulerStates(sid: SchedulerInstanceId, queueSize: Int, 
endpoints: SchedulerEndpoints) {
+  private implicit val askTimeout = Timeout(5 seconds)
+
+  def getRemoteRef(name: String)(implicit context: ActorRefFactory): 
ActorSelection = {
+    implicit val ec = context.dispatcher
+
+    val path = 
s"akka.tcp://scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}"
+    context.actorSelection(path)
+  }
+
+  def getSchedulerId(): SchedulerInstanceId = sid
+
+  def serialize = SchedulerStates.serdes.write(this).compactPrint
+}
+
+object SchedulerStates extends DefaultJsonProtocol {
+  private implicit val endpointsSerde = SchedulerEndpoints.serdes
+  implicit val serdes = jsonFormat(SchedulerStates.apply, "sid", "queueSize", 
"endpoints")
+
+  def parse(states: String) = Try(serdes.read(states.parseJson))
+}
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
new file mode 100644
index 0000000..841b139
--- /dev/null
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json._
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ * Currently provides a health ping route, only.
+ */
+class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, 
systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  implicit val actorSystem: ActorSystem,
+  implicit val logger: Logging)
+    extends BasicRasService {
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == 
systemUsername && password == systemPassword =>
+        (path("disable") & post) {
+          logger.warn(this, "Scheduler is disabled")
+          scheduler.disable()
+          complete("scheduler disabled")
+        }
+      case _ =>
+        implicit val jsonPrettyResponsePrinter = PrettyPrinter
+        terminate(StatusCodes.Unauthorized)
+    }
+  }
+}
+
+object SchedulerServer {
+
+  val schedulerUsername = {
+    val source = scala.io.Source.fromFile("/conf/schedulerauth.username")
+    try source.mkString.replaceAll("\r|\n", "")
+    finally source.close()
+  }
+  val schedulerPassword = {
+    val source = scala.io.Source.fromFile("/conf/schedulerauth.password")
+    try source.mkString.replaceAll("\r|\n", "")
+    finally source.close()
+  }
+
+  def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
+                                         actorSystem: ActorSystem,
+                                         logger: Logging): BasicRasService =
+    new SchedulerServer(scheduler, schedulerUsername, schedulerPassword)
+}
diff --git a/settings.gradle b/settings.gradle
index df0ee97..4792aa9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,6 +18,7 @@
 include 'common:scala'
 
 include 'core:controller'
+include 'core:scheduler'
 include 'core:invoker'
 include 'core:cosmosdb:cache-invalidator'
 include 'core:standalone'

Reply via email to