This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new a47cd42 Emit user-faced metrics and events to Kafka. (#3552) a47cd42 is described below commit a47cd423ffcf695496465383750f0a0b9db29b1f Author: Vadim Raskin <raskinva...@gmail.com> AuthorDate: Mon Apr 30 13:09:58 2018 +0200 Emit user-faced metrics and events to Kafka. (#3552) This introduces a new kafka topic called events, which will accept events of 2 types: Activations and Metrics. First corresponds to the metadata which is collected after an activation is finished (initTime, waitTime, responseCode, kind, etc), second stands for user related metrics of any kind (throttledActivations, concurrentActivations). The data is not aggregated, it is sent to Kafka instantly. --- ansible/group_vars/all | 1 + ansible/roles/controller/tasks/deploy.yml | 1 + ansible/roles/invoker/tasks/deploy.yml | 1 + common/scala/src/main/resources/application.conf | 12 ++- .../src/main/scala/whisk/common/UserEvents.scala | 35 ++++++++ .../src/main/scala/whisk/core/WhiskConfig.scala | 1 + .../main/scala/whisk/core/connector/Message.scala | 90 +++++++++++++++++++-- .../scala/whisk/core/controller/Controller.scala | 7 +- .../core/entitlement/ActivationThrottler.scala | 3 + .../scala/whisk/core/entitlement/Entitlement.scala | 41 ++++++++-- .../whisk/core/entitlement/LocalEntitlement.scala | 13 ++- .../whisk/core/containerpool/ContainerProxy.scala | 8 +- .../scala/whisk/core/invoker/InvokerReactive.scala | 35 ++++++-- docs/metrics.md | 47 ++++++++++- tests/src/test/resources/application.conf.j2 | 3 + .../test/scala/whisk/common/UserEventTests.scala | 94 ++++++++++++++++++++++ .../containerpool/test/ContainerProxyTests.scala | 44 ++++++---- .../controller/test/ControllerTestCommon.scala | 5 +- .../core/controller/test/WebActionsApiTests.scala | 2 +- 19 files changed, 392 insertions(+), 51 deletions(-) diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 0bff876..f04807b 100644 --- a/ansible/group_vars/all +++ 
b/ansible/group_vars/all @@ -324,3 +324,4 @@ metrics: host: "{{ metrics_kamon_statsd_host | default('') }}" port: "{{ metrics_kamon_statsd_port | default('8125') }}" +user_events: "{{ user_events_enabled | default(false) }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 3f2b609..8173f00 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -151,6 +151,7 @@ "CONFIG_whisk_db_actionsDdoc": "{{ db_whisk_actions_ddoc | default() }}" "CONFIG_whisk_db_activationsDdoc": "{{ db_whisk_activations_ddoc | default() }}" "CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}" + "CONFIG_whisk_userEvents_enabled": "{{ user_events }}" "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}" "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}" diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 5841269..2643b54 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -175,6 +175,7 @@ -e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}' -e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}' -e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}' + -e CONFIG_whisk_userEvents_enabled='{{ user_events }}' -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}' -e CONFIG_whisk_couchdb_protocol='{{ db_protocol }}' -e CONFIG_whisk_couchdb_host='{{ db_host }}' diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index c12eb02..f665edb 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -50,7 +50,8 @@ kamon { } whisk { - # kafka related configuration + # kafka related configuration, the complete list of 
parameters is here: + # https://kafka.apache.org/documentation/#brokerconfigs kafka { replication-factor = 1 @@ -98,6 +99,11 @@ whisk { retention-ms = 172800000 max-message-bytes = ${whisk.activation.payload.max} } + events { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 3600000 + } } } # db related configuration @@ -136,6 +142,10 @@ whisk { local-image-prefix = "whisk" } + user-events { + enabled = false + } + activation { payload { max = 1048576 // 5 m not possible because cross-referenced to kafka configurations diff --git a/common/scala/src/main/scala/whisk/common/UserEvents.scala b/common/scala/src/main/scala/whisk/common/UserEvents.scala new file mode 100644 index 0000000..0bff020 --- /dev/null +++ b/common/scala/src/main/scala/whisk/common/UserEvents.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.common + +import pureconfig.loadConfigOrThrow +import whisk.core.ConfigKeys +import whisk.core.connector.{EventMessage, MessageProducer} + +object UserEvents { + + case class UserEventsConfig(enabled: Boolean) + + val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled + + def send(producer: MessageProducer, em: => EventMessage) = { + if (enabled) { + producer.send("events", em) + } + } +} diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 1fd0154..231341f 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -220,6 +220,7 @@ object ConfigKeys { val logLimit = "whisk.log-limit" val activation = "whisk.activation" val activationPayload = s"$activation.payload" + val userEvents = "whisk.user-events" val runtimes = "whisk.runtimes" diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index 13e0ed6..b17de8f 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -18,15 +18,9 @@ package whisk.core.connector import scala.util.Try - import spray.json._ import whisk.common.TransactionId -import whisk.core.entity.ActivationId -import whisk.core.entity.DocRevision -import whisk.core.entity.FullyQualifiedEntityName -import whisk.core.entity.Identity -import whisk.core.entity.InstanceId -import whisk.core.entity.WhiskActivation +import whisk.core.entity._ /** Basic trait for messages that are sent on a message bus connector. 
*/ trait Message { @@ -122,3 +116,85 @@ object PingMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) implicit val serdes = jsonFormat(PingMessage.apply _, "name") } + +trait EventMessageBody extends Message { + def typeName: String +} + +object EventMessageBody extends DefaultJsonProtocol { + + implicit def format = new JsonFormat[EventMessageBody] { + def write(eventMessageBody: EventMessageBody) = eventMessageBody match { + case m: Metric => m.toJson + case a: Activation => a.toJson + } + + def read(value: JsValue) = + if (value.asJsObject.fields.contains("metricName")) { + value.convertTo[Metric] + } else { + value.convertTo[Activation] + } + } +} + +case class Activation(name: String, + statusCode: Int, + duration: Long, + waitTime: Long, + initTime: Long, + kind: String, + conductor: Boolean, + memory: Int, + causedBy: Boolean) + extends EventMessageBody { + val typeName = "Activation" + override def serialize = toJson.compactPrint + + def toJson = Activation.activationFormat.write(this) +} + +object Activation extends DefaultJsonProtocol { + def parse(msg: String) = Try(activationFormat.read(msg.parseJson)) + implicit val activationFormat = + jsonFormat( + Activation.apply _, + "name", + "statusCode", + "duration", + "waitTime", + "initTime", + "kind", + "conductor", + "memory", + "causedBy") +} + +case class Metric(metricName: String, metricValue: Long) extends EventMessageBody { + val typeName = "Metric" + override def serialize = toJson.compactPrint + def toJson = Metric.metricFormat.write(this).asJsObject +} + +object Metric extends DefaultJsonProtocol { + def parse(msg: String) = Try(metricFormat.read(msg.parseJson)) + implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue") +} + +case class EventMessage(source: String, + body: EventMessageBody, + subject: Subject, + namespace: String, + userId: UUID, + eventType: String, + timestamp: Long = System.currentTimeMillis()) + extends 
Message { + override def serialize = EventMessage.format.write(this).compactPrint +} + +object EventMessage extends DefaultJsonProtocol { + implicit val format = + jsonFormat(EventMessage.apply _, "source", "body", "subject", "namespace", "userId", "eventType", "timestamp") + + def parse(msg: String) = format.read(msg.parseJson) +} diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index 095a8b6..340df96 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -121,7 +121,8 @@ class Controller(val instance: InstanceId, SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance) logging.info(this, s"loadbalancer initialized: ${loadBalancer.getClass.getSimpleName}")(TransactionId.controller) - private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer) + private implicit val entitlementProvider = + new LocalEntitlementProvider(whiskConfig, loadBalancer, instance) private implicit val activationIdFactory = new ActivationIdGenerator {} private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem) @@ -227,6 +228,10 @@ object Controller { abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation") } + if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) { + abort(s"failure during msgProvider.ensureTopic for topic events") + } + ExecManifest.initialize(config) match { case Success(_) => val controller = new Controller( diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala index 563e7fe..b99385a 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala +++ 
b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala @@ -69,14 +69,17 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity sealed trait RateLimit { def ok: Boolean def errorMsg: String + def limitName: String } case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit { val ok: Boolean = count < allowed // must have slack for the current activation request override def errorMsg: String = Messages.tooManyConcurrentRequests(count, allowed) + val limitName: String = "ConcurrentRateLimit" } case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit { val ok: Boolean = count <= allowed // the count is already updated to account for the current request override def errorMsg: String = Messages.tooManyRequests(count, allowed) + val limitName: String = "TimedRateLimit" } diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala index 1aa6e3c..478bda4 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala @@ -27,9 +27,10 @@ import akka.http.scaladsl.model.StatusCodes.Forbidden import akka.http.scaladsl.model.StatusCodes.TooManyRequests import whisk.core.entitlement.Privilege.ACTIVATE import whisk.core.entitlement.Privilege.REJECT -import whisk.common.Logging -import whisk.common.TransactionId +import whisk.common.{Logging, TransactionId, UserEvents} +import whisk.connector.kafka.KafkaMessagingProvider import whisk.core.WhiskConfig +import whisk.core.connector.{EventMessage, Metric} import whisk.core.controller.RejectRequest import whisk.core.entity._ import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer} @@ -74,9 +75,10 @@ protected[core] object EntitlementProvider { * A trait that implements entitlements to resources. 
It performs checks for CRUD and Acivation requests. * This is where enforcement of activation quotas takes place, in additional to basic authorization. */ -protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)( - implicit actorSystem: ActorSystem, - logging: Logging) { +protected[core] abstract class EntitlementProvider( + config: WhiskConfig, + loadBalancer: LoadBalancer, + controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) { private implicit val executionContext: ExecutionContext = actorSystem.dispatcher @@ -142,6 +144,8 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations), config.actionInvokeSystemOverloadLimit.toInt) + private val eventProducer = KafkaMessagingProvider.getProducer(this.config) + /** * Grants a subject the right to access a resources. * @@ -358,10 +362,37 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala private def checkThrottleOverload(throttle: Future[RateLimit], user: Identity)( implicit transid: TransactionId): Future[Unit] = { throttle.flatMap { limit => + val userId = user.authkey.uuid if (limit.ok) { + limit match { + case c: ConcurrentRateLimit => { + val metric = + Metric("ConcurrentInvocations", c.count + 1) + UserEvents.send( + eventProducer, + EventMessage( + s"controller${controllerInstance.instance}", + metric, + user.subject, + user.namespace.toString, + userId, + metric.typeName)) + } + case _ => // ignore + } Future.successful(()) } else { logging.info(this, s"'${user.namespace}' has exceeded its throttle limit, ${limit.errorMsg}") + val metric = Metric(limit.limitName, 1) + UserEvents.send( + eventProducer, + EventMessage( + s"controller${controllerInstance.instance}", + metric, + user.subject, + user.namespace.toString, + userId, + metric.typeName)) 
Future.failed(RejectRequest(TooManyRequests, limit.errorMsg)) } } diff --git a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala index e9155f7..fa0edf7 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala @@ -19,13 +19,11 @@ package whisk.core.entitlement import scala.collection.concurrent.TrieMap import scala.concurrent.Future - import akka.actor.ActorSystem - import whisk.common.Logging import whisk.common.TransactionId import whisk.core.WhiskConfig -import whisk.core.entity.Subject +import whisk.core.entity.{InstanceId, Subject} import whisk.core.loadBalancer.LoadBalancer private object LocalEntitlementProvider { @@ -34,10 +32,11 @@ private object LocalEntitlementProvider { private val matrix = TrieMap[(Subject, String), Set[Privilege]]() } -protected[core] class LocalEntitlementProvider(private val config: WhiskConfig, private val loadBalancer: LoadBalancer)( - implicit actorSystem: ActorSystem, - logging: Logging) - extends EntitlementProvider(config, loadBalancer) { +protected[core] class LocalEntitlementProvider( + private val config: WhiskConfig, + private val loadBalancer: LoadBalancer, + private val controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) + extends EntitlementProvider(config, loadBalancer, controllerInstance) { private implicit val executionContext = actorSystem.dispatcher diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index fddafd5..b75ad72 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -95,7 +95,7 @@ case object RescheduleJob // job is sent back to 
parent and could not be process */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], + sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any], storeActivation: (TransactionId, WhiskActivation) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InstanceId, @@ -161,7 +161,7 @@ class ContainerProxy( // implicitly via a FailureMessage which will be processed later when the state // transitions to Running val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response) - sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex) + sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid) storeActivation(transid, activation) } .flatMap { container => @@ -380,7 +380,7 @@ class ContainerProxy( } // Sending active ack. Entirely asynchronous and not waited upon. - activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex)) + activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid)) // Adds logs to the raw activation. 
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation @@ -422,7 +422,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus object ContainerProxy { def props( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], + ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any], store: (TransactionId, WhiskActivation) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InstanceId, diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 8601e4c..b132dd8 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -26,7 +26,7 @@ import akka.stream.ActorMaterializer import org.apache.kafka.common.errors.RecordTooLargeException import pureconfig._ import spray.json._ -import whisk.common.{Logging, LoggingMarkers, Scheduler, TransactionId} +import whisk.common._ import whisk.core.{ConfigKeys, WhiskConfig} import whisk.core.connector._ import whisk.core.containerpool._ @@ -40,6 +40,7 @@ import whisk.spi.SpiLoader import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success} +import DefaultJsonProtocol._ class InvokerReactive( config: WhiskConfig, @@ -112,12 +113,12 @@ class InvokerReactive( private val ack = (tid: TransactionId, activationResult: WhiskActivation, blockingInvoke: Boolean, - controllerInstance: InstanceId) => { + controllerInstance: InstanceId, + userId: UUID) => { implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { val msg = 
CompletionMessage(transid, res, instance) - producer.send(s"completed${controllerInstance.toInt}", msg).andThen { case Success(_) => logging.info( @@ -125,6 +126,30 @@ class InvokerReactive( s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}") } } + // Potentially sends activation metadata to kafka if user events are enabled + UserEvents.send( + producer, { + val activation = Activation( + activationResult.namespace + EntityPath.PATHSEP + activationResult.name, + activationResult.response.statusCode, + activationResult.duration.getOrElse(0), + activationResult.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0), + activationResult.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0), + activationResult.annotations.getAs[String](WhiskActivation.kindAnnotation).getOrElse("unknown_kind"), + activationResult.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false), + activationResult.annotations + .getAs[ActionLimits](WhiskActivation.limitsAnnotation) + .map(al => al.memory.megabytes) + .getOrElse(0), + activationResult.annotations.getAs[Boolean](WhiskActivation.causedByAnnotation).getOrElse(false)) + EventMessage( + s"invoker${instance.instance}", + activation, + activationResult.subject, + activationResult.namespace.toString, + userId, + activation.typeName) + }) send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith { case t if t.getCause.isInstanceOf[RecordTooLargeException] => @@ -209,7 +234,7 @@ class InvokerReactive( val activation = generateFallbackActivation(msg, response) activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex) + ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.authkey.uuid) store(msg.transid, activation) Future.successful(()) } @@ -219,7 +244,7 @@ class InvokerReactive( activationFeed ! 
MessageFeed.Processed val activation = generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack(msg.transid, activation, false, msg.rootControllerIndex) + ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.authkey.uuid) logging.warn(this, s"namespace ${msg.user.namespace} was blocked in invoker.") Future.successful(()) } diff --git a/docs/metrics.md b/docs/metrics.md index 9ba60ef..3d2bb5e 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -17,9 +17,14 @@ --> # Openwhisk Metric Support -Openwhick contains the capability to send metric information to a statsd server. This capability is disabled per default. Instead metric information is normally written to the log files in logmarker format. +Openwhisk distinguishes between system and user metrics (events). -## Configuration +System metrics typically contain information about system performance and can be sent to Kamon or written to log files in logmarker format. These metrics are typically used by OpenWhisk providers/operators. + +User metrics encompass information about action performance which is sent to Kafka in the form of events. These metrics are meant to be consumed by OpenWhisk users; however, they could also be used for billing or auditing purposes. Note that at the moment the events are not directly exposed to the users and require an additional Kafka-consumer-based micro-service for data processing. + +## System specific metrics +### Configuration Both capabilties can be enabled or disabled separately during deployment via Ansible configuration in the 'goup_vars/all' file of an environment.
@@ -60,7 +65,7 @@ metrics_kamon_statsd_port: '8125' metrics_log: true ``` -## Testing the statsd metric support +### Testing the statsd metric support The Kamon project privides an integrated docker image containing statsd and a connected Grafana dashboard via [this Github project](https://github.com/kamon-io/docker-grafana-graphite). This image is helpful for testing the metrices sent via statsd. @@ -69,3 +74,39 @@ Please follow these [instructions](https://github.com/kamon-io/docker-grafana-gr The docker image exposes statsd via the (standard) port 8125 and a Graphana dashboard via port 8080 on your docker host. The address of your docker host has to be configured in the `metrics_kamon_statsd_host` configuration property. + +## User specific metrics +### Configuration +User metrics are disabled by default and can be explicitly enabled by setting the following property in one of the Ansible configuration files: +``` +user_events: true +``` + +### Supported events +Activation is an event that occurs after each activation. It includes the following execution metadata: +``` +waitTime - internal system hold time +initTime - time it took to initialise an action, e.g. docker init +statusCode - status code of the invocation: 0 - success, 1 - application error, 2 - action developer error, 3 - internal OpenWhisk error +duration - actual time the action code was running +kind - action flavor, e.g. nodejs +conductor - true for conductor backed actions +memory - maximum memory allowed for action container +causedBy - true for sequence actions +``` +Metric is any user-specific event produced by the system; it currently includes the following information: +``` +ConcurrentRateLimit - a user has exceeded its limit for concurrent invocations. +TimedRateLimit - the user has reached its per minute limit for the number of invocations. +ConcurrentInvocations - the number of in flight invocations per user. +``` + +Example events that could be consumed from Kafka.
+Activation: +``` +{"body":{"statusCode":0,"duration":3,"name":"whisk.system/invokerHealthTestAction0","waitTime":583915671,"conductor":false,"kind":"nodejs:6","initTime":0,"memory": 256, "causedBy": false},"eventType":"Activation","source":"invoker0","subject":"whisk.system","timestamp":1524476122676,"userId":"d0888ad5-5a92-435e-888a-d55a92935e54","namespace":"whisk.system"} +``` +Metric: +``` +{"body":{"metricName":"ConcurrentInvocations","metricValue":1},"eventType":"Metric","source":"controller0","subject":"guest","timestamp":1524476104419,"userId":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502","namespace":"guest"} +``` diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index 6054bda..62fa019 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -58,4 +58,7 @@ whisk { client-auth = "{{ controller.ssl.clientAuth }}" } } + user-events { + enabled = {{ user_events }} + } } diff --git a/tests/src/test/scala/whisk/common/UserEventTests.scala b/tests/src/test/scala/whisk/common/UserEventTests.scala new file mode 100644 index 0000000..661b07e --- /dev/null +++ b/tests/src/test/scala/whisk/common/UserEventTests.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.common + +import java.nio.charset.StandardCharsets + +import akka.actor.ActorSystem +import common._ +import common.rest.WskRest +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import whisk.connector.kafka.KafkaConsumerConnector +import whisk.core.WhiskConfig +import whisk.core.connector.{Activation, EventMessage, Metric} + +import scala.concurrent.duration._ + +@RunWith(classOf[JUnitRunner]) +class UserEventTests extends FlatSpec with Matchers with WskTestHelpers with StreamLogging with BeforeAndAfterAll { + + implicit val wskprops = WskProps() + implicit val system = ActorSystem("UserEventTestSystem") + val config = new WhiskConfig(WhiskConfig.kafkaHosts) + + val wsk = new WskRest + + val groupid = "kafkatest" + val topic = "events" + val maxPollInterval = 10.seconds + + val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic) + val testActionsDir = WhiskProperties.getFileRelativeToWhiskHome("tests/dat/actions") + behavior of "UserEvents" + + override def afterAll() { + consumer.close() + } + + if (UserEvents.enabled) { + it should "invoke an action and produce user events" in withAssetCleaner(wskprops) { (wp, assetHelper) => + val file = Some(TestUtils.getTestActionFilename("hello.js")) + val name = "testUserEvents" + + assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) => + action.create(name, file) + } + + val run = wsk.action.invoke(name, blocking = true) + + withActivation(wsk.activation, run) { result => + withClue("invoking an action was unsuccessful") { + result.response.status shouldBe "success" + } + } + // checking for any metrics to arrive + val received = + consumer.peek(maxPollInterval).map { + case (_, _, _, msg) => EventMessage.parse(new String(msg, StandardCharsets.UTF_8)) + } + 
received.map(event => { + event.body match { + case a: Activation => + Seq(a.statusCode) should contain oneOf (0, 1, 2, 3) + event.source should fullyMatch regex "invoker\\d+".r + case m: Metric => + Seq(m.metricName) should contain oneOf ("ConcurrentInvocations", "ConcurrentRateLimit", "TimedRateLimit") + event.source should fullyMatch regex "controller\\d+".r + } + }) + // produce at least 2 events - an Activation and a 'ConcurrentInvocations' Metric + // >= 2 is due to events that might have potentially occurred in between + received.size should be >= 2 + consumer.commit() + } + + } + +} diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 0055372..fcde7e7 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -150,7 +150,7 @@ class ContainerProxyTests /** Creates an inspectable version of the ack method, which records all calls in a buffer */ def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction { - (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) => + (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId, _: UUID) => activation.annotations.get("limits") shouldBe Some(a.limits.toJson) activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson) @@ -219,7 +219,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) preWarm(machine) @@ -255,7 +256,8 @@ class ContainerProxyTests val machine = childActorOf( - 
ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) preWarm(machine) @@ -302,7 +304,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) preWarm(machine) @@ -340,7 +343,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) @@ -372,7 +376,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(noLogsAction, message) @@ -402,7 +407,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! 
Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -437,7 +443,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -476,7 +483,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -506,7 +514,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -535,7 +544,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! 
Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -568,7 +578,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) // first run an activation timeout(machine) // times out Ready state so container suspends @@ -603,7 +614,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) timeout(machine) // times out Ready state so container suspends @@ -639,7 +651,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) // Start running the action @@ -690,7 +703,8 @@ class ContainerProxyTests val machine = childActorOf( - ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) + ContainerProxy + .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) timeout(machine) diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index 662c6d7..e9df1d5 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ 
b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -67,7 +67,7 @@ protected trait ControllerTestCommon override implicit val actorSystem = system // defined in ScalatestRouteTest override val executionContext = actorSystem.dispatcher - override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties) + override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties ++ WhiskConfig.kafkaHosts) assert(whiskConfig.isValid) // initialize runtimes manifest @@ -75,7 +75,8 @@ protected trait ControllerTestCommon override val loadBalancer = new DegenerateLoadBalancerService(whiskConfig) - override lazy val entitlementProvider: EntitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer) + override lazy val entitlementProvider: EntitlementProvider = + new LocalEntitlementProvider(whiskConfig, loadBalancer, instance) override val activationIdFactory = new ActivationId.ActivationIdGenerator() { // need a static activation id to test activations api diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala index 59d4b8e..97d5999 100644 --- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala @@ -1729,7 +1729,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac } class TestingEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer) - extends EntitlementProvider(config, loadBalancer) { + extends EntitlementProvider(config, loadBalancer, InstanceId(0)) { protected[core] override def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = { val subject = user.subject -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.