This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new e3c7a13 Move the ack implementations to the common package. (#4837) e3c7a13 is described below commit e3c7a13b11bb2296697ae4c0b2697eb46937702e Author: 김건희 <kimkh6...@gmail.com> AuthorDate: Thu Feb 27 12:45:00 2020 +0900 Move the ack implementations to the common package. (#4837) The ack logic that responds to kafka messages to the controller can be used by other components in the downstream. So move ack logic that is only available in invoker package to common package. --- .../scala/org/apache/openwhisk/core/ack/Ack.scala | 55 ++++++++++++++++++++++ .../openwhisk/core/ack}/MessagingActiveAck.scala | 20 ++------ .../apache/openwhisk/core/entity/InstanceId.scala | 39 +++++++++++---- .../core/containerpool/ContainerProxy.scala | 8 +++- .../apache/openwhisk/core/invoker/Invoker.scala | 29 ++++++++++-- .../openwhisk/core/invoker/InvokerReactive.scala | 50 +------------------- .../openwhisk/core/invoker/LogStoreCollector.scala | 2 +- .../containerpool/test/ContainerProxyTests.scala | 8 ++-- 8 files changed, 128 insertions(+), 83 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/Ack.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/Ack.scala new file mode 100644 index 0000000..6509602 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/Ack.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.ack +import org.apache.openwhisk.common.{TransactionId, UserEvents} +import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer} +import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation} + +import scala.concurrent.Future + +/** + * A method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages + * are either completion messages for an activation to indicate a resource slot is free, or result-forwarding + * messages for continuations (e.g., sequences and conductor actions). + * + * The activation result is always provided because some acknowledgement messages may not carry the result of + * the activation and this is needed for sending user events. 
+ * + * @param tid the transaction id for the activation + * @param activationResult is the activation result + * @param blockingInvoke is true iff the activation was a blocking request + * @param controllerInstance the originating controller/loadbalancer id + * @param userId is the UUID for the namespace owning the activation + * @param acknowledegment the acknowledgement message to send + */ +trait ActiveAck { + def apply(tid: TransactionId, + activationResult: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: ControllerInstanceId, + userId: UUID, + acknowledegment: AcknowledegmentMessage): Future[Any] +} + +trait EventSender { + def send(msg: => EventMessage): Unit +} + +class UserEventSender(producer: MessageProducer) extends EventSender { + override def send(msg: => EventMessage): Unit = UserEvents.send(producer, msg) +} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/MessagingActiveAck.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala similarity index 81% rename from core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/MessagingActiveAck.scala rename to common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala index 67ac836..eb9cce9 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/MessagingActiveAck.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala @@ -15,30 +15,20 @@ * limitations under the License. 
*/ -package org.apache.openwhisk.core.invoker +package org.apache.openwhisk.core.ack import org.apache.kafka.common.errors.RecordTooLargeException -import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents} +import org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer} -import org.apache.openwhisk.core.entity.{ControllerInstanceId, InvokerInstanceId, UUID, WhiskActivation} -import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck +import org.apache.openwhisk.core.entity._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} -trait EventSender { - def send(msg: => EventMessage): Unit -} - -class UserEventSender(producer: MessageProducer) extends EventSender { - override def send(msg: => EventMessage): Unit = UserEvents.send(producer, msg) -} - -class MessagingActiveAck(producer: MessageProducer, instance: InvokerInstanceId, eventSender: Option[EventSender])( +class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventSender: Option[EventSender])( implicit logging: Logging, ec: ExecutionContext) extends ActiveAck { - private val source = s"invoker${instance.instance}" override def apply(tid: TransactionId, activationResult: WhiskActivation, blockingInvoke: Boolean, @@ -58,7 +48,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: InvokerInstanceId, // UserMetrics are sent, when the slot is free again. This ensures, that all metrics are sent. 
if (acknowledegment.isSlotFree.nonEmpty) { eventSender.foreach { s => - EventMessage.from(activationResult, source, userId) match { + EventMessage.from(activationResult, instance.source, userId) match { case Success(msg) => s.send(msg) case Failure(t) => logging.error(this, s"activation event was not sent: $t") } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala index 0580dc5..aa31799 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala @@ -18,8 +18,6 @@ package org.apache.openwhisk.core.entity import spray.json.DefaultJsonProtocol -import org.apache.openwhisk.core.entity.ControllerInstanceId.LEGAL_CHARS -import org.apache.openwhisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH /** * An instance id representing an invoker @@ -31,24 +29,37 @@ import org.apache.openwhisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH case class InvokerInstanceId(val instance: Int, uniqueName: Option[String] = None, displayedName: Option[String] = None, - val userMemory: ByteSize) { + val userMemory: ByteSize) + extends InstanceId { def toInt: Int = instance - override def toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/") + override val instanceType = "invoker" + + override val source = s"$instanceType$instance" + + override val toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/") } -case class ControllerInstanceId(val asString: String) { - require( - asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS), - "Controller instance id contains invalid characters") +case class ControllerInstanceId(asString: String) extends InstanceId { + validate(asString) + override val instanceType = "controller" + + override val source = s"$instanceType$asString" + + 
override val toString: String = source } object InvokerInstanceId extends DefaultJsonProtocol { import org.apache.openwhisk.core.entity.size.{serdes => xserds} - implicit val serdes = jsonFormat4(InvokerInstanceId.apply) + implicit val serdes = jsonFormat(InvokerInstanceId.apply, "instance", "uniqueName", "displayedName", "userMemory") } object ControllerInstanceId extends DefaultJsonProtocol { + implicit val serdes = jsonFormat(ControllerInstanceId.apply _, "asString") +} + +trait InstanceId { + // controller ids become part of a kafka topic, hence allow only certain characters // see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 private val LEGAL_CHARS = "[a-zA-Z0-9._-]+" @@ -56,5 +67,13 @@ object ControllerInstanceId extends DefaultJsonProtocol { // reserve some number of characters as the prefix to be added to topic names private val MAX_NAME_LENGTH = 249 - 121 - implicit val serdes = jsonFormat1(ControllerInstanceId.apply) + def validate(asString: String): Unit = + require( + asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS), + s"$instanceType instance id contains invalid characters") + + val instanceType: String + + val source: String + } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index a35d999..dae6895 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -21,6 +21,7 @@ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Cancellable import java.time.Instant + import akka.actor.Status.{Failure => FailureMessage} import akka.actor.{FSM, Props, Stash} import akka.event.Logging.InfoLevel @@ -33,17 +34,19 @@ import akka.io.Tcp.Connected import akka.pattern.pipe 
import pureconfig._ import pureconfig.generic.auto._ - import akka.stream.ActorMaterializer import java.net.InetSocketAddress import java.net.SocketException + import org.apache.openwhisk.common.MetricEmitter import org.apache.openwhisk.common.TransactionId.systemPrefix + import scala.collection.immutable import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId} import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector.{ ActivationMessage, CombinedCompletionAndResultMessage, @@ -55,8 +58,9 @@ import org.apache.openwhisk.core.database.UserContext import org.apache.openwhisk.core.entity.ExecManifest.ImageName import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ -import org.apache.openwhisk.core.invoker.InvokerReactive.{ActiveAck, LogsCollector} +import org.apache.openwhisk.core.invoker.Invoker.LogsCollector import org.apache.openwhisk.http.Messages + import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.{Failure, Success} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 2854f31..9901f95 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -26,8 +26,8 @@ import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common._ import org.apache.openwhisk.core.WhiskConfig._ import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider} -import org.apache.openwhisk.core.containerpool.ContainerPoolConfig -import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ConcurrencyLimitConfig, ExecManifest, InvokerInstanceId} +import 
org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig} +import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.http.{BasicHttpService, BasicRasService} @@ -37,13 +37,36 @@ import pureconfig._ import pureconfig.generic.auto._ import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Try case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None) object Invoker { + /** + * Collect logs after the activation has finished. + * + * This method is called after an activation has finished. The logs gathered here are stored along the activation + * record in the database. + * + * @param transid transaction the activation ran in + * @param user the user who ran the activation + * @param activation the activation record + * @param container container used by the activation + * @param action action that was activated + * @return logs for the given activation + */ + trait LogsCollector { + def logsToBeCollected(action: ExecutableWhiskAction): Boolean = action.limits.logs.asMegaBytes != 0.MB + + def apply(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction): Future[ActivationLogs] + } + protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol") /** diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index a245c84..544ca1a 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -26,7 +26,8 @@ import akka.event.Logging.InfoLevel 
import akka.stream.ActorMaterializer import org.apache.openwhisk.common._ import org.apache.openwhisk.common.tracing.WhiskTracerProvider -import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, _} +import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} +import org.apache.openwhisk.core.connector._ import org.apache.openwhisk.core.containerpool._ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.database.{UserContext, _} @@ -45,53 +46,6 @@ import scala.util.{Failure, Success} object InvokerReactive extends InvokerProvider { - /** - * A method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages - * are either completion messages for an activation to indicate a resource slot is free, or result-forwarding - * messages for continuations (e.g., sequences and conductor actions). - * - * The activation result is always provided because some acknowledgement messages may not carry the result of - * the activation and this is needed for sending user events. - * - * @param tid the transaction id for the activation - * @param activationResult is the activation result - * @param blockingInvoke is true iff the activation was a blocking request - * @param controllerInstance the originating controller/loadbalancer id - * @param userId is the UUID for the namespace owning the activation - * @param acknowledegment the acknowledgement message to send - */ - trait ActiveAck { - def apply(tid: TransactionId, - activationResult: WhiskActivation, - blockingInvoke: Boolean, - controllerInstance: ControllerInstanceId, - userId: UUID, - acknowledegment: AcknowledegmentMessage): Future[Any] - } - - /** - * Collect logs after the activation has finished. - * - * This method is called after an activation has finished. The logs gathered here are stored along the activation - * record in the database. 
- * - * @param transid transaction the activation ran in - * @param user the user who ran the activation - * @param activation the activation record - * @param container container used by the activation - * @param action action that was activated - * @return logs for the given activation - */ - trait LogsCollector { - def logsToBeCollected(action: ExecutableWhiskAction): Boolean = action.limits.logs.asMegaBytes != 0.MB - - def apply(transid: TransactionId, - user: Identity, - activation: WhiskActivation, - container: Container, - action: ExecutableWhiskAction): Future[ActivationLogs] - } - override def instance( config: WhiskConfig, instance: InvokerInstanceId, diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala index d0135bc..949329b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala @@ -21,7 +21,7 @@ import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.containerpool.Container import org.apache.openwhisk.core.containerpool.logging.LogStore import org.apache.openwhisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} -import org.apache.openwhisk.core.invoker.InvokerReactive.LogsCollector +import org.apache.openwhisk.core.invoker.Invoker.LogsCollector import scala.concurrent.Future diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala index a1e3f87..519805e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala @@ -36,6 +36,7 @@ import 
org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector.{ AcknowledegmentMessage, ActivationMessage, @@ -51,7 +52,7 @@ import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.http.Messages import org.apache.openwhisk.core.database.UserContext -import org.apache.openwhisk.core.invoker.InvokerReactive +import org.apache.openwhisk.core.invoker.Invoker import scala.collection.mutable import scala.concurrent.Await @@ -172,7 +173,7 @@ class ContainerProxyTests expectMsg(Transition(machine, Pausing, Paused)) } - trait LoggedAcker extends InvokerReactive.ActiveAck { + trait LoggedAcker extends ActiveAck { def calls = mutable.Buffer[(TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, AcknowledegmentMessage)]() @@ -239,8 +240,7 @@ class ContainerProxyTests response } - class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () => Unit) - extends InvokerReactive.LogsCollector { + class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () => Unit) extends Invoker.LogsCollector { val collector = LoggedFunction { (transid: TransactionId, user: Identity,