diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index 55e5a64e62..319b0b5661 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -48,6 +48,7 @@ case class ActivationMessage(override val transid: TransactionId, activationId: ActivationId, rootControllerIndex: ControllerInstanceId, blocking: Boolean, + blockingLogs: Boolean, content: Option[JsObject], cause: Option[ActivationId] = None, traceContext: Option[Map[String, String]] = None) @@ -68,7 +69,7 @@ object ActivationMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) private implicit val fqnSerdes = FullyQualifiedEntityName.serdes - implicit val serdes = jsonFormat10(ActivationMessage.apply) + implicit val serdes = jsonFormat11(ActivationMessage.apply) } /** diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala index bd92c581eb..35f3f1db71 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala @@ -215,34 +215,43 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with implicit transid: TransactionId) = { parameter( 'blocking ? false, + 'logs ? false, 'result ? false, - 'timeout.as[FiniteDuration] ? WhiskActionsApi.maxWaitForBlockingActivation) { (blocking, result, waitOverride) => - entity(as[Option[JsObject]]) { payload => - getEntity(WhiskActionMetaData.get(entityStore, entityName.toDocId), Some { - act: WhiskActionMetaData => - // resolve the action --- special case for sequences that may contain components with '_' as default package - val action = act.resolve(user.namespace) - onComplete(entitleReferencedEntitiesMetaData(user, Privilege.ACTIVATE, Some(action.exec))) { - case Success(_) => - val actionWithMergedParams = env.map(action.inherit(_)) getOrElse action - - // incoming parameters may not override final parameters (i.e., parameters with already defined values) - // on an action once its parameters are resolved across package and binding - val allowInvoke = payload - .map(_.fields.keySet.forall(key => !actionWithMergedParams.immutableParameters.contains(key))) - .getOrElse(true) - - if (allowInvoke) { - doInvoke(user, actionWithMergedParams, payload, blocking, waitOverride, result) - } else { - terminate(BadRequest, Messages.parametersNotAllowed) - } - - case Failure(f) => - super.handleEntitlementFailure(f) - } - }) - } + 'timeout.as[FiniteDuration] ? WhiskActionsApi.maxWaitForBlockingActivation) { + (blocking, logs, result, waitOverride) => + entity(as[Option[JsObject]]) { payload => + getEntity(WhiskActionMetaData.get(entityStore, entityName.toDocId), Some { + act: WhiskActionMetaData => + // resolve the action --- special case for sequences that may contain components with '_' as default package + val action = act.resolve(user.namespace) + onComplete(entitleReferencedEntitiesMetaData(user, Privilege.ACTIVATE, Some(action.exec))) { + case Success(_) => + val actionWithMergedParams = env.map(action.inherit(_)) getOrElse action + + // incoming parameters may not override final parameters (i.e., parameters with already defined values) + // on an action once its parameters are resolved across package and binding + val allowInvoke = payload + .map(_.fields.keySet.forall(key => !actionWithMergedParams.immutableParameters.contains(key))) + .getOrElse(true) + + if (allowInvoke) { + doInvoke( + user, + actionWithMergedParams, + payload, + blocking, + waitOverride, + blocking && logs && !result, + result) + } else { + terminate(BadRequest, Messages.parametersNotAllowed) + } + + case Failure(f) => + super.handleEntitlementFailure(f) + } + }) + } } } @@ -251,9 +260,10 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with payload: Option[JsObject], blocking: Boolean, waitOverride: FiniteDuration, + blockingLogs: Boolean, result: Boolean)(implicit transid: TransactionId): RequestContext => Future[RouteResult] = { val waitForResponse = if (blocking) Some(waitOverride) else None - onComplete(invokeAction(user, actionWithMergedParams, payload, waitForResponse, cause = None)) { + onComplete(invokeAction(user, actionWithMergedParams, payload, waitForResponse, blockingLogs, cause = None)) { case Success(Left(activationId)) => // non-blocking invoke or blocking invoke which got queued instead respondWithActivationIdHeader(activationId) { diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala index 6f526575f2..7e6716e2ff 100644 --- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala @@ -629,7 +629,13 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc if (isRawHttpAction || context .overrides(webApiDirectives.reservedProperties ++ action.immutableParameters)) { val content = context.toActionArgument(onBehalfOf, isRawHttpAction) - invokeAction(actionOwnerIdentity, action, Some(JsObject(content)), maxWaitForWebActionResult, cause = None) + invokeAction( + actionOwnerIdentity, + action, + Some(JsObject(content)), + maxWaitForWebActionResult, + false, + cause = None) } else { Future.failed(RejectRequest(BadRequest, Messages.parametersNotAllowed)) } diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala index 5548229ca8..eb8828cd5e 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala @@ -49,6 +49,7 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc action: WhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], + responseWithLogs: Boolean, cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { action.toExecutableWhiskAction match { // this is a topmost sequence @@ -57,7 +58,7 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc invokeSequence(user, action, components, payload, waitForResponse, cause, topmost = true, 0).map(r => r._1) // a non-deprecated ExecutableWhiskAction case Some(executable) if !executable.exec.deprecated => - invokeSingleAction(user, executable, payload, waitForResponse, cause) + invokeSingleAction(user, executable, payload, waitForResponse, responseWithLogs, cause) // a deprecated exec case _ => Future.failed(RejectRequest(BadRequest, Messages.runtimeDeprecated(action.exec))) diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index d0d4f60c50..089481e502 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -102,12 +102,13 @@ protected[actions] trait PrimitiveActions { action: ExecutableWhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], + responseWithLogs: Boolean, cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { if (action.annotations.isTruthy(WhiskActivation.conductorAnnotation)) { - invokeComposition(user, action, payload, waitForResponse, cause) + invokeComposition(user, action, payload, waitForResponse, responseWithLogs, cause) } else { - invokeSimpleAction(user, action, payload, waitForResponse, cause) + invokeSimpleAction(user, action, payload, waitForResponse, responseWithLogs, cause) } } @@ -145,6 +146,7 @@ protected[actions] trait PrimitiveActions { action: ExecutableWhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], + responseWithLogs: Boolean, cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { // merge package parameters with action (action parameters supersede), then merge in payload @@ -168,6 +170,7 @@ protected[actions] trait PrimitiveActions { activationId, // activation id created here activeAckTopicIndex, waitForResponse.isDefined, + responseWithLogs, args, cause = cause, WhiskTracerProvider.tracer.getTraceContext(transid)) @@ -261,6 +264,7 @@ protected[actions] trait PrimitiveActions { action: ExecutableWhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], + responseWithLogs: Boolean, cause: Option[ActivationId], accounting: Option[CompositionAccounting] = None)( implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { @@ -325,6 +329,7 @@ protected[actions] trait PrimitiveActions { action = session.action, payload = params, waitForResponse = Some(session.action.limits.timeout.duration + 1.minute), // wait for result + false, cause = Some(session.activationId)) // cause is session id waitForActivation(user, session, activationResponse).flatMap { @@ -440,6 +445,7 @@ protected[actions] trait PrimitiveActions { action, payload, waitForResponse = None, // not topmost, hence blocking, no need for timeout + false, cause = Some(session.activationId), accounting = Some(session.accounting)) case Some(action) => // primitive action @@ -449,6 +455,7 @@ protected[actions] trait PrimitiveActions { action, payload, waitForResponse = Some(action.limits.timeout.duration + 1.minute), + false, cause = Some(session.activationId)) case None => // sequence session.accounting.components += 1 diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala index d38a91ad1c..a54043cf93 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala @@ -66,6 +66,7 @@ protected[actions] trait SequenceActions { action: WhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], + responseWithLogs: Boolean, cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] /** @@ -337,7 +338,7 @@ protected[actions] trait SequenceActions { // this is an invoke for an atomic action logging.debug(this, s"sequence invoking an enclosed atomic action $action") val timeout = action.limits.timeout.duration + 1.minute - invokeAction(user, action, inputPayload, waitForResponse = Some(timeout), cause) map { + invokeAction(user, action, inputPayload, waitForResponse = Some(timeout), false, cause) map { case res => (res, accounting.atomicActionCnt + 1) } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 18f5ac303b..9f8018ff3c 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -408,6 +408,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr activationId = new ActivationIdGenerator {}.make(), rootControllerIndex = controllerInstance, blocking = false, + blockingLogs = false, content = None) context.parent ! ActivationRequest(activationMessage, invokerInstance) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index 0ddd666d92..979984b587 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], + sendActiveAck: (TransactionId, WhiskActivation, Boolean, Boolean, ControllerInstanceId, UUID) => Future[Any], storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, @@ -167,6 +167,7 @@ class ContainerProxy( transid, activation, job.msg.blocking, + false, job.msg.rootControllerIndex, job.msg.user.namespace.uuid) storeActivation(transid, activation, context) @@ -390,8 +391,10 @@ class ContainerProxy( } // Sending active ack. Entirely asynchronous and not waited upon. - activation.foreach( - sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid)) + if (!job.msg.blockingLogs) { + activation.foreach( + sendActiveAck(tid, _, job.msg.blocking, false, job.msg.rootControllerIndex, job.msg.user.namespace.uuid)) + } val context = UserContext(job.msg.user) @@ -418,8 +421,13 @@ class ContainerProxy( } } - // Storing the record. Entirely asynchronous and not waited upon. - activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context)) + // Sending active ack and storing the record. Entirely asynchronous and not waited upon. + activationWithLogs.map(_.fold(_.activation, identity)).foreach { act => + if (job.msg.blockingLogs) { + sendActiveAck(tid, act, job.msg.blocking, true, job.msg.rootControllerIndex, job.msg.user.namespace.uuid) + } + storeActivation(tid, act, context) + } // Disambiguate activation errors and transform the Either into a failed/successful Future respectively. activationWithLogs.flatMap { @@ -435,7 +443,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus object ContainerProxy { def props( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], + ack: (TransactionId, WhiskActivation, Boolean, Boolean, ControllerInstanceId, UUID) => Future[Any], store: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 5f4fd8db48..3f3f065d98 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -117,6 +117,7 @@ class InvokerReactive( private val ack = (tid: TransactionId, activationResult: WhiskActivation, blockingInvoke: Boolean, + responseWithLogs: Boolean, controllerInstance: ControllerInstanceId, userId: UUID) => { implicit val transid: TransactionId = tid @@ -130,6 +131,30 @@ class InvokerReactive( s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}") } } + + def getTruncatedLogs(logs: ActivationLogs) = { + var totalLogSize = 0 + val maxLogSize = 1024 * 4 + + ActivationLogs( + logs.logs.reverse + .map(log => + if (totalLogSize < maxLogSize) { + if (log.size + totalLogSize <= maxLogSize) { + totalLogSize = totalLogSize + log.size + log + } else { + val l = s"...${log.substring(log.size - (maxLogSize - totalLogSize))}" + totalLogSize = maxLogSize + l + } + } else { + "" + }) + .filter(_.nonEmpty) + .reverse) + } + // Potentially sends activation metadata to kafka if user events are enabled UserEvents.send( producer, { @@ -155,7 +180,17 @@ class InvokerReactive( activation.typeName) }) - send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith { + val act = if (blockingInvoke) { + if (responseWithLogs) { + activationResult.withLogs(getTruncatedLogs(activationResult.logs)) + } else { + activationResult.withoutLogs + } + } else { + activationResult.withoutLogsOrResult + } + + send(Right(act)).recoverWith { case t if t.getCause.isInstanceOf[RecordTooLargeException] => send(Left(activationResult.activationId), recovery = true) } @@ -241,7 +276,7 @@ class InvokerReactive( val context = UserContext(msg.user) val activation = generateFallbackActivation(msg, response) activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid) + ack(msg.transid, activation, msg.blocking, false, msg.rootControllerIndex, msg.user.namespace.uuid) store(msg.transid, activation, context) Future.successful(()) } @@ -251,7 +286,7 @@ class InvokerReactive( activationFeed ! MessageFeed.Processed val activation = generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid) + ack(msg.transid, activation, false, false, msg.rootControllerIndex, msg.user.namespace.uuid) logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") Future.successful(()) } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index 78317ad196..bc956dfb92 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -80,6 +80,7 @@ class ContainerPoolTests ActivationId.generate(), ControllerInstanceId("0"), blocking = false, + blockingLogs = false, content = None) Run(action, message) } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 4a2a1331fc..f43188aed7 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -90,6 +90,7 @@ class ContainerProxyTests ActivationId.generate(), ControllerInstanceId("0"), blocking = false, + blockingLogs = false, content = None) /* @@ -141,7 +142,7 @@ class ContainerProxyTests /** Creates an inspectable version of the ack method, which records all calls in a buffer */ def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction { - (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) => + (_: TransactionId, activation: WhiskActivation, _: Boolean, _: Boolean, _: ControllerInstanceId, _: UUID) => activation.annotations.get("limits") shouldBe Some(a.limits.toJson) activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson) diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala index a0acf185c7..5b18d0baa0 100644 --- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala @@ -261,6 +261,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac action: WhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], + responseWithLogs: Boolean, cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { invocationCount = invocationCount + 1 diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala index c8a8c0183f..31425fe2e7 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -194,6 +194,7 @@ class InvokerSupervisionTests activationId = new ActivationIdGenerator {}.make(), rootControllerIndex = ControllerInstanceId("0"), blocking = false, + blockingLogs = false, content = None) val msg = ActivationRequest(activationMessage, invokerInstance)
With regards, Apache Git Services