dgrove-oss commented on a change in pull request #3338: implement 
suspend/resume for KubernetesContainer
URL: 
https://github.com/apache/incubator-openwhisk/pull/3338#discussion_r171450061
 
 

 ##########
 File path: 
core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
 ##########
 @@ -99,43 +117,144 @@ class KubernetesClient(
   }
   protected val kubectlCmd = Seq(findKubectlCmd)
 
-  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
-    implicit transid: TransactionId): Future[ContainerId] = {
-    runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
-      .map(_ => ContainerId(name))
-  }
+  def run(name: String,
+          image: String,
+          memory: ByteSize = 256.MB,
+          environment: Map[String, String] = Map.empty,
+          labels: Map[String, String] = Map.empty)(implicit transid: 
TransactionId): Future[KubernetesContainer] = {
+
+    val envVars = environment.map {
+      case (key, value) => new 
EnvVarBuilder().withName(key).withValue(value).build()
+    }.toSeq
+
+    val pod = new PodBuilder()
+      .withNewMetadata()
+      .withName(name)
+      .addToLabels("name", name)
+      .addToLabels(labels.asJava)
+      .endMetadata()
+      .withNewSpec()
+      .withRestartPolicy("Always")
+      .addNewContainer()
+      .withNewResources()
+      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+      .endResources()
+      .withName("user-action")
+      .withImage(image)
+      .withEnv(envVars.asJava)
+      .addNewPort()
+      .withContainerPort(8080)
+      .withName("action")
+      .endPort()
+      .endContainer()
+      .endSpec()
+      .build()
+
+    kubeRestClient.pods.inNamespace(config.namespace).create(pod)
 
-  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): 
Future[ContainerAddress] = {
     Future {
       blocking {
-        val pod =
-          
kubeRestClient.pods().withName(id.asString).waitUntilReady(timeouts.inspect.length,
 timeouts.inspect.unit)
-        ContainerAddress(pod.getStatus().getPodIP())
+        val createdPod = kubeRestClient.pods
+          .inNamespace(config.namespace)
+          .withName(name)
+          .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
+        toContainer(createdPod)
       }
     }.recoverWith {
       case e =>
-        log.error(this, s"Failed to get IP of Pod '${id.asString}' within 
timeout: ${e.getClass} - ${e.getMessage}")
-        Future.failed(new Exception(s"Failed to get IP of Pod 
'${id.asString}'"))
+        log.error(this, s"Failed create pod for '$name': ${e.getClass} - 
${e.getMessage}")
+        Future.failed(new Exception(s"Failed to create pod '$name'"))
+    }
+  }
+
+  def rm(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = {
+    runCmd(Seq("delete", "--now", "pod", container.id.asString), 
config.timeouts.rm).map(_ => ())
+  }
+
+  def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit 
transid: TransactionId): Future[Unit] = {
+    if (ensureUnpaused && config.invokerAgent.enabled) {
+      // The caller can't guarantee that every container with the label 
key=value is already unpaused.
+      // Therefore we must enumerate them and ensure they are unpaused before 
we attempt to delete them.
+      Future {
+        blocking {
+          kubeRestClient
+            .inNamespace(config.namespace)
+            .pods()
+            .withLabel(key, value)
+            .list()
+            .getItems
+            .asScala
+            .map { pod =>
+              val container = toContainer(pod)
+              container
+                .resume()
+                .recover { case _ => () } // Ignore errors; it is possible the 
container was not actually suspended.
+                .map(_ => rm(container))
+            }
+        }
+      }.flatMap(futures =>
+        Future
+          .sequence(futures)
+          .map(_ => ()))
+    } else {
+      runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), 
config.timeouts.rm).map(_ => ())
     }
   }
 
-  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ => 
())
+  def suspend(container: KubernetesContainer)(implicit transid: 
TransactionId): Future[Unit] = {
+    if (config.invokerAgent.enabled) {
+      agentCommand("suspend", container)
+        .map { response =>
+          response.discardEntityBytes()
+        }
+    } else {
+      Future.successful({})
+    }
+  }
 
-  def rm(key: String, value: String)(implicit transid: TransactionId): 
Future[Unit] =
-    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), 
timeouts.rm).map(_ => ())
+  def resume(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = {
+    if (config.invokerAgent.enabled) {
+      agentCommand("resume", container)
+        .map { response =>
+          response.discardEntityBytes()
+        }
+    } else {
+      Future.successful({})
+    }
+  }
 
-  def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: 
Boolean = false)(
+  def logs(container: KubernetesContainer, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any] = {
 
     log.debug(this, "Parsing logs from Kubernetes Graph Stage?")
 
     Source
-      .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime, 
waitForSentinel))
+      .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime, 
waitForSentinel))
       .log("foobar")
 
   }
 
+  private def toContainer(pod: Pod): KubernetesContainer = {
+    val id = ContainerId(pod.getMetadata.getName)
+    val addr = ContainerAddress(pod.getStatus.getPodIP)
+    val workerIP = pod.getStatus.getHostIP
+    // Extract the native (docker or containerd) containerId for the container
+    // By convention, kubernetes adds a docker:// prefix when using docker as 
the low-level container engine
+    val nativeContainerId = 
pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
 
 Review comment:
   not a bad idea, but let's defer that to a later PR.  I'd like to get this 
one merged quickly, so I can submit the logging implementation I have that 
builds on it. Then can rethink how we pass the nativeContainerId to the 
invokerAgent.   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to