This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b485c5e Use alpakka's FileRotator instead of the copied one. (#3102) b485c5e is described below commit b485c5e249ed45e0395f2e8056972adb737f0353 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Mon Jan 8 12:53:45 2018 +0100 Use alpakka's FileRotator instead of the copied one. (#3102) --- common/scala/build.gradle | 2 + .../containerpool/logging/LogRotatorSink.scala | 176 --------------------- core/invoker/build.gradle | 1 - 3 files changed, 2 insertions(+), 177 deletions(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 31781f4..2c04731 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -21,6 +21,8 @@ dependencies { compile 'com.typesafe.akka:akka-http-core_2.11:10.0.10' compile 'com.typesafe.akka:akka-http-spray-json_2.11:10.0.10' + compile 'com.lightbend.akka:akka-stream-alpakka-file_2.11:0.15' + compile 'ch.qos.logback:logback-classic:1.2.3' compile 'org.slf4j:jcl-over-slf4j:1.7.25' compile 'org.slf4j:log4j-over-slf4j:1.7.25' diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala deleted file mode 100644 index 6c5681b..0000000 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TO BE TAKEN OUT AFTER ALPAKKA 0.15 RELEASE - -/* - * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com> - */ - -package akka.stream.alpakka.file.scaladsl - -import java.nio.file.{OpenOption, Path, StandardOpenOption} - -import akka.Done -import akka.stream.ActorAttributes.SupervisionStrategy -import akka.stream._ -import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere} -import akka.stream.scaladsl.{FileIO, Sink, Source} -import akka.stream.stage._ -import akka.util.ByteString - -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} - -object LogRotatorSink { - def apply(functionGeneratorFunction: () => ByteString => Option[Path], - fileOpenOptions: Set[OpenOption] = Set(StandardOpenOption.APPEND, StandardOpenOption.CREATE)) - : Sink[ByteString, Future[Done]] = - Sink.fromGraph(new LogRotatorSink(functionGeneratorFunction, fileOpenOptions)) -} - -final private[scaladsl] class LogRotatorSink(functionGeneratorFunction: () => ByteString => Option[Path], - fileOpenOptions: Set[OpenOption]) - extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { - - val in = Inlet[ByteString]("FRotator.in") - override val shape = SinkShape.of(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { - val promise = Promise[Done]() - val logic = new GraphStageLogic(shape) { - val pathGeneratorFunction: ByteString => Option[Path] = functionGeneratorFunction() - var sourceOut: SubSourceOutlet[ByteString] = _ - var fileSinkCompleted: Seq[Future[IOResult]] = Seq.empty - val decider = - inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - - def failThisStage(ex: Throwable): Unit = - if (!promise.isCompleted) { - if (sourceOut != null) { - sourceOut.fail(ex) - } - cancel(in) - promise.failure(ex) - } - - def generatePathOrFailPeacefully(data: ByteString): Option[Path] = { - var ret = Option.empty[Path] - try { - ret = pathGeneratorFunction(data) - } catch { - case ex: Throwable => - failThisStage(ex) - } - ret - } - - def fileSinkFutureCallbackHandler(future: Future[IOResult])(h: Holder[IOResult]): Unit = - h.elem match { - case Success(IOResult(_, Failure(ex))) if decider(ex) == Supervision.Stop => - promise.failure(ex) - case Success(x) if fileSinkCompleted.size == 1 && fileSinkCompleted.head == future => - promise.trySuccess(Done) - completeStage() - case x: Success[IOResult] => - fileSinkCompleted = fileSinkCompleted.filter(_ != future) - case Failure(ex) => - failThisStage(ex) - case _ => - } - - //init stage where we are waiting for the first path - setHandler( - in, - new InHandler { - override def onPush(): Unit = { - val data = grab(in) - val pathO = generatePathOrFailPeacefully(data) - pathO.fold(if (!isClosed(in)) pull(in))(switchPath(_, data)) - } - - override def onUpstreamFinish(): Unit = - completeStage() - - override def onUpstreamFailure(ex: Throwable): Unit = - failThisStage(ex) - }) - - //we must pull the first element cos we are a sink - override def preStart(): Unit = { - super.preStart() - pull(in) - } - - def futureCB(newFuture: Future[IOResult]) = - getAsyncCallback[Holder[IOResult]](fileSinkFutureCallbackHandler(newFuture)) - - //we recreate the tail of the stream, and emit the data for the next req - def switchPath(path: Path, data: ByteString): Unit = { - val prevOut = Option(sourceOut) - - sourceOut = new SubSourceOutlet[ByteString]("FRotatorSource") - sourceOut.setHandler(new OutHandler { - override def onPull(): Unit = { - sourceOut.push(data) - switchToNormalMode() - } - }) - val newFuture = Source - .fromGraph(sourceOut.source) - .runWith(FileIO.toPath(path, fileOpenOptions))(interpreter.subFusingMaterializer) - - fileSinkCompleted = fileSinkCompleted :+ newFuture - - val holder = new Holder[IOResult](NotYetThere, futureCB(newFuture)) - - newFuture.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) - - prevOut.foreach(_.complete()) - } - - //we change path if needed or push the grabbed data - def switchToNormalMode(): Unit = { - setHandler( - in, - new InHandler { - override def onPush(): Unit = { - val data = grab(in) - val pathO = generatePathOrFailPeacefully(data) - pathO.fold(sourceOut.push(data))(switchPath(_, data)) - } - - override def onUpstreamFinish(): Unit = { - implicit val executionContext: ExecutionContext = - akka.dispatch.ExecutionContexts.sameThreadExecutionContext - promise.completeWith(Future.sequence(fileSinkCompleted).map(_ => Done)) - sourceOut.complete() - } - - override def onUpstreamFailure(ex: Throwable): Unit = - failThisStage(ex) - }) - sourceOut.setHandler(new OutHandler { - override def onPull(): Unit = - pull(in) - }) - } - } - (logic, promise.future) - } - -} diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle index c0108a0..4a9ceff 100644 --- a/core/invoker/build.gradle +++ b/core/invoker/build.gradle @@ -16,7 +16,6 @@ dependencies { compile 'org.apache.curator:curator-recipes:4.0.0', { exclude group: 'org.apache.zookeeper', module:'zookeeper' } compile 'org.apache.zookeeper:zookeeper:3.4.11' - compile 'com.lightbend.akka:akka-stream-alpakka-file_2.11:0.14' } tasks.withType(ScalaCompile) { -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].