This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c53cacb  Refactor KubernetesClient to separate invokerAgent; Add 
forwarding LogStoreProvider. (#3411)
c53cacb is described below

commit c53cacba0e8d4f3c655e23ce2c558716b7f0a472
Author: David Grove <dgrove-...@users.noreply.github.com>
AuthorDate: Mon Mar 12 04:13:14 2018 -0400

    Refactor KubernetesClient to separate invokerAgent; Add forwarding 
LogStoreProvider. (#3411)
    
    This commit pushes the invokerAgent functionality into a new
    subclass of KubernetesClient, KubernetesClientWithInvokerAgent,
    to achieve a cleaner separation of concerns.
    
    It also adds a new LogDriverLogStore implementation for Kubernetes
    that uses the invokerAgent to perform remote processing and forwarding
    of action logs.  This enables more efficient log processing than
    relying on the Kubernetes API to retrieve the logs and stream
    them through the invoker.
    
    It also adds a unit test for the log forwarding support in
    KubernetesContainer and some minor cleanup of the test cases
    to uniformly use the await helper function.
---
 .../kubernetes/KubernetesClient.scala              |  81 ++---------
 .../KubernetesClientWithInvokerAgent.scala         | 158 +++++++++++++++++++++
 .../kubernetes/KubernetesContainer.scala           |  32 ++++-
 .../kubernetes/KubernetesContainerFactory.scala    |  14 +-
 .../KubernetesInvokerAgentLogStore.scala           |  76 ++++++++++
 .../kubernetes/test/KubernetesClientTests.scala    |  54 ++++---
 .../kubernetes/test/KubernetesContainerTests.scala |  26 +++-
 7 files changed, 353 insertions(+), 88 deletions(-)

diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 6853f66..4cfc813 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -26,11 +26,10 @@ import java.time.format.DateTimeFormatterBuilder
 
 import akka.actor.ActorSystem
 import akka.event.Logging.{ErrorLevel, InfoLevel}
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
+import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.Uri.Query
 import akka.stream.{Attributes, Outlet, SourceShape}
-import akka.http.scaladsl.Http
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Source
 import akka.stream.stage._
@@ -82,10 +81,10 @@ case class KubernetesInvokerAgentConfig(enabled: Boolean, 
port: Int)
 case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, 
invokerAgent: KubernetesInvokerAgentConfig)
 
 /**
- * Serves as interface to the kubectl CLI tool.
+ * Serves as an interface to the Kubernetes API by proxying its REST API 
and/or invoking the kubectl CLI.
  *
- * Be cautious with the ExecutionContext passed to this, as the
- * calls to the CLI are blocking.
+ * Be cautious with the ExecutionContext passed to this, as many
+ * operations are blocking.
  *
  * You only need one instance (and you shouldn't get more).
  */
@@ -94,9 +93,9 @@ class KubernetesClient(
   executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
     extends KubernetesApi
     with ProcessRunner {
-  implicit private val ec = executionContext
-  implicit private val am = ActorMaterializer()
-  implicit private val kubeRestClient = new DefaultKubernetesClient(
+  implicit protected val ec = executionContext
+  implicit protected val am = ActorMaterializer()
+  implicit protected val kubeRestClient = new DefaultKubernetesClient(
     new ConfigBuilder()
       .withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
       .withRequestTimeout(config.timeouts.logs.toMillis.toInt)
@@ -171,56 +170,14 @@ class KubernetesClient(
   }
 
   def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit 
transid: TransactionId): Future[Unit] = {
-    if (ensureUnpaused && config.invokerAgent.enabled) {
-      // The caller can't guarantee that every container with the label 
key=value is already unpaused.
-      // Therefore we must enumerate them and ensure they are unpaused before 
we attempt to delete them.
-      Future {
-        blocking {
-          kubeRestClient
-            .inNamespace(kubeRestClient.getNamespace)
-            .pods()
-            .withLabel(key, value)
-            .list()
-            .getItems
-            .asScala
-            .map { pod =>
-              val container = toContainer(pod)
-              container
-                .resume()
-                .recover { case _ => () } // Ignore errors; it is possible the 
container was not actually suspended.
-                .map(_ => rm(container))
-            }
-        }
-      }.flatMap(futures =>
-        Future
-          .sequence(futures)
-          .map(_ => ()))
-    } else {
-      runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), 
config.timeouts.rm).map(_ => ())
-    }
+    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), 
config.timeouts.rm).map(_ => ())
   }
 
