This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new c674757 Allow namespace ignore in user-events (#4668) c674757 is described below commit c674757c3e774368873103d40dc21eff1ae051f1 Author: Cosmin Stanciu <sel...@users.noreply.github.com> AuthorDate: Tue Nov 5 22:44:38 2019 -0800 Allow namespace ignore in user-events (#4668) Enable support for ignoring action level metrics for certain namespaces which are used for test purposes Fixes #4667 --- core/monitoring/user-events/README.md | 9 +++ .../user-events/src/main/resources/reference.conf | 3 + .../core/monitoring/metrics/EventConsumer.scala | 11 ++-- .../core/monitoring/metrics/KamonRecorder.scala | 28 ++++++-- .../core/monitoring/metrics/OpenWhiskEvents.scala | 4 +- .../monitoring/metrics/PrometheusRecorder.scala | 30 ++++++--- .../resources/application.conf} | 18 ++--- .../core/monitoring/metrics/EventsTestHelper.scala | 5 +- .../monitoring/metrics/KamonRecorderTests.scala | 73 ++++++++++++-------- .../metrics/PrometheusRecorderTests.scala | 77 ++++++++++++++-------- 10 files changed, 172 insertions(+), 86 deletions(-) diff --git a/core/monitoring/user-events/README.md b/core/monitoring/user-events/README.md index 5ed2127..ff6730a 100644 --- a/core/monitoring/user-events/README.md +++ b/core/monitoring/user-events/README.md @@ -36,6 +36,15 @@ This service connects to `events` topic and publishes the events to various serv The service needs the following env variables to be set - `KAFKA_HOSTS` - For local env it can be set to `172.17.0.1:9093`. When using [OpenWhisk Devtools][2] based setup use `kafka` +- Namespaces can be removed from reports by listing them inside the `reference.conf` using the `whisk.user-events.ignored-namespaces` configuration. +e.g: +``` +whisk { + user-events { + ignored-namespaces = ["canary","testing"] + } +} +``` Integrations ------------ diff --git a/core/monitoring/user-events/src/main/resources/reference.conf b/core/monitoring/user-events/src/main/resources/reference.conf index 6f7d1c2..6282614 100644 --- a/core/monitoring/user-events/src/main/resources/reference.conf +++ b/core/monitoring/user-events/src/main/resources/reference.conf @@ -23,5 +23,8 @@ whisk { # Enables KamonRecorder so as to enable sending metrics to Kamon supported backends # like DataDog enable-kamon = false + + # Namespaces that should not be monitored + ignored-namespaces = [] } } diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala index 65f245e..a6eca9b 100644 --- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala +++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala @@ -39,15 +39,16 @@ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import org.apache.openwhisk.core.connector.{Activation, EventMessage, Metric} import org.apache.openwhisk.core.entity.ActivationResponse +import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig trait MetricRecorder { - def processActivation(activation: Activation, initiatorNamespace: String): Unit + def processActivation(activation: Activation, initiatorNamespace: String, metricConfig: MetricConfig): Unit def processMetric(metric: Metric, initiatorNamespace: String): Unit } -case class EventConsumer(settings: ConsumerSettings[String, String], recorders: Seq[MetricRecorder])( - implicit system: ActorSystem, - materializer: ActorMaterializer) +case class EventConsumer(settings: ConsumerSettings[String, String], + recorders: Seq[MetricRecorder], + metricConfig: MetricConfig)(implicit system: ActorSystem, materializer: ActorMaterializer) extends KafkaMetricsProvider { import EventConsumer._ @@ -110,7 +111,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String], recorders: .foreach { e => e.body match { case a: Activation => - recorders.foreach(_.processActivation(a, e.namespace)) + recorders.foreach(_.processActivation(a, e.namespace, metricConfig)) updateGlobalMetrics(a) case m: Metric => recorders.foreach(_.processMetric(m, e.namespace)) diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala index 34af567..c2e785b 100644 --- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala +++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala @@ -21,10 +21,12 @@ import akka.event.slf4j.SLF4JLogging import org.apache.openwhisk.core.connector.{Activation, Metric} import kamon.Kamon import kamon.metric.MeasurementUnit +import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig import scala.collection.concurrent.TrieMap trait KamonMetricNames extends MetricNames { + val namespaceActivationMetric = "openwhisk.namespace.activations" val activationMetric = "openwhisk.action.activations" val coldStartMetric = "openwhisk.action.coldStarts" val waitTimeMetric = "openwhisk.action.waitTime" @@ -41,23 +43,23 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg private val activationMetrics = new TrieMap[String, ActivationKamonMetrics] private val limitMetrics = new TrieMap[String, LimitKamonMetrics] - override def processActivation(activation: Activation, initiatorNamespace: String): Unit = { - lookup(activation, initiatorNamespace).record(activation) + override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = { + lookup(activation, initiator).record(activation, metricConfig) } - override def processMetric(metric: Metric, initiatorNamespace: String): Unit = { - val limitMetric = limitMetrics.getOrElseUpdate(initiatorNamespace, LimitKamonMetrics(initiatorNamespace)) + override def processMetric(metric: Metric, initiator: String): Unit = { + val limitMetric = limitMetrics.getOrElseUpdate(initiator, LimitKamonMetrics(initiator)) limitMetric.record(metric) } - def lookup(activation: Activation, initiatorNamespace: String): ActivationKamonMetrics = { + def lookup(activation: Activation, initiator: String): ActivationKamonMetrics = { val name = activation.name val kind = activation.kind val memory = activation.memory.toString val namespace = activation.namespace val action = activation.action activationMetrics.getOrElseUpdate(name, { - ActivationKamonMetrics(namespace, action, kind, memory, initiatorNamespace) + ActivationKamonMetrics(namespace, action, kind, memory, initiator) }) } @@ -87,8 +89,11 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg `actionName` -> action, `actionKind` -> kind, `actionMemory` -> memory) + private val namespaceActivationsTags = + Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator) private val tags = Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator, `actionName` -> action) + private val namespaceActivations = Kamon.counter(namespaceActivationMetric).refine(namespaceActivationsTags) private val activations = Kamon.counter(activationMetric).refine(activationTags) private val coldStarts = Kamon.counter(coldStartMetric).refine(tags) private val waitTime = Kamon.histogram(waitTimeMetric, MeasurementUnit.time.milliseconds).refine(tags) @@ -96,7 +101,16 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg private val duration = Kamon.histogram(durationMetric, MeasurementUnit.time.milliseconds).refine(tags) private val responseSize = Kamon.histogram(responseSizeMetric, MeasurementUnit.information.bytes).refine(tags) - def record(a: Activation): Unit = { + def record(a: Activation, metricConfig: MetricConfig): Unit = { + namespaceActivations.increment() + + // only record activation if not executed in an ignored namespace + if (!metricConfig.ignoredNamespaces.contains(a.namespace)) { + recordActivation(a) + } + } + + def recordActivation(a: Activation): Unit = { activations.increment() if (a.isColdStart) { diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala index f5c7ce6..49ddb2c 100644 --- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala +++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala @@ -33,7 +33,7 @@ import scala.concurrent.{ExecutionContext, Future} object OpenWhiskEvents extends SLF4JLogging { - case class MetricConfig(port: Int, enableKamon: Boolean) + case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String]) def start(config: Config)(implicit system: ActorSystem, materializer: ActorMaterializer): Future[Http.ServerBinding] = { @@ -47,7 +47,7 @@ object OpenWhiskEvents extends SLF4JLogging { val prometheusRecorder = PrometheusRecorder(prometheusReporter) val recorders = if (metricConfig.enableKamon) Seq(prometheusRecorder, KamonRecorder) else Seq(prometheusRecorder) - val eventConsumer = EventConsumer(eventConsumerSettings(defaultConsumerConfig(config)), recorders) + val eventConsumer = EventConsumer(eventConsumerSettings(defaultConsumerConfig(config)), recorders, metricConfig) CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "shutdownConsumer") { () => eventConsumer.shutdown() diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala index 516a91d..9cf7a22 100644 --- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala +++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala @@ -29,6 +29,7 @@ import org.apache.openwhisk.core.connector.{Activation, Metric} import io.prometheus.client.exporter.common.TextFormat import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram} import kamon.prometheus.PrometheusReporter +import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse} import scala.collection.JavaConverters._ @@ -36,6 +37,7 @@ import scala.collection.concurrent.TrieMap import scala.concurrent.duration.Duration trait PrometheusMetricNames extends MetricNames { + val namespaceMetric = "openwhisk_namespace_activations_total" val activationMetric = "openwhisk_action_activations_total" val coldStartMetric = "openwhisk_action_coldStarts_total" val waitTimeMetric = "openwhisk_action_waitTime_seconds" @@ -57,19 +59,19 @@ case class PrometheusRecorder(kamon: PrometheusReporter) private val activationMetrics = new TrieMap[String, ActivationPromMetrics] private val limitMetrics = new TrieMap[String, LimitPromMetrics] - override def processActivation(activation: Activation, initiatorNamespace: String): Unit = { - lookup(activation, initiatorNamespace).record(activation) + override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = { + lookup(activation, initiator).record(activation, initiator, metricConfig) } - override def processMetric(metric: Metric, initiatorNamespace: String): Unit = { - val limitMetric = limitMetrics.getOrElseUpdate(initiatorNamespace, LimitPromMetrics(initiatorNamespace)) + override def processMetric(metric: Metric, initiator: String): Unit = { + val limitMetric = limitMetrics.getOrElseUpdate(initiator, LimitPromMetrics(initiator)) limitMetric.record(metric) } override def getReport(): MessageEntity = HttpEntity(PrometheusExporter.textV4, createSource()) - private def lookup(activation: Activation, initiatorNamespace: String): ActivationPromMetrics = { + private def lookup(activation: Activation, initiator: String): ActivationPromMetrics = { //TODO Unregister unused actions val name = activation.name val kind = activation.kind @@ -77,7 +79,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter) val namespace = activation.namespace val action = activation.action activationMetrics.getOrElseUpdate(name, { - ActivationPromMetrics(namespace, action, kind, memory, initiatorNamespace) + ActivationPromMetrics(namespace, action, kind, memory, initiator) }) } @@ -100,6 +102,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter) kind: String, memory: String, initiatorNamespace: String) { + private val namespaceActivations = namespaceActivationCounter.labels(namespace, initiatorNamespace) private val activations = activationCounter.labels(namespace, initiatorNamespace, action, kind, memory) private val coldStarts = coldStartCounter.labels(namespace, initiatorNamespace, action) private val waitTime = waitTimeHisto.labels(namespace, initiatorNamespace, action) @@ -118,7 +121,16 @@ case class PrometheusRecorder(kamon: PrometheusReporter) private val statusInternalError = statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError) - def record(a: Activation): Unit = { + def record(a: Activation, initiator: String, metricConfig: MetricConfig): Unit = { + namespaceActivations.inc() + + // only record activation if not executed in an ignored namespace + if (!metricConfig.ignoredNamespaces.contains(a.namespace)) { + recordActivation(a, initiator) + } + } + + def recordActivation(a: Activation, initiator: String): Unit = { gauge.set(a.memory) activations.inc() @@ -137,7 +149,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter) case ActivationResponse.statusApplicationError => statusApplicationError.inc() case ActivationResponse.statusDeveloperError => statusDeveloperError.inc() case ActivationResponse.statusWhiskError => statusInternalError.inc() - case x => statusCounter.labels(namespace, initiatorNamespace, action, x).inc() + case x => statusCounter.labels(namespace, initiator, action, x).inc() } a.size.foreach(responseSize.observe(_)) @@ -177,6 +189,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter) } object PrometheusRecorder extends PrometheusMetricNames { + private val namespaceActivationCounter = + counter(namespaceMetric, "Namespace activations Count", actionNamespace, initiatorNamespace) private val activationCounter = counter( activationMetric, diff --git a/core/monitoring/user-events/src/main/resources/reference.conf b/core/monitoring/user-events/src/test/resources/application.conf similarity index 76% copy from core/monitoring/user-events/src/main/resources/reference.conf copy to core/monitoring/user-events/src/test/resources/application.conf index 6f7d1c2..f7413dc 100644 --- a/core/monitoring/user-events/src/main/resources/reference.conf +++ b/core/monitoring/user-events/src/test/resources/application.conf @@ -15,13 +15,15 @@ # limitations under the License. # -whisk { - user-events { - # Server port - port = 9095 +user-events { + # Server port + port = 9095 + + # Enables KamonRecorder so as to enable sending metrics to Kamon supported backends + # like DataDog + enable-kamon = false + + # Namespaces that should not be monitored + ignored-namespaces = ["guest"] - # Enables KamonRecorder so as to enable sending metrics to Kamon supported backends - # like DataDog - enable-kamon = false - } } diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala index 71b8d2e..6f2edae 100644 --- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala +++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala @@ -23,6 +23,8 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import com.typesafe.config.Config import kamon.prometheus.PrometheusReporter +import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig +import pureconfig.loadConfigOrThrow trait EventsTestHelper { @@ -34,7 +36,8 @@ trait EventsTestHelper { val settings = OpenWhiskEvents .eventConsumerSettings(OpenWhiskEvents.defaultConsumerConfig(globalConfig)) .withBootstrapServers(s"localhost:$kport") - EventConsumer(settings, Seq(recorder)) + val metricConfig = loadConfigOrThrow[MetricConfig](globalConfig, "user-events") + EventConsumer(settings, Seq(recorder), metricConfig) } protected def freePort(): Int = { diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala index 8e09a70..446ed98 100644 --- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala +++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala @@ -56,8 +56,9 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo behavior of "KamonConsumer" - val namespace = "whisk.system" - val initiator = "testNS" + val initiator = "initiatorTest" + val namespaceDemo = "demo" + val namespaceGuest = "guest" val actionWithCustomPackage = "apimgmt/createApi" val actionWithDefaultPackage = "createApi" val kind = "nodejs:10" @@ -70,43 +71,52 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo publishStringMessageToKafka( EventConsumer.userEventTopic, - newActivationEvent(s"$namespace/$actionWithCustomPackage").serialize) + newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage").serialize) + publishStringMessageToKafka( + EventConsumer.userEventTopic, + newActivationEvent(s"$namespaceDemo/$actionWithDefaultPackage").serialize) publishStringMessageToKafka( EventConsumer.userEventTopic, - newActivationEvent(s"$namespace/$actionWithDefaultPackage").serialize) + newActivationEvent(s"$namespaceGuest/$actionWithDefaultPackage").serialize) sleep(sleepAfterProduce, "sleeping post produce") consumer.shutdown().futureValue sleep(4.second, "sleeping for Kamon reporters to get invoked") // Custom package - TestReporter.counter(activationMetric, actionWithCustomPackage).size shouldBe 1 + TestReporter.counter(activationMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1 TestReporter - .counter(activationMetric, actionWithCustomPackage) - .filter((t) => t.tags.get(actionMemory).get == memory.toString) - .size shouldBe 1 + .counter(activationMetric, namespaceDemo, actionWithCustomPackage) + .filter((t) => t.tags.get(actionMemory).get == memory.toString)(0) + .value shouldBe 1 TestReporter - .counter(activationMetric, actionWithCustomPackage) - .filter((t) => t.tags.get(actionKind).get == kind) - .size shouldBe 1 + .counter(activationMetric, namespaceDemo, actionWithCustomPackage) + .filter((t) => t.tags.get(actionKind).get == kind)(0) + .value shouldBe 1 TestReporter - .counter(statusMetric, actionWithCustomPackage) - .filter((t) => t.tags.get(actionStatus).get == ActivationResponse.statusDeveloperError) - .size shouldBe 1 - TestReporter.counter(coldStartMetric, actionWithCustomPackage).size shouldBe 1 - TestReporter.histogram(waitTimeMetric, actionWithCustomPackage).size shouldBe 1 - TestReporter.histogram(initTimeMetric, actionWithCustomPackage).size shouldBe 1 - TestReporter.histogram(durationMetric, actionWithCustomPackage).size shouldBe 1 + .counter(statusMetric, namespaceDemo, actionWithCustomPackage) + .filter((t) => t.tags.get(actionStatus).get == ActivationResponse.statusDeveloperError)(0) + .value shouldBe 1 + TestReporter.counter(coldStartMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1 + TestReporter.histogram(waitTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1 + TestReporter.histogram(initTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1 + TestReporter.histogram(durationMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1 // Default package - TestReporter.histogram(durationMetric, actionWithDefaultPackage).size shouldBe 1 + TestReporter.histogram(durationMetric, namespaceDemo, actionWithDefaultPackage).size shouldBe 1 + + // Blacklisted namespace should not be tracked + TestReporter.counter(activationMetric, namespaceGuest, actionWithDefaultPackage)(0).value shouldBe 0 + + // Blacklisted should be counted in "openwhisk.namespace.activations" metric + TestReporter.namespaceCounter(namespaceActivationMetric, namespaceGuest)(0).value shouldBe 1 } - private def newActivationEvent(name: String) = + private def newActivationEvent(actionPath: String) = EventMessage( - namespace, - Activation(name, 2, 3.millis, 5.millis, 11.millis, kind, false, memory, None), + "test", + Activation(actionPath, 2, 3.millis, 5.millis, 11.millis, kind, false, memory, None), Subject("testuser"), initiator, UUID("test"), @@ -126,24 +136,35 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo snapshotAccumulator = new PeriodSnapshotAccumulator(Duration.ofDays(1), Duration.ZERO) } - def counter(name: String, action: String) = { + def counter(metricName: String, namespace: String, action: String) = { System.out.println() snapshotAccumulator .peek() .metrics .counters - .filter(_.name == name) + .filter(_.name == metricName) .filter((t) => t.tags.get(actionNamespace).get == namespace) .filter((t) => t.tags.get(initiatorNamespace).get == initiator) .filter((t) => t.tags.get(actionName).get == action) } - def histogram(name: String, action: String) = { + def namespaceCounter(metricName: String, namespace: String) = { + System.out.println() + snapshotAccumulator + .peek() + .metrics + .counters + .filter(_.name == metricName) + .filter((t) => t.tags.get(actionNamespace).get == namespace) + .filter((t) => t.tags.get(initiatorNamespace).get == initiator) + } + + def histogram(metricName: String, namespace: String, action: String) = { snapshotAccumulator .peek() .metrics .histograms - .filter(_.name == name) + .filter(_.name == metricName) .filter((t) => t.tags.get(actionNamespace).get == namespace) .filter((t) => t.tags.get(initiatorNamespace).get == initiator) .filter((t) => t.tags.get(actionName).get == action) diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala index 2241a8b..c0ffedf 100644 --- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala +++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala @@ -29,8 +29,9 @@ import scala.concurrent.duration._ @RunWith(classOf[JUnitRunner]) class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with PrometheusMetricNames { behavior of "PrometheusConsumer" - val namespace = "whisk.system" - val initiator = "testNS" + val initiator = "initiatorTest" + val namespaceDemo = "demo" + val namespaceGuest = "guest" val actionWithCustomPackage = "apimgmt/createApiOne" val actionWithDefaultPackage = "createApi" val kind = "nodejs:10" @@ -42,75 +43,93 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with val consumer = createConsumer(kafkaPort, system.settings.config) publishStringMessageToKafka( EventConsumer.userEventTopic, - newActivationEvent(s"$namespace/$actionWithCustomPackage", kind, memory, initiator).serialize) + newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage", kind, memory).serialize) + publishStringMessageToKafka( + EventConsumer.userEventTopic, + newActivationEvent(s"$namespaceDemo/$actionWithDefaultPackage", kind, memory).serialize) publishStringMessageToKafka( EventConsumer.userEventTopic, - newActivationEvent(s"$namespace/$actionWithDefaultPackage", kind, memory, initiator).serialize) + newActivationEvent(s"$namespaceGuest/$actionWithDefaultPackage", kind, memory).serialize) // Custom package sleep(sleepAfterProduce, "sleeping post produce") consumer.shutdown().futureValue - counterTotal(activationMetric, actionWithCustomPackage) shouldBe 1 - counter(coldStartMetric, actionWithCustomPackage) shouldBe 1 - counterStatus(statusMetric, actionWithCustomPackage, ActivationResponse.statusDeveloperError) shouldBe 1 + counterTotal(activationMetric, namespaceDemo, actionWithCustomPackage) shouldBe 1 + counter(coldStartMetric, namespaceDemo, actionWithCustomPackage) shouldBe 1 + counterStatus(statusMetric, namespaceDemo, actionWithCustomPackage, ActivationResponse.statusDeveloperError) shouldBe 1 - histogramCount(waitTimeMetric, actionWithCustomPackage) shouldBe 1 - histogramSum(waitTimeMetric, actionWithCustomPackage) shouldBe (0.03 +- 0.001) + histogramCount(waitTimeMetric, namespaceDemo, actionWithCustomPackage) shouldBe 1 + histogramSum(waitTimeMetric, namespaceDemo, actionWithCustomPackage) shouldBe (0.03 +- 0.001) - histogramCount(initTimeMetric, actionWithCustomPackage) shouldBe 1 - histogramSum(initTimeMetric, actionWithCustomPackage) shouldBe (433.433 +- 0.01) + histogramCount(initTimeMetric, namespaceDemo, actionWithCustomPackage) shouldBe 1 + histogramSum(initTimeMetric, namespaceDemo, actionWithCustomPackage) shouldBe (433.433 +- 0.01) - histogramCount(durationMetric, actionWithCustomPackage) shouldBe 1 - histogramSum(durationMetric, actionWithCustomPackage) shouldBe (1.254 +- 0.01) + histogramCount(durationMetric, namespaceDemo, actionWithCustomPackage) shouldBe 1 + histogramSum(durationMetric, namespaceDemo, actionWithCustomPackage) shouldBe (1.254 +- 0.01) - gauge(memoryMetric, actionWithCustomPackage).intValue() shouldBe 256 + gauge(memoryMetric, namespaceDemo, actionWithCustomPackage).intValue() shouldBe 256 // Default package - counterTotal(activationMetric, actionWithDefaultPackage) shouldBe 1 + counterTotal(activationMetric, namespaceDemo, actionWithDefaultPackage) shouldBe 1 + + // Blacklisted namespace should not be tracked + counterTotal(activationMetric, namespaceGuest, actionWithDefaultPackage) shouldBe 0 + + // Blacklisted should be counted in "openwhisk_namespace_activations_total" metric + namespaceCounterTotal(namespaceMetric, namespaceGuest) shouldBe 1 } - private def newActivationEvent(name: String, kind: String, memory: String, initiator: String) = + private def newActivationEvent(actionPath: String, kind: String, memory: String) = EventMessage( "test", - Activation(name, 2, 1254.millis, 30.millis, 433433.millis, kind, false, memory.toInt, None), + Activation(actionPath, 2, 1254.millis, 30.millis, 433433.millis, kind, false, memory.toInt, None), Subject("testuser"), initiator, UUID("test"), Activation.typeName) - private def gauge(name: String, action: String) = + private def gauge(metricName: String, namespace: String, action: String) = CollectorRegistry.defaultRegistry.getSampleValue( - name, + metricName, Array("namespace", "initiator", "action"), Array(namespace, initiator, action)) - private def counter(name: String, action: String) = + private def counter(metricName: String, namespace: String, action: String) = CollectorRegistry.defaultRegistry.getSampleValue( - name, + metricName, Array("namespace", "initiator", "action"), Array(namespace, initiator, action)) - private def counterTotal(name: String, action: String) = + private def counterTotal(metricName: String, namespace: String, action: String) = CollectorRegistry.defaultRegistry.getSampleValue( - name, + metricName, Array("namespace", "initiator", "action", "kind", "memory"), Array(namespace, initiator, action, kind, memory)) - private def counterStatus(name: String, action: String, status: String) = + private def namespaceCounterTotal(metricName: String, namespace: String) = + CollectorRegistry.defaultRegistry.getSampleValue( + metricName, + Array("namespace", "initiator"), + Array(namespace, initiator)) + + private def counterStatus(metricName: String, namespace: String, action: String, status: String) = CollectorRegistry.defaultRegistry.getSampleValue( - name, + metricName, Array("namespace", "initiator", "action", "status"), Array(namespace, initiator, action, status)) - private def histogramCount(name: String, action: String) = + private def histogramCount(metricName: String, namespace: String, action: String) = CollectorRegistry.defaultRegistry.getSampleValue( - s"${name}_count", + s"${metricName}_count", Array("namespace", "initiator", "action"), Array(namespace, initiator, action)) - private def histogramSum(name: String, action: String) = + private def histogramSum(metricName: String, namespace: String, action: String) = CollectorRegistry.defaultRegistry - .getSampleValue(s"${name}_sum", Array("namespace", "initiator", "action"), Array(namespace, initiator, action)) + .getSampleValue( + s"${metricName}_sum", + Array("namespace", "initiator", "action"), + Array(namespace, initiator, action)) .doubleValue() }