diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala 
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 55e5a64e62..319b0b5661 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -48,6 +48,7 @@ case class ActivationMessage(override val transid: 
TransactionId,
                              activationId: ActivationId,
                              rootControllerIndex: ControllerInstanceId,
                              blocking: Boolean,
+                             blockingLogs: Boolean,
                              content: Option[JsObject],
                              cause: Option[ActivationId] = None,
                              traceContext: Option[Map[String, String]] = None)
@@ -68,7 +69,7 @@ object ActivationMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
 
   private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat10(ActivationMessage.apply)
+  implicit val serdes = jsonFormat11(ActivationMessage.apply)
 }
 
 /**
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala 
b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index bd92c581eb..35f3f1db71 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -215,34 +215,43 @@ trait WhiskActionsApi extends WhiskCollectionAPI with 
PostActionActivation with
     implicit transid: TransactionId) = {
     parameter(
       'blocking ? false,
+      'logs ? false,
       'result ? false,
-      'timeout.as[FiniteDuration] ? 
WhiskActionsApi.maxWaitForBlockingActivation) { (blocking, result, 
waitOverride) =>
-      entity(as[Option[JsObject]]) { payload =>
-        getEntity(WhiskActionMetaData.get(entityStore, entityName.toDocId), 
Some {
-          act: WhiskActionMetaData =>
-            // resolve the action --- special case for sequences that may 
contain components with '_' as default package
-            val action = act.resolve(user.namespace)
-            onComplete(entitleReferencedEntitiesMetaData(user, 
Privilege.ACTIVATE, Some(action.exec))) {
-              case Success(_) =>
-                val actionWithMergedParams = env.map(action.inherit(_)) 
getOrElse action
-
-                // incoming parameters may not override final parameters 
(i.e., parameters with already defined values)
-                // on an action once its parameters are resolved across 
package and binding
-                val allowInvoke = payload
-                  .map(_.fields.keySet.forall(key => 
!actionWithMergedParams.immutableParameters.contains(key)))
-                  .getOrElse(true)
-
-                if (allowInvoke) {
-                  doInvoke(user, actionWithMergedParams, payload, blocking, 
waitOverride, result)
-                } else {
-                  terminate(BadRequest, Messages.parametersNotAllowed)
-                }
-
-              case Failure(f) =>
-                super.handleEntitlementFailure(f)
-            }
-        })
-      }
+      'timeout.as[FiniteDuration] ? 
WhiskActionsApi.maxWaitForBlockingActivation) {
+      (blocking, logs, result, waitOverride) =>
+        entity(as[Option[JsObject]]) { payload =>
+          getEntity(WhiskActionMetaData.get(entityStore, entityName.toDocId), 
Some {
+            act: WhiskActionMetaData =>
+              // resolve the action --- special case for sequences that may 
contain components with '_' as default package
+              val action = act.resolve(user.namespace)
+              onComplete(entitleReferencedEntitiesMetaData(user, 
Privilege.ACTIVATE, Some(action.exec))) {
+                case Success(_) =>
+                  val actionWithMergedParams = env.map(action.inherit(_)) 
getOrElse action
+
+                  // incoming parameters may not override final parameters 
(i.e., parameters with already defined values)
+                  // on an action once its parameters are resolved across 
package and binding
+                  val allowInvoke = payload
+                    .map(_.fields.keySet.forall(key => 
!actionWithMergedParams.immutableParameters.contains(key)))
+                    .getOrElse(true)
+
+                  if (allowInvoke) {
+                    doInvoke(
+                      user,
+                      actionWithMergedParams,
+                      payload,
+                      blocking,
+                      waitOverride,
+                      blocking && logs && !result,
+                      result)
+                  } else {
+                    terminate(BadRequest, Messages.parametersNotAllowed)
+                  }
+
+                case Failure(f) =>
+                  super.handleEntitlementFailure(f)
+              }
+          })
+        }
     }
   }
 
@@ -251,9 +260,10 @@ trait WhiskActionsApi extends WhiskCollectionAPI with 
PostActionActivation with
                        payload: Option[JsObject],
                        blocking: Boolean,
                        waitOverride: FiniteDuration,
+                       blockingLogs: Boolean,
                        result: Boolean)(implicit transid: TransactionId): 
RequestContext => Future[RouteResult] = {
     val waitForResponse = if (blocking) Some(waitOverride) else None
-    onComplete(invokeAction(user, actionWithMergedParams, payload, 
waitForResponse, cause = None)) {
+    onComplete(invokeAction(user, actionWithMergedParams, payload, 
waitForResponse, blockingLogs, cause = None)) {
       case Success(Left(activationId)) =>
         // non-blocking invoke or blocking invoke which got queued instead
         respondWithActivationIdHeader(activationId) {
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala 
b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index 6f526575f2..7e6716e2ff 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -629,7 +629,13 @@ trait WhiskWebActionsApi extends Directives with 
ValidateRequestSize with PostAc
       if (isRawHttpAction || context
             .overrides(webApiDirectives.reservedProperties ++ 
action.immutableParameters)) {
         val content = context.toActionArgument(onBehalfOf, isRawHttpAction)
-        invokeAction(actionOwnerIdentity, action, Some(JsObject(content)), 
maxWaitForWebActionResult, cause = None)
+        invokeAction(
+          actionOwnerIdentity,
+          action,
+          Some(JsObject(content)),
+          maxWaitForWebActionResult,
+          false,
+          cause = None)
       } else {
         Future.failed(RejectRequest(BadRequest, Messages.parametersNotAllowed))
       }
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
index 5548229ca8..eb8828cd5e 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
@@ -49,6 +49,7 @@ protected[core] trait PostActionActivation extends 
PrimitiveActions with Sequenc
     action: WhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
+    responseWithLogs: Boolean,
     cause: Option[ActivationId])(implicit transid: TransactionId): 
Future[Either[ActivationId, WhiskActivation]] = {
     action.toExecutableWhiskAction match {
       // this is a topmost sequence
@@ -57,7 +58,7 @@ protected[core] trait PostActionActivation extends 
PrimitiveActions with Sequenc
         invokeSequence(user, action, components, payload, waitForResponse, 
cause, topmost = true, 0).map(r => r._1)
       // a non-deprecated ExecutableWhiskAction
       case Some(executable) if !executable.exec.deprecated =>
-        invokeSingleAction(user, executable, payload, waitForResponse, cause)
+        invokeSingleAction(user, executable, payload, waitForResponse, 
responseWithLogs, cause)
       // a deprecated exec
       case _ =>
         Future.failed(RejectRequest(BadRequest, 
Messages.runtimeDeprecated(action.exec)))
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index d0d4f60c50..089481e502 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -102,12 +102,13 @@ protected[actions] trait PrimitiveActions {
     action: ExecutableWhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
+    responseWithLogs: Boolean,
     cause: Option[ActivationId])(implicit transid: TransactionId): 
Future[Either[ActivationId, WhiskActivation]] = {
 
     if (action.annotations.isTruthy(WhiskActivation.conductorAnnotation)) {
-      invokeComposition(user, action, payload, waitForResponse, cause)
+      invokeComposition(user, action, payload, waitForResponse, 
responseWithLogs, cause)
     } else {
-      invokeSimpleAction(user, action, payload, waitForResponse, cause)
+      invokeSimpleAction(user, action, payload, waitForResponse, 
responseWithLogs, cause)
     }
   }
 
@@ -145,6 +146,7 @@ protected[actions] trait PrimitiveActions {
     action: ExecutableWhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
+    responseWithLogs: Boolean,
     cause: Option[ActivationId])(implicit transid: TransactionId): 
Future[Either[ActivationId, WhiskActivation]] = {
 
     // merge package parameters with action (action parameters supersede), 
then merge in payload
@@ -168,6 +170,7 @@ protected[actions] trait PrimitiveActions {
       activationId, // activation id created here
       activeAckTopicIndex,
       waitForResponse.isDefined,
+      responseWithLogs,
       args,
       cause = cause,
       WhiskTracerProvider.tracer.getTraceContext(transid))
@@ -261,6 +264,7 @@ protected[actions] trait PrimitiveActions {
                                 action: ExecutableWhiskActionMetaData,
                                 payload: Option[JsObject],
                                 waitForResponse: Option[FiniteDuration],
+                                responseWithLogs: Boolean,
                                 cause: Option[ActivationId],
                                 accounting: Option[CompositionAccounting] = 
None)(
     implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
@@ -325,6 +329,7 @@ protected[actions] trait PrimitiveActions {
           action = session.action,
           payload = params,
           waitForResponse = Some(session.action.limits.timeout.duration + 
1.minute), // wait for result
+          false,
           cause = Some(session.activationId)) // cause is session id
 
       waitForActivation(user, session, activationResponse).flatMap {
@@ -440,6 +445,7 @@ protected[actions] trait PrimitiveActions {
           action,
           payload,
           waitForResponse = None, // not topmost, hence blocking, no need for 
timeout
+          false,
           cause = Some(session.activationId),
           accounting = Some(session.accounting))
       case Some(action) => // primitive action
@@ -449,6 +455,7 @@ protected[actions] trait PrimitiveActions {
           action,
           payload,
           waitForResponse = Some(action.limits.timeout.duration + 1.minute),
+          false,
           cause = Some(session.activationId))
       case None => // sequence
         session.accounting.components += 1
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index d38a91ad1c..a54043cf93 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -66,6 +66,7 @@ protected[actions] trait SequenceActions {
     action: WhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
+    responseWithLogs: Boolean,
     cause: Option[ActivationId])(implicit transid: TransactionId): 
Future[Either[ActivationId, WhiskActivation]]
 
   /**
@@ -337,7 +338,7 @@ protected[actions] trait SequenceActions {
           // this is an invoke for an atomic action
           logging.debug(this, s"sequence invoking an enclosed atomic action 
$action")
           val timeout = action.limits.timeout.duration + 1.minute
-          invokeAction(user, action, inputPayload, waitForResponse = 
Some(timeout), cause) map {
+          invokeAction(user, action, inputPayload, waitForResponse = 
Some(timeout), false, cause) map {
             case res => (res, accounting.atomicActionCnt + 1)
           }
       }
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 18f5ac303b..9f8018ff3c 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -408,6 +408,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, 
controllerInstance: Contr
         activationId = new ActivationIdGenerator {}.make(),
         rootControllerIndex = controllerInstance,
         blocking = false,
+        blockingLogs = false,
         content = None)
 
       context.parent ! ActivationRequest(activationMessage, invokerInstance)
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala 
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 0ddd666d92..979984b587 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and 
could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => 
Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID) => Future[Any],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, Boolean, 
ControllerInstanceId, UUID) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation, UserContext) => 
Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InvokerInstanceId,
@@ -167,6 +167,7 @@ class ContainerProxy(
               transid,
               activation,
               job.msg.blocking,
+              false,
               job.msg.rootControllerIndex,
               job.msg.user.namespace.uuid)
             storeActivation(transid, activation, context)
@@ -390,8 +391,10 @@ class ContainerProxy(
       }
 
     // Sending active ack. Entirely asynchronous and not waited upon.
-    activation.foreach(
-      sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, 
job.msg.user.namespace.uuid))
+    if (!job.msg.blockingLogs) {
+      activation.foreach(
+        sendActiveAck(tid, _, job.msg.blocking, false, 
job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
+    }
 
     val context = UserContext(job.msg.user)
 
@@ -418,8 +421,13 @@ class ContainerProxy(
         }
       }
 
-    // Storing the record. Entirely asynchronous and not waited upon.
-    activationWithLogs.map(_.fold(_.activation, 
identity)).foreach(storeActivation(tid, _, context))
+    // Sending active ack and storing the record. Entirely asynchronous and 
not waited upon.
+    activationWithLogs.map(_.fold(_.activation, identity)).foreach { act =>
+      if (job.msg.blockingLogs) {
+        sendActiveAck(tid, act, job.msg.blocking, true, 
job.msg.rootControllerIndex, job.msg.user.namespace.uuid)
+      }
+      storeActivation(tid, act, context)
+    }
 
     // Disambiguate activation errors and transform the Either into a 
failed/successful Future respectively.
     activationWithLogs.flatMap {
@@ -435,7 +443,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: 
FiniteDuration, paus
 object ContainerProxy {
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => 
Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) 
=> Future[Any],
+    ack: (TransactionId, WhiskActivation, Boolean, Boolean, 
ControllerInstanceId, UUID) => Future[Any],
     store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InvokerInstanceId,
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 5f4fd8db48..3f3f065d98 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -117,6 +117,7 @@ class InvokerReactive(
   private val ack = (tid: TransactionId,
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
+                     responseWithLogs: Boolean,
                      controllerInstance: ControllerInstanceId,
                      userId: UUID) => {
     implicit val transid: TransactionId = tid
@@ -130,6 +131,30 @@ class InvokerReactive(
             s"posted ${if (recovery) "recovery" else "completion"} of 
activation ${activationResult.activationId}")
       }
     }
+
+    /**
+     * Caps the logs attached to a blocking response at 4096 characters, keeping the newest lines.
+     *
+     * Lines are consumed newest-first; the line that crosses the cap keeps only its tail, marked
+     * with a leading "..." (the marker is not counted against the cap). Older lines are dropped.
+     * Blank lines that fit inside the cap are preserved.
+     */
+    def getTruncatedLogs(logs: ActivationLogs): ActivationLogs = {
+      val maxLogSize = 1024 * 4
+
+      // foldRight visits the newest line first; prepending restores chronological order
+      val (kept, _) = logs.logs.foldRight((logs.logs.take(0), maxLogSize)) {
+        case (_, done @ (_, 0)) => done // budget used up: drop every older line
+        case (log, (acc, remaining)) if log.length <= remaining =>
+          (log +: acc, remaining - log.length)
+        case (log, (acc, remaining)) =>
+          // keep only the tail that still fits and flag the cut
+          (s"...${log.substring(log.length - remaining)}" +: acc, 0)
+      }
+
+      ActivationLogs(kept)
+    }
+
     // Potentially sends activation metadata to kafka if user events are 