-  def suspend(container: KubernetesContainer)(implicit transid: 
TransactionId): Future[Unit] = {
-    if (config.invokerAgent.enabled) {
-      agentCommand("suspend", container)
-        .map { response =>
-          response.discardEntityBytes()
-        }
-    } else {
-      Future.successful({})
-    }
-  }
+  // suspend is a no-op with the basic KubernetesClient
+  def suspend(container: KubernetesContainer)(implicit transid: 
TransactionId): Future[Unit] = Future.successful({})
 
-  def resume(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = {
-    if (config.invokerAgent.enabled) {
-      agentCommand("resume", container)
-        .map { response =>
-          response.discardEntityBytes()
-        }
-    } else {
-      Future.successful({})
-    }
-  }
+  // resume is a no-op with the basic KubernetesClient
+  def resume(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = Future.successful({})
 
   def logs(container: KubernetesContainer, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any] = {
@@ -233,7 +190,7 @@ class KubernetesClient(
 
   }
 
-  private def toContainer(pod: Pod): KubernetesContainer = {
+  protected def toContainer(pod: Pod): KubernetesContainer = {
     val id = ContainerId(pod.getMetadata.getName)
     val addr = ContainerAddress(pod.getStatus.getPodIP)
     val workerIP = pod.getStatus.getHostIP
@@ -244,17 +201,7 @@ class KubernetesClient(
     new KubernetesContainer(id, addr, workerIP, nativeContainerId)
   }
 
-  // Forward a command to invoker-agent daemonset instance on container's 
worker node
-  private def agentCommand(command: String, container: KubernetesContainer): 
Future[HttpResponse] = {
-    val uri = Uri()
-      .withScheme("http")
-      .withHost(container.workerIP)
-      .withPort(config.invokerAgent.port)
-      .withPath(Path / command / container.nativeContainerId)
-    Http().singleRequest(HttpRequest(uri = uri))
-  }
-
-  private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: 
TransactionId): Future[String] = {
+  protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: 
TransactionId): Future[String] = {
     val cmd = kubectlCmd ++ args
     val start = transid.started(
       this,
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
new file mode 100644
index 0000000..417a287
--- /dev/null
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import whisk.common.{Logging, TransactionId}
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MessageEntity, Uri}
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import pureconfig.loadConfigOrThrow
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.ConfigKeys
+import whisk.core.entity.ByteSize
+
+import collection.JavaConverters._
+import scala.concurrent.{blocking, ExecutionContext, Future}
+
+/**
+ * An extended kubernetes client that works in tandem with an invokerAgent 
DaemonSet with
+ * instances running on every worker node that runs user containers to provide
+ * suspend/resume capability and higher performance log processing.
+ */
+class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
+                                         
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
+  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+    extends KubernetesClient(config)(executionContext)
+    with KubernetesApiWithInvokerAgent {
+
+  override def rm(key: String, value: String, ensureUnpaused: Boolean = false)(
+    implicit transid: TransactionId): Future[Unit] = {
+    if (ensureUnpaused) {
+      // The caller can't guarantee that every container with the label 
key=value is already unpaused.
+      // Therefore we must enumerate them and ensure they are unpaused before 
we attempt to delete them.
+      Future {
+        blocking {
+          kubeRestClient
+            .inNamespace(kubeRestClient.getNamespace)
+            .pods()
+            .withLabel(key, value)
+            .list()
+            .getItems
+            .asScala
+            .map { pod =>
+              val container = toContainer(pod)
+              container
+                .resume()
+                .recover { case _ => () } // Ignore errors; it is possible the 
container was not actually suspended.
+                .map(_ => rm(container))
+            }
+        }
+      }.flatMap(futures =>
+        Future
+          .sequence(futures)
+          .map(_ => ()))
+    } else {
+      super.rm(key, value, ensureUnpaused)
+    }
+  }
+
+  override def suspend(container: KubernetesContainer)(implicit transid: 
TransactionId): Future[Unit] = {
+    agentCommand("suspend", container)
+      .map(_.discardEntityBytes())
+  }
+
+  override def resume(container: KubernetesContainer)(implicit transid: 
TransactionId): Future[Unit] = {
+    agentCommand("resume", container)
+      .map(_.discardEntityBytes())
+  }
+
+  override def forwardLogs(container: KubernetesContainer,
+                           lastOffset: Long,
+                           sizeLimit: ByteSize,
+                           sentinelledLogs: Boolean,
+                           additionalMetadata: Map[String, JsValue],
+                           augmentedActivation: JsObject)(implicit transid: 
TransactionId): Future[Long] = {
+    val serializedData = Map(
+      "lastOffset" -> JsNumber(lastOffset),
+      "sizeLimit" -> JsNumber(sizeLimit.toBytes),
+      "sentinelledLogs" -> JsBoolean(sentinelledLogs),
+      "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)),
+      "encodedActivation" -> JsString(augmentedActivation.compactPrint))
+
+    agentCommand("logs", container, Some(serializedData))
+      .flatMap(response => Unmarshal(response.entity).to[String].map(_.toLong))
+  }
+
+  override def agentCommand(command: String,
+                            container: KubernetesContainer,
+                            payload: Option[Map[String, JsValue]] = None): 
Future[HttpResponse] = {
+    val uri = Uri()
+      .withScheme("http")
+      .withHost(container.workerIP)
+      .withPort(config.invokerAgent.port)
+      .withPath(Path / command / container.nativeContainerId)
+
+    Marshal(payload).to[MessageEntity].flatMap { entity =>
+      Http().singleRequest(HttpRequest(uri = uri, entity = entity))
+    }
+  }
+
+  private def fieldsString(fields: Map[String, JsValue]) =
+    fields
+      .map {
+        case (key, value) => s""""$key":${value.compactPrint}"""
+      }
+      .mkString(",")
+}
+
+trait KubernetesApiWithInvokerAgent extends KubernetesApi {
+
+  /**
+   * Request the invokerAgent running on the container's worker node to 
execute the given command
+   * @param command The command verb to execute
+   * @param container The container to which the command should be applied
+   * @param payload The additional data needed to execute the command.
+   * @return The HTTPResponse from the remote agent.
+   */
+  def agentCommand(command: String,
+                   container: KubernetesContainer,
+                   payload: Option[Map[String, JsValue]] = None): 
Future[HttpResponse]
+
+  /**
+   * Forward a section of the argument container's stdout/stderr output to an 
external logging service.
+   *
+   * @param container the container whose logs should be forwarded
+   * @param lastOffset the last offset previously read in the remote log file
+   * @param sizeLimit The maximum number of bytes of log that should be 
forwarded before truncation
+   * @param sentinelledLogs Should the log forwarder expect a sentinel line at 
the end of stdout/stderr streams?
+   * @param additionalMetadata Additional metadata that should be injected 
into every log line
+   * @param augmentedActivation Activation record to be appended to the 
forwarded log.
+   * @return the last offset read from the remote log file (to be used on next 
call to forwardLogs)
+   */
+  def forwardLogs(container: KubernetesContainer,
+                  lastOffset: Long,
+                  sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject)(implicit transid: 
TransactionId): Future[Long]
+}
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index f1d03d1..11ebbbf 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -18,12 +18,13 @@
 package whisk.core.containerpool.kubernetes
 
 import java.time.Instant
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 
 import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
+import spray.json._
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
@@ -97,6 +98,9 @@ class KubernetesContainer(protected[core] val id: ContainerId,
   /** The last read timestamp in the log file */
   private val lastTimestamp = new AtomicReference[Option[Instant]](None)
 
+  /** The last offset read in the remote log file */
+  private val lastOffset = new AtomicLong(0)
+
   protected val waitForLogs: FiniteDuration = 2.seconds
 
   def suspend()(implicit transid: TransactionId): Future[Unit] = 
kubernetes.suspend(this)
@@ -110,6 +114,31 @@ class KubernetesContainer(protected[core] val id: 
ContainerId,
 
   private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
 
+  /**
+   * Request that the activation's log output be forwarded to an external log 
service (implicit in LogProvider choice).
+   * Additional per log line metadata and the activation record is provided to 
be optionally included
+   * in the forwarded log entry.
+   *
+   * @param sizeLimit The maximum number of bytes of log that should be 
forwarded
+   * @param sentinelledLogs Should the log forwarder expect a sentinel line at 
the end of stdout/stderr streams?
+   * @param additionalMetadata Additional metadata that should be injected 
into every log line
+   * @param augmentedActivation Activation record to be appended to the 
forwarded log.
+   */
+  def forwardLogs(sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject)(implicit transid: 
TransactionId): Future[Unit] = {
+    kubernetes match {
+      case client: KubernetesApiWithInvokerAgent => {
+        client
+          .forwardLogs(this, lastOffset.get, sizeLimit, sentinelledLogs, 
additionalMetadata, augmentedActivation)
+          .map(newOffset => lastOffset.set(newOffset))
+      }
+      case _ =>
+        Future.failed(new UnsupportedOperationException("forwardLogs requires 
whisk.kubernetes.invokerAgent.enabled"))
+    }
+  }
+
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: 
TransactionId): Source[ByteString, Any] = {
 
     kubernetes
@@ -136,5 +165,4 @@ class KubernetesContainer(protected[core] val id: 
ContainerId,
         line.toByteString
       }
   }
-
 }
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index b06ec5c..8b2d918 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.kubernetes
 
 import akka.actor.ActorSystem
+import pureconfig.loadConfigOrThrow
 
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
@@ -32,14 +33,23 @@ import whisk.core.containerpool.ContainerFactoryProvider
 import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.InstanceId
-import whisk.core.WhiskConfig
+import whisk.core.{ConfigKeys, WhiskConfig}
 
 class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit 
actorSystem: ActorSystem,
                                                                      ec: 
ExecutionContext,
                                                                      logging: 
Logging)
     extends ContainerFactory {
 
-  implicit val kubernetes = new KubernetesClient()(ec)
+  implicit val kubernetes = initializeKubeClient()
+
+  private def initializeKubeClient(): KubernetesClient = {
+    val config = 
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes)
+    if (config.invokerAgent.enabled) {
+      new KubernetesClientWithInvokerAgent(config)(ec)
+    } else {
+      new KubernetesClient(config)(ec)
+    }
+  }
 
   /** Perform cleanup on init */
   override def init(): Unit = cleanup()
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
new file mode 100644
index 0000000..17f8363
--- /dev/null
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.logging.{LogCollectingException, 
LogDriverLogStore, LogStore, LogStoreProvider}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * A LogStore implementation for Kubernetes that delegates all log processing 
to a remote invokerAgent that
+ * runs on the worker node where the user container is executing.  The remote 
invokerAgent will read container logs,
+ * enrich them with the activation-specific metadata it is provided, and 
consolidate them into a remote
+ * combined log file that can be processed asynchronously by log forwarding 
services.
+ *
+ * Logs are never processed by the invoker itself and therefore are not stored 
in the activation record;
+ * collectLogs will return an empty ActivationLogs.
+ */
+class KubernetesInvokerAgentLogStore(system: ActorSystem) extends 
LogDriverLogStore(system) {
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val mat: ActorMaterializer = ActorMaterializer()(system)
+
+  override def collectLogs(transid: TransactionId,
+                           user: Identity,
+                           activation: WhiskActivation,
+                           container: Container,
+                           action: ExecutableWhiskAction): 
Future[ActivationLogs] = {
+
+    val sizeLimit = action.limits.logs.asMegaBytes
+    val sentinelledLogs = action.exec.sentinelledLogs
+
+    // Add the userId field to every written record, so any background process 
can properly correlate.
+    val userIdField = Map("namespaceId" -> user.authkey.uuid.toJson)
+
+    val additionalMetadata = Map(
+      "activationId" -> activation.activationId.asString.toJson,
+      "action" -> action.fullyQualifiedName(false).asString.toJson) ++ 
userIdField
+
+    val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField)
+
+    container match {
+      case kc: KubernetesContainer => {
+        kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, 
augmentedActivation)(transid)
+          .map { _ =>
+            ActivationLogs()
+          }
+      }
+      case _ => Future.failed(LogCollectingException(ActivationLogs()))
+    }
+  }
+}
+
+object KubernetesInvokerAgentLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = new 
KubernetesInvokerAgentLogStore(actorSystem)
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 0eb4f95..5190a2f 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -20,6 +20,7 @@ package whisk.core.containerpool.kubernetes.test
 import java.time.Instant
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpResponse
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Concat, Sink, Source}
 
@@ -37,15 +38,10 @@ import org.scalatest.Matchers
 import org.scalatest.time.{Seconds, Span}
 import common.{StreamLogging, WskActorSystem}
 import okio.Buffer
+import spray.json.{JsObject, JsValue}
 import whisk.common.TransactionId
 import whisk.core.containerpool.{ContainerAddress, ContainerId}
-import whisk.core.containerpool.kubernetes.{
-  KubernetesApi,
-  KubernetesClient,
-  KubernetesContainer,
-  KubernetesRestLogSourceStage,
-  TypedLogLine
-}
+import whisk.core.containerpool.kubernetes._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
 
@@ -65,6 +61,9 @@ class KubernetesClientTests
 
   implicit val materializer: ActorMaterializer = ActorMaterializer()
 
+  val commandTimeout = 500.milliseconds
+  def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = 
Await.result(f, timeout)
+
   /** Reads logs into memory and awaits them */
   def awaitLogs(source: Source[TypedLogLine, Any], timeout: FiniteDuration = 
1000.milliseconds): Vector[TypedLogLine] =
     Await.result(source.runWith(Sink.seq[TypedLogLine]), timeout).toVector
@@ -77,16 +76,17 @@ class KubernetesClientTests
   val id = 
ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
   val container = kubernetesContainer(id)
 
-  val commandTimeout = 500.milliseconds
-  def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = 
Await.result(f, timeout)
-
   val kubectlCommand = "kubectl"
 
   /** Returns a KubernetesClient with a mocked result for 'executeProcess' */
-  def kubernetesClient(fixture: => Future[String]) = new 
KubernetesClient()(global) {
-    override def findKubectlCmd() = kubectlCommand
-    override def executeProcess(args: Seq[String], timeout: Duration)(implicit 
ec: ExecutionContext, as: ActorSystem) =
-      fixture
+  def kubernetesClient(fixture: => Future[String]) = {
+    new KubernetesClient()(global) {
+      override def findKubectlCmd() = kubectlCommand
+
+      override def executeProcess(args: Seq[String], timeout: 
Duration)(implicit ec: ExecutionContext,
+                                                                        as: 
ActorSystem) =
+        fixture
+    }
   }
 
   def kubernetesContainer(id: ContainerId) =
@@ -121,7 +121,7 @@ class KubernetesClientTests
     implicit val kubernetes = new TestKubernetesClient
     val id = ContainerId("id")
     val container = new KubernetesContainer(id, ContainerAddress("ip"), 
"127.0.0.1", "docker://foo")
-    container.suspend()
+    await(container.suspend())
     kubernetes.suspends should have size 1
     kubernetes.suspends(0) shouldBe id
   }
@@ -130,7 +130,7 @@ class KubernetesClientTests
     implicit val kubernetes = new TestKubernetesClient
     val id = ContainerId("id")
     val container = new KubernetesContainer(id, ContainerAddress("ip"), 
"127.0.0.1", "docker://foo")
-    container.resume()
+    await(container.resume())
     kubernetes.resumes should have size 1
     kubernetes.resumes(0) shouldBe id
   }
@@ -237,4 +237,26 @@ object KubernetesClientTests {
       Source(List.empty[TypedLogLine])
     }
   }
+
+  class TestKubernetesClientWithInvokerAgent extends TestKubernetesClient with 
KubernetesApiWithInvokerAgent {
+    var agentCommands = mutable.Buffer.empty[(ContainerId, String, 
Option[Map[String, JsValue]])]
+    var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)]
+
+    def agentCommand(command: String,
+                     container: KubernetesContainer,
+                     payload: Option[Map[String, JsValue]] = None): 
Future[HttpResponse] = {
+      agentCommands += ((container.id, command, payload))
+      Future.successful(HttpResponse())
+    }
+
+    def forwardLogs(container: KubernetesContainer,
+                    lastOffset: Long,
+                    sizeLimit: ByteSize,
+                    sentinelledLogs: Boolean,
+                    additionalMetadata: Map[String, JsValue],
+                    augmentedActivation: JsObject)(implicit transid: 
TransactionId): Future[Long] = {
+      forwardLogs += ((container.id, lastOffset))
+      Future.successful(lastOffset + sizeLimit.toBytes) // for testing, 
pretend we read size limit bytes
+    }
+  }
 }
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 3415b68..fc3ba68 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -50,6 +50,7 @@ import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.core.containerpool.docker.test.DockerContainerTests._
+import 
whisk.core.containerpool.kubernetes.test.KubernetesClientTests.TestKubernetesClientWithInvokerAgent
 
 import scala.collection.{immutable, mutable}
 
