chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326205999
 
 

 ##########
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##########
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-    AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
-                             activationId: ActivationId,
-                             isSystemError: Boolean,
-                             invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+                                              response: Either[ActivationId, 
WhiskActivation],
+                                              override val isSystemError: 
Option[Boolean],
+                                              invoker: InvokerInstanceId)
     extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-    activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+                             override val activationId: ActivationId,
+                             override val isSystemError: Option[Boolean],
+                             invoker: InvokerInstanceId)
+    extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking activation).
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * This message is sent from an invoker to the load balancer once an action 
result is available for blocking actions.
+ * This is part of a split phase notification, and does not indicate that the 
slot is available, which is indicated with
+ * a `CompletionMessage`. Note that activation record will not contain any 
logs from the action execution, only the result.
  */
 case class ResultMessage(override val transid: TransactionId, response: 
Either[ActivationId, WhiskActivation])
 
 Review comment:
   Thats the Right thing to do !!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to