dgrove-oss commented on a change in pull request #3338: implement 
suspend/resume for KubernetesContainer

 File path: 
 @@ -99,43 +117,144 @@ class KubernetesClient(
   protected val kubectlCmd = Seq(findKubectlCmd)
-  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
-    implicit transid: TransactionId): Future[ContainerId] = {
-    runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
-      .map(_ => ContainerId(name))
-  }
+  def run(name: String,
+          image: String,
+          memory: ByteSize = 256.MB,
+          environment: Map[String, String] = Map.empty,
+          labels: Map[String, String] = Map.empty)(implicit transid: 
TransactionId): Future[KubernetesContainer] = {
+    val envVars = environment.map {
+      case (key, value) => new 
+    }.toSeq
+    val pod = new PodBuilder()
+      .withNewMetadata()
+      .withName(name)
+      .addToLabels("name", name)
+      .addToLabels(labels.asJava)
+      .endMetadata()
+      .withNewSpec()
+      .withRestartPolicy("Always")
+      .addNewContainer()
+      .withNewResources()
+      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+      .endResources()
+      .withName("user-action")
+      .withImage(image)
+      .withEnv(envVars.asJava)
+      .addNewPort()
+      .withContainerPort(8080)
+      .withName("action")
+      .endPort()
+      .endContainer()
+      .endSpec()
+      .build()
+    kubeRestClient.pods.inNamespace(config.namespace).create(pod)
-  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): 
Future[ContainerAddress] = {
     Future {
       blocking {
-        val pod =
-        ContainerAddress(pod.getStatus().getPodIP())
+        val createdPod = kubeRestClient.pods
+          .inNamespace(config.namespace)
+          .withName(name)
+          .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
+        toContainer(createdPod)
     }.recoverWith {
       case e =>
-        log.error(this, s"Failed to get IP of Pod '${id.asString}' within 
timeout: ${e.getClass} - ${e.getMessage}")
-        Future.failed(new Exception(s"Failed to get IP of Pod 
+        log.error(this, s"Failed create pod for '$name': ${e.getClass} - 
+        Future.failed(new Exception(s"Failed to create pod '$name'"))
+    }
+  }
+  def rm(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = {
+    runCmd(Seq("delete", "--now", "pod", container.id.asString), 
config.timeouts.rm).map(_ => ())
+  }
+  def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit 
transid: TransactionId): Future[Unit] = {
+    if (ensureUnpaused && config.invokerAgent.enabled) {
+      // The caller can't guarantee that every container with the label 
key=value is already unpaused.
+      // Therefore we must enumerate them and ensure they are unpaused before 
we attempt to delete them.
+      Future {
+        blocking {
+          kubeRestClient
+            .inNamespace(config.namespace)
+            .pods()
+            .withLabel(key, value)
+            .list()
+            .getItems
+            .asScala
+            .map { pod =>
+              val container = toContainer(pod)
+              container
+                .resume()
+                .recover { case _ => () } // Ignore errors; it is possible the 
container was not actually suspended.
+                .map(_ => rm(container))
+            }
+        }
+      }.flatMap(futures =>
+        Future
+          .sequence(futures)
+          .map(_ => ()))
+    } else {
+      runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), 
config.timeouts.rm).map(_ => ())
-  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ => 
+  def suspend(container: KubernetesContainer)(implicit transid: 
TransactionId): Future[Unit] = {
+    if (config.invokerAgent.enabled) {
+      agentCommand("suspend", container)
+        .map { response =>
+          response.discardEntityBytes()
+        }
+    } else {
+      Future.successful({})
+    }
+  }
-  def rm(key: String, value: String)(implicit transid: TransactionId): 
Future[Unit] =
-    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), 
timeouts.rm).map(_ => ())
+  def resume(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = {
+    if (config.invokerAgent.enabled) {
+      agentCommand("resume", container)
+        .map { response =>
+          response.discardEntityBytes()
+        }
+    } else {
+      Future.successful({})
+    }
+  }
-  def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: 
Boolean = false)(
+  def logs(container: KubernetesContainer, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any] = {
     log.debug(this, "Parsing logs from Kubernetes Graph Stage?")
-      .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime, 
+      .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime, 
+  private def toContainer(pod: Pod): KubernetesContainer = {
+    val id = ContainerId(pod.getMetadata.getName)
+    val addr = ContainerAddress(pod.getStatus.getPodIP)
+    val workerIP = pod.getStatus.getHostIP
+    // Extract the native (docker or containerd) containerId for the container
+    // By convention, kubernetes adds a docker:// prefix when using docker as 
the low-level container engine
+    val nativeContainerId = 
 Review comment:
   Not a bad idea, but let's defer that to a later PR.  I'd like to get this 
one merged quickly, so I can submit the logging implementation I have that 
builds on it. Then we can rethink how we pass the nativeContainerId to the 