@@ -84,8 +85,11 @@ class KubernetesContainerTests
   val toFormattedString: Flow[ByteString, String, NotUsed] =
     
Flow[ByteString].map(_.utf8String.parseJson.convertTo[TypedLogLine].toString)
 
+  val commandTimeout = 500.milliseconds
+  def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = 
Await.result(f, timeout)
+
   /** Reads logs into memory and awaits them */
-  def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 
500.milliseconds): Vector[String] =
+  def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 
commandTimeout): Vector[String] =
     Await.result(source.via(toFormattedString).runWith(Sink.seq[String]), 
timeout).toVector
 
   val containerId = ContainerId("id")
@@ -293,6 +297,26 @@ class KubernetesContainerTests
   }
 
   /*
+   * LOG FORWARDING
+   */
+  it should "container should maintain lastOffset across calls to forwardLogs" 
in {
+    implicit val kubernetes = new TestKubernetesClientWithInvokerAgent
+    val id = ContainerId("id")
+    val container = new KubernetesContainer(id, ContainerAddress("ip"), 
"127.0.0.1", "docker://foo")
+    val logChunk = 10.kilobytes
+
+    await(container.forwardLogs(logChunk, false, Map.empty, JsObject()))
+    await(container.forwardLogs(42.bytes, false, Map.empty, JsObject()))
+    await(container.forwardLogs(logChunk, false, Map.empty, JsObject()))
+    await(container.forwardLogs(42.bytes, false, Map.empty, JsObject()))
+
+    kubernetes.forwardLogs(0) shouldBe (id, 0)
+    kubernetes.forwardLogs(1) shouldBe (id, logChunk.toBytes)
+    kubernetes.forwardLogs(2) shouldBe (id, logChunk.toBytes + 42)
+    kubernetes.forwardLogs(3) shouldBe (id, 2 * logChunk.toBytes + 42)
+  }
+
+  /*
    * LOGS
    */
   it should "read a simple log with sentinel" in {

-- 
To stop receiving notification emails like this one, please contact
markusthoem...@apache.org.

Reply via email to