enabled
     UserEvents.send(
       producer, {
@@ -155,7 +180,17 @@ class InvokerReactive(
           activation.typeName)
       })
 
-    send(Right(if (blockingInvoke) activationResult else 
activationResult.withoutLogsOrResult)).recoverWith {
+    val act = if (blockingInvoke) {
+      if (responseWithLogs) {
+        activationResult.withLogs(getTruncatedLogs(activationResult.logs))
+      } else {
+        activationResult.withoutLogs
+      }
+    } else {
+      activationResult.withoutLogsOrResult
+    }
+
+    send(Right(act)).recoverWith {
       case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
         send(Left(activationResult.activationId), recovery = true)
     }
@@ -241,7 +276,7 @@ class InvokerReactive(
                 val context = UserContext(msg.user)
                 val activation = generateFallbackActivation(msg, response)
                 activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex, msg.user.namespace.uuid)
+                ack(msg.transid, activation, msg.blocking, false, 
msg.rootControllerIndex, msg.user.namespace.uuid)
                 store(msg.transid, activation, context)
                 Future.successful(())
             }
@@ -251,7 +286,7 @@ class InvokerReactive(
           activationFeed ! MessageFeed.Processed
           val activation =
             generateFallbackActivation(msg, 
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
-          ack(msg.transid, activation, false, msg.rootControllerIndex, 
msg.user.namespace.uuid)
+          ack(msg.transid, activation, false, false, msg.rootControllerIndex, 
msg.user.namespace.uuid)
           logging.warn(this, s"namespace ${msg.user.namespace.name} was 
blocked in invoker.")
           Future.successful(())
         }
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 78317ad196..bc956dfb92 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -80,6 +80,7 @@ class ContainerPoolTests
       ActivationId.generate(),
       ControllerInstanceId("0"),
       blocking = false,
+      blockingLogs = false,
       content = None)
     Run(action, message)
   }
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 4a2a1331fc..f43188aed7 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -90,6 +90,7 @@ class ContainerProxyTests
     ActivationId.generate(),
     ControllerInstanceId("0"),
     blocking = false,
+    blockingLogs = false,
     content = None)
 
   /*
@@ -141,7 +142,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable version of the ack method, which records all 
calls in a buffer */
   def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: 
ControllerInstanceId, _: UUID) =>
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: Boolean, _: 
ControllerInstanceId, _: UUID) =>
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe 
Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
diff --git 
a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala 
b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index a0acf185c7..5b18d0baa0 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -261,6 +261,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon 
with BeforeAndAfterEac
     action: WhiskActionMetaData,
     payload: Option[JsObject],
     waitForResponse: Option[FiniteDuration],
+    responseWithLogs: Boolean,
     cause: Option[ActivationId])(implicit transid: TransactionId): 
Future[Either[ActivationId, WhiskActivation]] = {
     invocationCount = invocationCount + 1
 
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index c8a8c0183f..31425fe2e7 100644
--- 
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -194,6 +194,7 @@ class InvokerSupervisionTests
       activationId = new ActivationIdGenerator {}.make(),
       rootControllerIndex = ControllerInstanceId("0"),
       blocking = false,
+      blockingLogs = false,
       content = None)
     val msg = ActivationRequest(activationMessage, invokerInstance)
 


With regards,
Apache Git Services

Reply via email to