dgrove-oss commented on a change in pull request #3338: implement suspend/resume for KubernetesContainer URL: https://github.com/apache/incubator-openwhisk/pull/3338#discussion_r171450061
########## File path: core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala ########## @@ -99,43 +117,144 @@ class KubernetesClient( } protected val kubectlCmd = Seq(findKubectlCmd) - def run(name: String, image: String, args: Seq[String] = Seq.empty[String])( - implicit transid: TransactionId): Future[ContainerId] = { - runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run) - .map(_ => ContainerId(name)) - } + def run(name: String, + image: String, + memory: ByteSize = 256.MB, + environment: Map[String, String] = Map.empty, + labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = { + + val envVars = environment.map { + case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build() + }.toSeq + + val pod = new PodBuilder() + .withNewMetadata() + .withName(name) + .addToLabels("name", name) + .addToLabels(labels.asJava) + .endMetadata() + .withNewSpec() + .withRestartPolicy("Always") + .addNewContainer() + .withNewResources() + .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava) + .endResources() + .withName("user-action") + .withImage(image) + .withEnv(envVars.asJava) + .addNewPort() + .withContainerPort(8080) + .withName("action") + .endPort() + .endContainer() + .endSpec() + .build() + + kubeRestClient.pods.inNamespace(config.namespace).create(pod) - def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = { Future { blocking { - val pod = - kubeRestClient.pods().withName(id.asString).waitUntilReady(timeouts.inspect.length, timeouts.inspect.unit) - ContainerAddress(pod.getStatus().getPodIP()) + val createdPod = kubeRestClient.pods + .inNamespace(config.namespace) + .withName(name) + .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit) + toContainer(createdPod) } }.recoverWith { case e => - log.error(this, s"Failed to get IP of Pod '${id.asString}' within timeout: ${e.getClass} - 
${e.getMessage}") - Future.failed(new Exception(s"Failed to get IP of Pod '${id.asString}'")) + log.error(this, s"Failed create pod for '$name': ${e.getClass} - ${e.getMessage}") + Future.failed(new Exception(s"Failed to create pod '$name'")) + } + } + + def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { + runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ()) + } + + def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = { + if (ensureUnpaused && config.invokerAgent.enabled) { + // The caller can't guarantee that every container with the label key=value is already unpaused. + // Therefore we must enumerate them and ensure they are unpaused before we attempt to delete them. + Future { + blocking { + kubeRestClient + .inNamespace(config.namespace) + .pods() + .withLabel(key, value) + .list() + .getItems + .asScala + .map { pod => + val container = toContainer(pod) + container + .resume() + .recover { case _ => () } // Ignore errors; it is possible the container was not actually suspended. 
+ .map(_ => rm(container)) + } + } + }.flatMap(futures => + Future + .sequence(futures) + .map(_ => ())) + } else { + runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ()) } } - def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ => ()) + def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { + if (config.invokerAgent.enabled) { + agentCommand("suspend", container) + .map { response => + response.discardEntityBytes() + } + } else { + Future.successful({}) + } + } - def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), timeouts.rm).map(_ => ()) + def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { + if (config.invokerAgent.enabled) { + agentCommand("resume", container) + .map { response => + response.discardEntityBytes() + } + } else { + Future.successful({}) + } + } - def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)( + def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)( implicit transid: TransactionId): Source[TypedLogLine, Any] = { log.debug(this, "Parsing logs from Kubernetes Graph Stage?") Source - .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime, waitForSentinel)) + .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime, waitForSentinel)) .log("foobar") } + private def toContainer(pod: Pod): KubernetesContainer = { + val id = ContainerId(pod.getMetadata.getName) + val addr = ContainerAddress(pod.getStatus.getPodIP) + val workerIP = pod.getStatus.getHostIP + // Extract the native (docker or containerd) containerId for the container + // By convention, kubernetes adds a docker:// prefix when using docker as the low-level container engine + val 
nativeContainerId = pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://") Review comment: not a bad idea, but let's defer that to a later PR. I'd like to get this one merged quickly, so I can submit the logging implementation I have that builds on it. Then we can rethink how we pass the nativeContainerId to the invokerAgent. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services