This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 2e0c562 Track collection usage metrics for CosmosDB (#4490) 2e0c562 is described below commit 2e0c5624e4e479f13f565d266625425314eff4c0 Author: Chetan Mehrotra <chet...@apache.org> AuthorDate: Thu May 23 09:28:28 2019 +0530 Track collection usage metrics for CosmosDB (#4490) Record collection usage stats for CosmosDB so as to enable tracking the growth of collection in terms of storage size, document count and index size over the period of time. It also enables tracking any indexing progress if any change is done in Index configuration. Note that Count stats are currently not exposed via Azure Portal. Further, this commit also enables emitting verbose trace for query when in debug mode. This would simplify any query performance analysis. Fixes #4489 --- common/scala/src/main/resources/application.conf | 5 ++ .../cosmosdb/CollectionResourceUsage.scala | 71 ++++++++++++++++++++++ .../database/cosmosdb/CosmosDBArtifactStore.scala | 48 ++++++++++++++- .../core/database/cosmosdb/CosmosDBConfig.scala | 3 +- .../cosmosdb/CollectionResourceUsageTests.scala | 45 ++++++++++++++ .../cosmosdb/CosmosDBArtifactStoreTests.scala | 9 +++ 6 files changed, 178 insertions(+), 3 deletions(-) diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 0c547c9..9d122b7 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -209,6 +209,11 @@ whisk { # it would be marked deleted by setting `_deleted` property to true and then actual delete # happens via TTL. # soft-delete-ttl = 10 h + + # Frequency at which collection resource usage info like collection size, document count etc is recorded + # and exposed as metrics. 
If any reindexing is in progress then its progress would be logged with this frequency + record-usage-frequency = 10 m + connection-policy { max-pool-size = 1000 # When the value of this property is true, the SDK will direct write operations to diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsage.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsage.scala new file mode 100644 index 0000000..96b7fce --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsage.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.cosmosdb + +import org.apache.commons.io.FileUtils +import org.apache.openwhisk.core.entity.ByteSize +import org.apache.openwhisk.core.entity.SizeUnits.KB + +case class CollectionResourceUsage(documentsSize: Option[ByteSize], + collectionSize: Option[ByteSize], + documentsCount: Option[Long], + indexingProgress: Option[Int], + documentsSizeQuota: Option[ByteSize]) { + def indexSize: Option[ByteSize] = { + for { + ds <- documentsSize + cs <- collectionSize + } yield cs - ds + } + + def asString: String = { + List( + documentsSize.map(ds => s"documentSize: ${displaySize(ds)}"), + indexSize.map(is => s"indexSize: ${displaySize(is)}"), + documentsCount.map(dc => s"documentsCount: $dc"), + documentsSizeQuota.map(dq => s"collectionSizeQuota: ${displaySize(dq)}")).flatten.mkString(",") + } + + private def displaySize(b: ByteSize) = FileUtils.byteCountToDisplaySize(b.toBytes) +} + +object CollectionResourceUsage { + val quotaHeader = "x-ms-resource-quota" + val usageHeader = "x-ms-resource-usage" + val indexHeader = "x-ms-documentdb-collection-index-transformation-progress" + + def apply(responseHeaders: Map[String, String]): Option[CollectionResourceUsage] = { + for { + quota <- responseHeaders.get(quotaHeader).map(headerValueToMap) + usage <- responseHeaders.get(usageHeader).map(headerValueToMap) + } yield { + CollectionResourceUsage( + usage.get("documentsSize").map(_.toLong).map(ByteSize(_, KB)), + usage.get("collectionSize").map(_.toLong).map(ByteSize(_, KB)), + usage.get("documentsCount").map(_.toLong), + responseHeaders.get(indexHeader).map(_.toInt), + quota.get("collectionSize").map(_.toLong).map(ByteSize(_, KB))) + } + } + + private def headerValueToMap(value: String): Map[String, String] = { + //storedProcedures=100;triggers=25;functions=25;documentsCount=-1;documentsSize=xxx;collectionSize=xxx + val pairs = value.split("=|;").grouped(2) + pairs.map { case Array(k, v) => k -> v }.toMap + } +} diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index 16d0705..c635092 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -27,7 +27,7 @@ import com.microsoft.azure.cosmosdb._ import com.microsoft.azure.cosmosdb.internal.Constants.Properties import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient import kamon.metric.MeasurementUnit -import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, TransactionId} +import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, Scheduler, TransactionId} import org.apache.openwhisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure} import org.apache.openwhisk.core.database._ import org.apache.openwhisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef @@ -39,6 +39,7 @@ import spray.json._ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ import scala.util.Success class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val collName: String, @@ -70,6 +71,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val getToken = createToken("get") private val queryToken = createToken("query") private val countToken = createToken("count") + + private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes) + private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes) + private val documentCountToken = createUsageToken("documentCount") + private val softDeleteTTL = 
config.softDeleteTTL.map(_.toSeconds.toInt) private val clusterIdValue = config.clusterId.map(JsString(_)) @@ -78,7 +84,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected this, s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " + s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " + - s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], Consistency Level [${config.consistencyLevel}]") + s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " + + s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]") + + private val usageMetricRecorder = config.recordUsageFrequency.map { f => + Scheduler.scheduleWaitAtLeast(f, 10.seconds)(() => recordResourceUsage()) + } //Clone the returned instance as these are mutable def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson) @@ -314,6 +325,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val queryMetrics = scala.collection.mutable.Buffer[QueryMetrics]() if (transid.meta.extraLogging) { options.setPopulateQueryMetrics(true) + options.setEmitVerboseTracesInQuery(true) } def collectQueryMetrics(r: FeedResponse[Document]): Unit = { @@ -416,10 +428,36 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .getOrElse(Future.successful(true)) // For CosmosDB it is expected that the entire document is deleted. 
override def shutdown(): Unit = { + //Its async so a chance exist for next scheduled job to still trigger + usageMetricRecorder.foreach(system.stop) attachmentStore.foreach(_.shutdown()) clientRef.close() } + def getResourceUsage(): Future[Option[CollectionResourceUsage]] = { + val opts = new RequestOptions + opts.setPopulateQuotaInfo(true) + client + .readCollection(collection.getSelfLink, opts) + .head() + .map(rr => CollectionResourceUsage(rr.getResponseHeaders.asScala.toMap)) + } + + private def recordResourceUsage() = { + getResourceUsage().map { o => + o.foreach { u => + u.documentsCount.foreach(documentCountToken.gauge.set(_)) + u.documentsSize.foreach(ds => documentsSizeToken.gauge.set(ds.toKB)) + u.indexSize.foreach(is => indexSizeToken.gauge.set(is.toKB)) + logging.info(this, s"Collection usage stats for [$collName] are ${u.asString}") + u.indexingProgress.foreach { i => + if (i < 100) logging.info(this, s"Indexing for collection [$collName] is at $i%") + } + } + o + } + } + private def isNotFound[A <: DocumentAbstraction](e: DocumentClientException) = e.getStatusCode == StatusCodes.NotFound.intValue @@ -508,6 +546,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected else LogMarkerToken("cosmosdb", "ru", collName, Some(action))(MeasurementUnit.none) } + private def createUsageToken(name: String, unit: MeasurementUnit = MeasurementUnit.none): LogMarkerToken = { + val tags = Map("collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", name, "used", tags = tags)(unit) + else LogMarkerToken("cosmosdb", name, collName)(unit) + } + private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala index 
de0de9d..2dba5f8 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala @@ -39,7 +39,8 @@ case class CosmosDBConfig(endpoint: String, connectionPolicy: ConnectionPolicy, timeToLive: Option[Duration], clusterId: Option[String], - softDeleteTTL: Option[FiniteDuration]) { + softDeleteTTL: Option[FiniteDuration], + recordUsageFrequency: Option[FiniteDuration]) { def createClient(): AsyncDocumentClient = { new AsyncDocumentClient.Builder() diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsageTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsageTests.scala new file mode 100644 index 0000000..dd26718 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CollectionResourceUsageTests.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.cosmosdb + +import org.apache.openwhisk.core.database.cosmosdb.CollectionResourceUsage.{indexHeader, quotaHeader, usageHeader} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FlatSpec, Matchers} +import org.apache.openwhisk.core.entity.size._ + +@RunWith(classOf[JUnitRunner]) +class CollectionResourceUsageTests extends FlatSpec with Matchers { + behavior of "CollectionInfo" + + it should "populate resource usage info" in { + val headers = Map( + usageHeader -> + "storedProcedures=0;triggers=0;functions=0;documentsCount=5058;documentsSize=780;collectionSize=800", + quotaHeader -> "storedProcedures=100;triggers=25;functions=25;documentsCount=-1;documentsSize=335544320;collectionSize=1000", + indexHeader -> "42") + + val usage = CollectionResourceUsage(headers).get + usage shouldBe CollectionResourceUsage( + documentsSize = Some(780.KB), + collectionSize = Some(800.KB), + documentsCount = Some(5058), + indexingProgress = Some(42), + documentsSizeQuota = Some(1000.KB)) + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala index af2e8d8..9600f60 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala @@ -117,6 +117,15 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase js.get.fields(CosmosDBConstants.clusterId) shouldBe JsString("foo") } + it should "fetch collection usage info" in { + val uopt = activationStore.getResourceUsage().futureValue + uopt shouldBe defined + val u = uopt.get + println(u.asString) + u.documentsCount shouldBe defined + u.documentsSize shouldBe defined + } + behavior of "CosmosDB query debug" it 
should "log query metrics in debug flow" in {