chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326054019
 
 

 ##########
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##########
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-    AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
-                             activationId: ActivationId,
-                             isSystemError: Boolean,
-                             invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+                                              response: Either[ActivationId, 
WhiskActivation],
+                                              override val isSystemError: 
Option[Boolean],
+                                              invoker: InvokerInstanceId)
     extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-    activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+                             override val activationId: ActivationId,
+                             override val isSystemError: Option[Boolean],
+                             invoker: InvokerInstanceId)
+    extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking activation).
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * This message is sent from an invoker to the load balancer once an action 
result is available for blocking actions.
+ * This is part of a split phase notification, and does not indicate that the 
slot is available, which is indicated with
+ * a `CompletionMessage`. Note that activation record will not contain any 
logs from the action execution, only the result.
  */
 case class ResultMessage(override val transid: TransactionId, response: 
Either[ActivationId, WhiskActivation])
     extends AcknowledegmentMessage(transid) {
+  override def name = "result"
+  override def result = Some(response)
+  override def isSlotFree = None
+  override def isSystemError = response.fold(_ => None, a => 
Some(a.response.isWhiskError))
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = ResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = activationId.asString
+}
 
-  override def toString = {
-    response.fold(l => l, r => r.activationId).asString
-  }
+object ActivationMessage extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
+  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+  implicit val serdes = jsonFormat11(ActivationMessage.apply)
 }
 
-object ResultMessage extends DefaultJsonProtocol {
-  implicit def eitherResponse =
-    new JsonFormat[Either[ActivationId, WhiskActivation]] {
-      def write(either: Either[ActivationId, WhiskActivation]) = either match {
-        case Right(a) => a.toJson
-        case Left(b)  => b.toJson
-      }
+object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {
+  def apply(transid: TransactionId,
+            activation: WhiskActivation,
+            invoker: InvokerInstanceId): CombinedCompletionAndResultMessage = {
+    CombinedCompletionAndResultMessage(transid, Right(activation), 
Some(activation.response.isWhiskError), invoker)
+  }
+  implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+  implicit val serdes = jsonFormat4(
+    CombinedCompletionAndResultMessage
+      .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: 
Option[Boolean], _: InvokerInstanceId))
+}
 
-      def read(value: JsValue) = value match {
-        // per the ActivationId's serializer, it is guaranteed to be a String 
even if it only consists of digits
-        case _: JsString => Left(value.convertTo[ActivationId])
-        case _: JsObject => Right(value.convertTo[WhiskActivation])
-        case _           => deserializationError("could not read 
ResultMessage")
-      }
-    }
+object CompletionMessage extends DefaultJsonProtocol {
+  def apply(transid: TransactionId, activation: WhiskActivation, invoker: 
InvokerInstanceId): CompletionMessage = {
+    CompletionMessage(transid, activation.activationId, 
Some(activation.response.isWhiskError), invoker)
+  }
+  implicit val serdes = jsonFormat4(
+    CompletionMessage.apply(_: TransactionId, _: ActivationId, _: 
Option[Boolean], _: InvokerInstanceId))
+}
 
-  def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat2(ResultMessage.apply)
+object ResultMessage extends DefaultJsonProtocol {
+  def apply(transid: TransactionId, activation: WhiskActivation): 
ResultMessage =
+    ResultMessage(transid, Right(activation))
+  implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+  implicit val serdes = jsonFormat2(ResultMessage.apply(_: TransactionId, _: 
Either[ActivationId, WhiskActivation]))
 }
 
 object AcknowledegmentMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[AcknowledegmentMessage] = {
-    Try(serdes.read(msg.parseJson))
+  def parse(msg: String): Try[AcknowledegmentMessage] = 
Try(serdes.read(msg.parseJson))
+
+  protected[connector] val eitherResponse = new 
JsonFormat[Either[ActivationId, WhiskActivation]] {
+    def write(either: Either[ActivationId, WhiskActivation]) = 
either.fold(_.toJson, _.toJson)
+
+    def read(value: JsValue) = value match {
+      case _: JsString =>
+        // per the ActivationId serializer, an activation id is a String even 
if it only consists of digits
+        Left(value.convertTo[ActivationId])
+
+      case _: JsObject => Right(value.convertTo[WhiskActivation])
+      case _           => deserializationError("could not read ResultMessage")
+    }
   }
 
   implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
-    override def write(obj: AcknowledegmentMessage): JsValue = {
-      obj match {
-        case c: CompletionMessage => c.toJson
-        case r: ResultMessage     => r.toJson
-      }
-    }
+    override def write(m: AcknowledegmentMessage): JsValue = m.toJson
 
+    // The field invoker is only part of CombinedCompletionAndResultMessage 
and CompletionMessage.
+    // If this field is part of the JSON, we try to deserialize into one of 
these two types,
+    // and otherwise to a ResultMessage. If all conversions fail, an error 
will be thrown that needs to be handled.
     override def read(json: JsValue): AcknowledegmentMessage = {
-      json.asJsObject
-      // The field invoker is only part of the CompletionMessage. If this 
field is part of the JSON, we try to convert
-      // it to a CompletionMessage. Otherwise to a ResultMessage.
-      // If both conversions fail, an error will be thrown that needs to be 
handled.
+      val obj = json.asJsObject
+
+      obj
         .getFields("invoker")
         .headOption
-        .map(_ => json.convertTo[CompletionMessage])
+        .map(_ => {
 
 Review comment:
   We can reduce the size a bit further by using `{"t":1}` if needed, i.e. encode 
it as an enum. However, as the overhead is not that big, we can keep the more 
readable string form.
   
   > refactoring user memory handling such that it's no more serialised in the 
CompletionMessage - but only in pings. -> 
`"invoker":{"instance":0,"userMemory":"16384 MB"}`
   
   It would be good to have an issue for this.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to