markusthoemmes closed pull request #2957: splunk logstore
URL: https://github.com/apache/incubator-openwhisk/pull/2957
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 5d3fadab3b..c538d6ec56 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -258,4 +258,7 @@ object ConfigKeys { val transactions = "whisk.transactions" val stride = s"$transactions.stride" + + val logStore = "whisk.logstore" + val splunk = s"$logStore.splunk" } diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala new file mode 100644 index 0000000000..465fb2532f --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.containerpool.logging + +import akka.actor.ActorSystem +import whisk.core.entity.Identity +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation} + +import scala.concurrent.Future + +/** + * Docker log driver based LogStore impl. Uses docker log driver to emit container logs to an external store. + * Fetching logs from that external store is not provided in this trait. This SPI requires the + * ContainerArgs.extraArgs to be used to indicate where the logs are shipped. + * see https://docs.docker.com/config/containers/logging/configure/#configure-the-logging-driver-for-a-container + * + * Fetching logs here is a NOOP, but extended versions can customize fetching, e.g. from ELK or Splunk etc. + */ +class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore { + + /** Indicate --log-driver and --log-opt flags via ContainerArgsConfig.extraArgs */ + override def containerParameters = Map() + + def collectLogs(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction): Future[ActivationLogs] = + Future.successful(ActivationLogs()) //no logs collected when using docker log drivers (see DockerLogStore for json-file exception) + + /** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version, + * e.g. 
the SplunkLogStore to expose logs from some external source */ + def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = + Future.successful(ActivationLogs(Vector("Logs are not available."))) +} + +object LogDriverLogStoreProvider extends LogStoreProvider { + override def logStore(actorSystem: ActorSystem) = new LogDriverLogStore(actorSystem) +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala new file mode 100644 index 0000000000..596b776131 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.containerpool.logging + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.client.RequestBuilding.Post +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.FormData +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.headers.Authorization +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import akka.stream.OverflowStrategy +import akka.stream.QueueOfferResult +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import pureconfig._ +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import spray.json._ +import whisk.common.AkkaLogging +import whisk.core.ConfigKeys +import whisk.core.entity.ActivationLogs +import whisk.core.entity.WhiskActivation + +case class SplunkLogStoreConfig(host: String, + port: Int, + username: String, + password: String, + index: String, + logMessageField: String, + activationIdField: String, + disableSNI: Boolean) +case class SplunkResponse(results: Vector[JsObject]) +object SplunkResponseJsonProtocol extends DefaultJsonProtocol { + implicit val orderFormat = jsonFormat1(SplunkResponse) +} + +/** + * A Splunk based impl of LogDriverLogStore. 
Logs are routed to splunk via docker log driver, and retrieved via Splunk REST API + * + * @param actorSystem + * @param httpFlow Optional Flow to use for HttpRequest handling (to enable stream based tests) + */ +class SplunkLogStore( + actorSystem: ActorSystem, + httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None, + splunkConfig: SplunkLogStoreConfig = loadConfigOrThrow[SplunkLogStoreConfig](ConfigKeys.splunk)) + extends LogDriverLogStore(actorSystem) { + implicit val as = actorSystem + implicit val ec = as.dispatcher + implicit val materializer = ActorMaterializer() + private val logging = new AkkaLogging(actorSystem.log) + + private val splunkApi = Path / "services" / "search" / "jobs" //see http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs + + import SplunkResponseJsonProtocol._ + + val maxPendingRequests = 500 + + val defaultHttpFlow = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]]( + host = splunkConfig.host, + port = splunkConfig.port, + connectionContext = + if (splunkConfig.disableSNI) + Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true)))) + else Http().defaultClientHttpsContext) + + override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = { + + //example curl request: + // curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00" + //example response: + // {"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"}], "highlighted":{}} + //note: splunk returns results in reverse-chronological order, therefore we include "| reverse" to cause 
results to arrive in chronological order + val search = + s"""search index="${splunkConfig.index}"| spath ${splunkConfig.activationIdField}| search ${splunkConfig.activationIdField}=${activation.activationId.toString}| table ${splunkConfig.logMessageField}| reverse""" + + val entity = FormData( + Map( + "exec_mode" -> "oneshot", + "search" -> search, + "output_mode" -> "json", + "earliest_time" -> activation.start.toString, //assume that activation start/end are UTC zone, and splunk events are the same + "latest_time" -> activation.end + .plusSeconds(5) //add 5s to avoid a timerange of 0 on short-lived activations + .toString)).toEntity + + logging.debug(this, "sending request") + queueRequest( + Post(Uri(path = splunkApi)) + .withEntity(entity) + .withHeaders(List(Authorization(BasicHttpCredentials(splunkConfig.username, splunkConfig.password))))) + .flatMap(response => { + logging.debug(this, s"splunk API response ${response}") + Unmarshal(response.entity) + .to[SplunkResponse] + .map(r => { + ActivationLogs( + r.results + .map(_.fields(splunkConfig.logMessageField).convertTo[String])) + }) + }) + } + + //based on http://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html + val queue = + Source + .queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew) + .via(httpFlow.getOrElse(defaultHttpFlow)) + .toMat(Sink.foreach({ + case ((Success(resp), p)) => p.success(resp) + case ((Failure(e), p)) => p.failure(e) + }))(Keep.left) + .run() + + def queueRequest(request: HttpRequest): Future[HttpResponse] = { + val responsePromise = Promise[HttpResponse]() + queue.offer(request -> responsePromise).flatMap { + case QueueOfferResult.Enqueued => responsePromise.future + case QueueOfferResult.Dropped => + Future.failed(new RuntimeException("Splunk API Client Queue overflowed. 
Try again later.")) + case QueueOfferResult.Failure(ex) => Future.failed(ex) + case QueueOfferResult.QueueClosed => + Future.failed( + new RuntimeException( + "Splunk API Client Queue was closed (pool shut down) while running the request. Try again later.")) + } + } +} + +object SplunkLogStoreProvider extends LogStoreProvider { + override def logStore(actorSystem: ActorSystem) = new SplunkLogStore(actorSystem) +} diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala new file mode 100644 index 0000000000..b87e42322e --- /dev/null +++ b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.containerpool.logging + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import org.scalatest.FlatSpecLike +import org.scalatest.Matchers +import whisk.core.containerpool.ContainerArgsConfig + +class LogDriverLogStoreTests extends TestKit(ActorSystem("LogDriverLogStore")) with FlatSpecLike with Matchers { + + val testConfig = ContainerArgsConfig( + network = "network", + extraArgs = + Map("log-driver" -> Set("fluentd"), "log-opt" -> Set("fluentd-address=localhost:24225", "tag=OW_CONTAINER"))) + behavior of "LogDriver LogStore" + + it should "set the container parameters from the config" in { + val logDriverLogStore = new LogDriverLogStore(system) + logDriverLogStore.containerParameters shouldBe Map() + } +} diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala new file mode 100644 index 0000000000..08faa479f0 --- /dev/null +++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.containerpool.logging + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.javadsl.model.headers.Authorization +import akka.http.scaladsl.model.ContentTypes +import akka.http.scaladsl.model.FormData +import akka.http.scaladsl.model.HttpEntity +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import akka.stream.StreamTcpException +import akka.stream.scaladsl.Flow +import akka.testkit.TestKit +import common.StreamLogging +import java.time.ZonedDateTime +import org.scalatest.Matchers +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.concurrent.ScalaFutures +import scala.util.Failure +import whisk.core.entity.ActivationLogs +import org.scalatest.FlatSpecLike +import pureconfig.error.ConfigReaderException +import scala.concurrent.Await +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.util.Success +import scala.util.Try +import spray.json.JsNumber +import spray.json.JsObject +import spray.json._ +import whisk.core.entity.ActionLimits +import whisk.core.entity.ActivationId +import whisk.core.entity.ActivationResponse +import whisk.core.entity.EntityName +import whisk.core.entity.EntityPath +import whisk.core.entity.LogLimit +import whisk.core.entity.MemoryLimit +import whisk.core.entity.Parameters +import whisk.core.entity.Subject +import whisk.core.entity.TimeLimit +import whisk.core.entity.WhiskActivation +import whisk.core.entity.size._ + +class SplunkLogStoreTests + extends TestKit(ActorSystem("SplunkLogStore")) + with FlatSpecLike + with Matchers + with ScalaFutures + with StreamLogging { + val testConfig = SplunkLogStoreConfig( + "splunk-host", + 8080, + "splunk-user", + "splunk-pass", + "splunk-index", + "log_message", + "activation_id", + false) + + behavior of "Splunk LogStore" 
+ + val startTime = "2007-12-03T10:15:30Z" + val endTime = "2007-12-03T10:15:45Z" + val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is endTime+5 + + val activation = WhiskActivation( + namespace = EntityPath("ns"), + name = EntityName("a"), + Subject(), + activationId = ActivationId(), + start = ZonedDateTime.parse(startTime).toInstant, + end = ZonedDateTime.parse(endTime).toInstant, + response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))), + annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson), + duration = Some(123)) + + implicit val ec = system.dispatcher + implicit val materializer = ActorMaterializer() + + val testFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] = + Flow[(HttpRequest, Promise[HttpResponse])] + .mapAsyncUnordered(1) { + case (request, userContext) => + //we use cachedHostConnectionPoolHttps so won't get the host+port with the request + Unmarshal(request.entity) + .to[FormData] + .map { form => + val earliestTime = form.fields.get("earliest_time") + val latestTime = form.fields.get("latest_time") + val outputMode = form.fields.get("output_mode") + val search = form.fields.get("search") + val execMode = form.fields.get("exec_mode") + + request.uri.path.toString() shouldBe "/services/search/jobs" + request.headers shouldBe List(Authorization.basic(testConfig.username, testConfig.password)) + earliestTime shouldBe Some(startTime) + latestTime shouldBe Some(endTimePlus5) + outputMode shouldBe Some("json") + execMode shouldBe Some("oneshot") + search shouldBe Some( + s"""search index="${testConfig.index}"| spath ${testConfig.activationIdField}| search ${testConfig.activationIdField}=${activation.activationId.toString}| table ${testConfig.logMessageField}| reverse""") + + ( + Success( + HttpResponse( + StatusCodes.OK, + entity = HttpEntity( + ContentTypes.`application/json`, + 
"""{"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"},{"log_message":"some other log message"}], "highlighted":{}}"""))), + userContext) + } + .recover { + case e => + println("failed") + (Failure(e), userContext) + } + } + val failFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] = + Flow[(HttpRequest, Promise[HttpResponse])] + .map { + case (request, userContext) => + (Success(HttpResponse(StatusCodes.InternalServerError)), userContext) + + } + + it should "fail when loading out of box configs (because whisk.logstore.splunk doesn't exist)" in { + assertThrows[ConfigReaderException[_]] { + val splunkStore = new SplunkLogStore(system) + } + + } + it should "find logs based on activation timestamps" in { + //use the a flow that asserts the request structure and provides a response in the expected format + val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig) + val result = Await.result(splunkStore.fetchLogs(activation), 1.second) + result shouldBe ActivationLogs(Vector("some log message", "some other log message")) + } + + it should "fail to connect to bogus host" in { + //use the default http flow with the default bogus-host config + val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig) + val result = splunkStore.fetchLogs(activation) + whenReady(result.failed, Timeout(1.second)) { ex => + ex shouldBe an[StreamTcpException] + } + } + it should "display an error if API cannot be reached" in { + //use a flow that generates a 500 response + val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig) + val result = splunkStore.fetchLogs(activation) + whenReady(result.failed, Timeout(1.second)) { ex => + ex shouldBe an[RuntimeException] + } + + } + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services