chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available. URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326054019
########## File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala ########## @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: TransactionId, def causedBySequence: Boolean = cause.isDefined } -object ActivationMessage extends DefaultJsonProtocol { - - def parse(msg: String) = Try(serdes.read(msg.parseJson)) - - private implicit val fqnSerdes = FullyQualifiedEntityName.serdes - implicit val serdes = jsonFormat11(ActivationMessage.apply) -} - /** * Message that is sent from the invoker to the controller after action is completed or after slot is free again for * new actions. */ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Message { override val transid: TransactionId = tid - override def serialize: String = { - AcknowledegmentMessage.serdes.write(this).compactPrint - } + override def serialize: String = AcknowledegmentMessage.serdes.write(this).compactPrint + + /** Pithy descriptor for logging. */ + def name: String + + /** Does message indicate slot is free? */ + def isSlotFree: Option[InvokerInstanceId] + + /** Does message contain a result? */ + def result: Option[Either[ActivationId, WhiskActivation]] + + /** + * Is the acknowledgement for an activation that failed internally? + * For some message, this is not relevant and the result is None. + */ + def isSystemError: Option[Boolean] + + def activationId: ActivationId + + /** Serializes the message to JSON. */ + def toJson: JsValue + + /** + * Converts the message to a more compact form if it cannot cross the message bus as is or some of its details are not necessary. 
+ */ + def shrink: AcknowledegmentMessage } /** - * This message is sent from the invoker to the controller, after the slot of an invoker that has been used by the - * current action, is free again (after log collection) + * This message is sent from an invoker to the controller in situtations when the resource slot and the action + * result are available at the same time, and so the split-phase notification is not necessary. Instead the message + * combines the `CompletionMessage` and `ResultMessage`. The `response` may be an `ActivationId` to allow for failures + * to send the activation result because of event-bus size limitations. */ -case class CompletionMessage(override val transid: TransactionId, - activationId: ActivationId, - isSystemError: Boolean, - invoker: InvokerInstanceId) +case class CombinedCompletionAndResultMessage(override val transid: TransactionId, + response: Either[ActivationId, WhiskActivation], + override val isSystemError: Option[Boolean], + invoker: InvokerInstanceId) extends AcknowledegmentMessage(transid) { - - override def toString = { - activationId.asString - } + override def name = "combined" + override def result = Some(response) + override def isSlotFree = Some(invoker) + override def activationId = response.fold(identity, _.activationId) + override def toJson = CombinedCompletionAndResultMessage.serdes.write(this) + override def shrink = copy(response = response.flatMap(a => Left(a.activationId))) + override def toString = response.fold(identity, _.activationId).asString } -object CompletionMessage extends DefaultJsonProtocol { - def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson)) - implicit val serdes = jsonFormat4(CompletionMessage.apply) +/** + * This message is sent from an invoker to the controller, once the resource slot in the invoker (used by the + * corresponding activation) free again (i.e., after log collection). 
The `CompletionMessage` is part of a split + * phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the + * `CompletionMessage`. + */ +case class CompletionMessage(override val transid: TransactionId, + override val activationId: ActivationId, + override val isSystemError: Option[Boolean], + invoker: InvokerInstanceId) + extends AcknowledegmentMessage(transid) { + override def name = "completion" + override def result = None + override def isSlotFree = Some(invoker) + override def toJson = CompletionMessage.serdes.write(this) + override def shrink = this + override def toString = activationId.asString } /** - * That message will be sent from the invoker to the controller after action completion if the user wants to have - * the result immediately (blocking activation). - * When adding fields, the serdes of the companion object must be updated also. - * The whisk activation field will have its logs stripped. + * This message is sent from an invoker to the load balancer once an action result is available for blocking actions. + * This is part of a split phase notification, and does not indicate that the slot is available, which is indicated with + * a `CompletionMessage`. Note that activation record will not contain any logs from the action execution, only the result. 
*/ case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation]) extends AcknowledegmentMessage(transid) { + override def name = "result" + override def result = Some(response) + override def isSlotFree = None + override def isSystemError = response.fold(_ => None, a => Some(a.response.isWhiskError)) + override def activationId = response.fold(identity, _.activationId) + override def toJson = ResultMessage.serdes.write(this) + override def shrink = copy(response = response.flatMap(a => Left(a.activationId))) + override def toString = activationId.asString +} - override def toString = { - response.fold(l => l, r => r.activationId).asString - } +object ActivationMessage extends DefaultJsonProtocol { + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + + private implicit val fqnSerdes = FullyQualifiedEntityName.serdes + implicit val serdes = jsonFormat11(ActivationMessage.apply) } -object ResultMessage extends DefaultJsonProtocol { - implicit def eitherResponse = - new JsonFormat[Either[ActivationId, WhiskActivation]] { - def write(either: Either[ActivationId, WhiskActivation]) = either match { - case Right(a) => a.toJson - case Left(b) => b.toJson - } +object CombinedCompletionAndResultMessage extends DefaultJsonProtocol { + def apply(transid: TransactionId, + activation: WhiskActivation, + invoker: InvokerInstanceId): CombinedCompletionAndResultMessage = { + CombinedCompletionAndResultMessage(transid, Right(activation), Some(activation.response.isWhiskError), invoker) + } + implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse + implicit val serdes = jsonFormat4( + CombinedCompletionAndResultMessage + .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: Option[Boolean], _: InvokerInstanceId)) +} - def read(value: JsValue) = value match { - // per the ActivationId's serializer, it is guaranteed to be a String even if it only consists of digits - case _: JsString => 
Left(value.convertTo[ActivationId]) - case _: JsObject => Right(value.convertTo[WhiskActivation]) - case _ => deserializationError("could not read ResultMessage") - } - } +object CompletionMessage extends DefaultJsonProtocol { + def apply(transid: TransactionId, activation: WhiskActivation, invoker: InvokerInstanceId): CompletionMessage = { + CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), invoker) + } + implicit val serdes = jsonFormat4( + CompletionMessage.apply(_: TransactionId, _: ActivationId, _: Option[Boolean], _: InvokerInstanceId)) +} - def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson)) - implicit val serdes = jsonFormat2(ResultMessage.apply) +object ResultMessage extends DefaultJsonProtocol { + def apply(transid: TransactionId, activation: WhiskActivation): ResultMessage = + ResultMessage(transid, Right(activation)) + implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse + implicit val serdes = jsonFormat2(ResultMessage.apply(_: TransactionId, _: Either[ActivationId, WhiskActivation])) } object AcknowledegmentMessage extends DefaultJsonProtocol { - def parse(msg: String): Try[AcknowledegmentMessage] = { - Try(serdes.read(msg.parseJson)) + def parse(msg: String): Try[AcknowledegmentMessage] = Try(serdes.read(msg.parseJson)) + + protected[connector] val eitherResponse = new JsonFormat[Either[ActivationId, WhiskActivation]] { + def write(either: Either[ActivationId, WhiskActivation]) = either.fold(_.toJson, _.toJson) + + def read(value: JsValue) = value match { + case _: JsString => + // per the ActivationId serializer, an activation id is a String even if it only consists of digits + Left(value.convertTo[ActivationId]) + + case _: JsObject => Right(value.convertTo[WhiskActivation]) + case _ => deserializationError("could not read ResultMessage") + } } implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] { - override def write(obj: AcknowledegmentMessage): 
JsValue = { - obj match { - case c: CompletionMessage => c.toJson - case r: ResultMessage => r.toJson - } - } + override def write(m: AcknowledegmentMessage): JsValue = m.toJson + // The field invoker is only part of CombinedCompletionAndResultMessage and CompletionMessage. + // If this field is part of the JSON, we try to deserialize into one of these two types, + // and otherwise to a ResultMessage. If all conversions fail, an error will be thrown that needs to be handled. override def read(json: JsValue): AcknowledegmentMessage = { - json.asJsObject - // The field invoker is only part of the CompletionMessage. If this field is part of the JSON, we try to convert - // it to a CompletionMessage. Otherwise to a ResultMessage. - // If both conversions fail, an error will be thrown that needs to be handled. + val obj = json.asJsObject + + obj .getFields("invoker") .headOption - .map(_ => json.convertTo[CompletionMessage]) + .map(_ => { Review comment: We can reduce the size a bit further by using `{"t":1}` if needed, i.e., encode it as an enum. However, as the overhead is not that big, we can keep the more readable string form. > refactoring user memory handling such that it's no more serialised in the CompletionMessage - but only in pings. -> `"invoker":{"instance":0,"userMemory":"16384 MB"}` Would be good to have an issue for this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services