This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e3c7a13  Move the ack implementations to the common package. (#4837)
e3c7a13 is described below

commit e3c7a13b11bb2296697ae4c0b2697eb46937702e
Author: 김건희 <kimkh6...@gmail.com>
AuthorDate: Thu Feb 27 12:45:00 2020 +0900

    Move the ack implementations to the common package. (#4837)
    
    The ack logic that responds to the controller with Kafka messages can also
    be used by other downstream components. Therefore, move the ack logic, which
    was previously only available in the invoker package, to the common package.
---
 .../scala/org/apache/openwhisk/core/ack/Ack.scala  | 55 ++++++++++++++++++++++
 .../openwhisk/core/ack}/MessagingActiveAck.scala   | 20 ++------
 .../apache/openwhisk/core/entity/InstanceId.scala  | 39 +++++++++++----
 .../core/containerpool/ContainerProxy.scala        |  8 +++-
 .../apache/openwhisk/core/invoker/Invoker.scala    | 29 ++++++++++--
 .../openwhisk/core/invoker/InvokerReactive.scala   | 50 +-------------------
 .../openwhisk/core/invoker/LogStoreCollector.scala |  2 +-
 .../containerpool/test/ContainerProxyTests.scala   |  8 ++--
 8 files changed, 128 insertions(+), 83 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/Ack.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/Ack.scala
new file mode 100644
index 0000000..6509602
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/Ack.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.ack
+import org.apache.openwhisk.common.{TransactionId, UserEvents}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, 
EventMessage, MessageProducer}
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, 
WhiskActivation}
+
+import scala.concurrent.Future
+
+/**
+ * A method for sending Active Acknowledgements (aka "active ack") messages to 
the load balancer. These messages
+ * are either completion messages for an activation to indicate a resource 
slot is free, or result-forwarding
+ * messages for continuations (e.g., sequences and conductor actions).
+ *
+ * The activation result is always provided because some acknowledegment 
messages may not carry the result of
+ * the activation and this is needed for sending user events.
+ *
+ * @param tid the transaction id for the activation
+ * @param activationResult is the activation result
+ * @param blockingInvoke is true iff the activation was a blocking request
+ * @param controllerInstance the originating controller/loadbalancer id
+ * @param userId is the UUID for the namespace owning the activation
+ * @param acknowledegment the acknowledgement message to send
+ */
+trait ActiveAck {
+  def apply(tid: TransactionId,
+            activationResult: WhiskActivation,
+            blockingInvoke: Boolean,
+            controllerInstance: ControllerInstanceId,
+            userId: UUID,
+            acknowledegment: AcknowledegmentMessage): Future[Any]
+}
+
+trait EventSender {
+  def send(msg: => EventMessage): Unit
+}
+
+class UserEventSender(producer: MessageProducer) extends EventSender {
+  override def send(msg: => EventMessage): Unit = UserEvents.send(producer, 
msg)
+}
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/MessagingActiveAck.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
similarity index 81%
rename from 
core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/MessagingActiveAck.scala
rename to 
common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
index 67ac836..eb9cce9 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/MessagingActiveAck.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
@@ -15,30 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.openwhisk.core.invoker
+package org.apache.openwhisk.core.ack
 
 import org.apache.kafka.common.errors.RecordTooLargeException
-import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
+import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, 
EventMessage, MessageProducer}
-import org.apache.openwhisk.core.entity.{ControllerInstanceId, 
InvokerInstanceId, UUID, WhiskActivation}
-import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
+import org.apache.openwhisk.core.entity._
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
-trait EventSender {
-  def send(msg: => EventMessage): Unit
-}
-
-class UserEventSender(producer: MessageProducer) extends EventSender {
-  override def send(msg: => EventMessage): Unit = UserEvents.send(producer, 
msg)
-}
-
-class MessagingActiveAck(producer: MessageProducer, instance: 
InvokerInstanceId, eventSender: Option[EventSender])(
+class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, 
eventSender: Option[EventSender])(
   implicit logging: Logging,
   ec: ExecutionContext)
     extends ActiveAck {
-  private val source = s"invoker${instance.instance}"
   override def apply(tid: TransactionId,
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
@@ -58,7 +48,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: 
InvokerInstanceId,
     // UserMetrics are sent, when the slot is free again. This ensures, that 
all metrics are sent.
     if (acknowledegment.isSlotFree.nonEmpty) {
       eventSender.foreach { s =>
-        EventMessage.from(activationResult, source, userId) match {
+        EventMessage.from(activationResult, instance.source, userId) match {
           case Success(msg) => s.send(msg)
           case Failure(t)   => logging.error(this, s"activation event was not 
sent: $t")
         }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
index 0580dc5..aa31799 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
@@ -18,8 +18,6 @@
 package org.apache.openwhisk.core.entity
 
 import spray.json.DefaultJsonProtocol
-import org.apache.openwhisk.core.entity.ControllerInstanceId.LEGAL_CHARS
-import org.apache.openwhisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
 
 /**
  * An instance id representing an invoker
@@ -31,24 +29,37 @@ import 
org.apache.openwhisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize) {
+                             val userMemory: ByteSize)
+    extends InstanceId {
   def toInt: Int = instance
 
-  override def toString: String = (Seq("invoker" + instance) ++ uniqueName ++ 
displayedName).mkString("/")
+  override val instanceType = "invoker"
+
+  override val source = s"$instanceType$instance"
+
+  override val toString: String = (Seq("invoker" + instance) ++ uniqueName ++ 
displayedName).mkString("/")
 }
 
-case class ControllerInstanceId(val asString: String) {
-  require(
-    asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
-    "Controller instance id contains invalid characters")
+case class ControllerInstanceId(asString: String) extends InstanceId {
+  validate(asString)
+  override val instanceType = "controller"
+
+  override val source = s"$instanceType$asString"
+
+  override val toString: String = source
 }
 
 object InvokerInstanceId extends DefaultJsonProtocol {
   import org.apache.openwhisk.core.entity.size.{serdes => xserds}
-  implicit val serdes = jsonFormat4(InvokerInstanceId.apply)
+  implicit val serdes = jsonFormat(InvokerInstanceId.apply, "instance", 
"uniqueName", "displayedName", "userMemory")
 }
 
 object ControllerInstanceId extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat(ControllerInstanceId.apply _, "asString")
+}
+
+trait InstanceId {
+
   // controller ids become part of a kafka topic, hence, hence allow only 
certain characters
   // see 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
   private val LEGAL_CHARS = "[a-zA-Z0-9._-]+"
@@ -56,5 +67,13 @@ object ControllerInstanceId extends DefaultJsonProtocol {
   // reserve some number of characters as the prefix to be added to topic names
   private val MAX_NAME_LENGTH = 249 - 121
 
-  implicit val serdes = jsonFormat1(ControllerInstanceId.apply)
+  def validate(asString: String): Unit =
+    require(
+      asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
+      s"$instanceType instance id contains invalid characters")
+
+  val instanceType: String
+
+  val source: String
+
 }
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index a35d999..dae6895 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -21,6 +21,7 @@ import akka.actor.Actor
 import akka.actor.ActorRef
 import akka.actor.Cancellable
 import java.time.Instant
+
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
@@ -33,17 +34,19 @@ import akka.io.Tcp.Connected
 import akka.pattern.pipe
 import pureconfig._
 import pureconfig.generic.auto._
-
 import akka.stream.ActorMaterializer
 import java.net.InetSocketAddress
 import java.net.SocketException
+
 import org.apache.openwhisk.common.MetricEmitter
 import org.apache.openwhisk.common.TransactionId.systemPrefix
+
 import scala.collection.immutable
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import org.apache.openwhisk.common.{AkkaLogging, Counter, LoggingMarkers, 
TransactionId}
 import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.{
   ActivationMessage,
   CombinedCompletionAndResultMessage,
@@ -55,8 +58,9 @@ import org.apache.openwhisk.core.database.UserContext
 import org.apache.openwhisk.core.entity.ExecManifest.ImageName
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
-import org.apache.openwhisk.core.invoker.InvokerReactive.{ActiveAck, 
LogsCollector}
+import org.apache.openwhisk.core.invoker.Invoker.LogsCollector
 import org.apache.openwhisk.http.Messages
+
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 2854f31..9901f95 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -26,8 +26,8 @@ import org.apache.openwhisk.common.Https.HttpsConfig
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.WhiskConfig._
 import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
-import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
-import org.apache.openwhisk.core.entity.{ActivationEntityLimit, 
ConcurrencyLimitConfig, ExecManifest, InvokerInstanceId}
+import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig}
+import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
@@ -37,13 +37,36 @@ import pureconfig._
 import pureconfig.generic.auto._
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Try
 
 case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = 
None, displayedName: Option[String] = None)
 
 object Invoker {
 
+  /**
+   * Collect logs after the activation has finished.
+   *
+   * This method is called after an activation has finished. The logs gathered 
here are stored along the activation
+   * record in the database.
+   *
+   * @param transid transaction the activation ran in
+   * @param user the user who ran the activation
+   * @param activation the activation record
+   * @param container container used by the activation
+   * @param action action that was activated
+   * @return logs for the given activation
+   */
+  trait LogsCollector {
+    def logsToBeCollected(action: ExecutableWhiskAction): Boolean = 
action.limits.logs.asMegaBytes != 0.MB
+
+    def apply(transid: TransactionId,
+              user: Identity,
+              activation: WhiskActivation,
+              container: Container,
+              action: ExecutableWhiskAction): Future[ActivationLogs]
+  }
+
   protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")
 
   /**
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index a245c84..544ca1a 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -26,7 +26,8 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
 import org.apache.openwhisk.core.database.{UserContext, _}
@@ -45,53 +46,6 @@ import scala.util.{Failure, Success}
 
 object InvokerReactive extends InvokerProvider {
 
-  /**
-   * A method for sending Active Acknowledgements (aka "active ack") messages 
to the load balancer. These messages
-   * are either completion messages for an activation to indicate a resource 
slot is free, or result-forwarding
-   * messages for continuations (e.g., sequences and conductor actions).
-   *
-   * The activation result is always provided because some acknowledegment 
messages may not carry the result of
-   * the activation and this is needed for sending user events.
-   *
-   * @param tid the transaction id for the activation
-   * @param activationResult is the activation result
-   * @param blockingInvoke is true iff the activation was a blocking request
-   * @param controllerInstance the originating controller/loadbalancer id
-   * @param userId is the UUID for the namespace owning the activation
-   * @param acknowledegment the acknowledgement message to send
-   */
-  trait ActiveAck {
-    def apply(tid: TransactionId,
-              activationResult: WhiskActivation,
-              blockingInvoke: Boolean,
-              controllerInstance: ControllerInstanceId,
-              userId: UUID,
-              acknowledegment: AcknowledegmentMessage): Future[Any]
-  }
-
-  /**
-   * Collect logs after the activation has finished.
-   *
-   * This method is called after an activation has finished. The logs gathered 
here are stored along the activation
-   * record in the database.
-   *
-   * @param transid transaction the activation ran in
-   * @param user the user who ran the activation
-   * @param activation the activation record
-   * @param container container used by the activation
-   * @param action action that was activated
-   * @return logs for the given activation
-   */
-  trait LogsCollector {
-    def logsToBeCollected(action: ExecutableWhiskAction): Boolean = 
action.limits.logs.asMegaBytes != 0.MB
-
-    def apply(transid: TransactionId,
-              user: Identity,
-              activation: WhiskActivation,
-              container: Container,
-              action: ExecutableWhiskAction): Future[ActivationLogs]
-  }
-
   override def instance(
     config: WhiskConfig,
     instance: InvokerInstanceId,
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala
index d0135bc..949329b 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala
@@ -21,7 +21,7 @@ import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.containerpool.Container
 import org.apache.openwhisk.core.containerpool.logging.LogStore
 import org.apache.openwhisk.core.entity.{ActivationLogs, 
ExecutableWhiskAction, Identity, WhiskActivation}
-import org.apache.openwhisk.core.invoker.InvokerReactive.LogsCollector
+import org.apache.openwhisk.core.invoker.Invoker.LogsCollector
 
 import scala.concurrent.Future
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index a1e3f87..519805e 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -36,6 +36,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, 
Matchers}
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.{
   AcknowledegmentMessage,
   ActivationMessage,
@@ -51,7 +52,7 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
-import org.apache.openwhisk.core.invoker.InvokerReactive
+import org.apache.openwhisk.core.invoker.Invoker
 
 import scala.collection.mutable
 import scala.concurrent.Await
@@ -172,7 +173,7 @@ class ContainerProxyTests
     expectMsg(Transition(machine, Pausing, Paused))
   }
 
-  trait LoggedAcker extends InvokerReactive.ActiveAck {
+  trait LoggedAcker extends ActiveAck {
     def calls =
       mutable.Buffer[(TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID, AcknowledegmentMessage)]()
 
@@ -239,8 +240,7 @@ class ContainerProxyTests
       response
   }
 
-  class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () 
=> Unit)
-      extends InvokerReactive.LogsCollector {
+  class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () 
=> Unit) extends Invoker.LogsCollector {
     val collector = LoggedFunction {
       (transid: TransactionId,
        user: Identity,

Reply via email to