Hi Akka Team, All,

I have few custom stages, recently i find out that one of them under 
certain conditions was not properly completing graph stage, which was 
causing graph to run forever.
I have timeouts attached to graph and would expect it to fail with timeout 
but it never happened. I created a simple reproducer (below).

I have a BrokenStage that is not completing stage in onUpstreamFinish method 
and i have a graph which has idleTimeout, completionTimeout and killSwitch 
attached. None of this has an effect on the graph it fails with future 
timeout.

I understand that this is developer issue and if i move any of timeouts 
after broken stage it will work as expected.
My questions would be:

1. I would still expect killSwitch or completion timeout to fail the graph, 
why it is not a case?
2. how i can detect such things / safely attach timeouts to graphs? 

Reproducer:
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._


case class BrokenStage[A](name: String) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("in")
  val out = Outlet[A]("out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        println(s"$name: onPush")
        push(out, grab(in))
      }
      override def onUpstreamFinish(): Unit = {
        println(s"$name: onUpstreamFinish")
      }
      override def onUpstreamFailure(ex: Throwable): Unit = {
        println(s"$name: onUpstreamFailure: ${ex.getMessage}")
        super.onUpstreamFailure(ex)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        println(s"$name: onPull")
        pull(in)
      }
      override def onDownstreamFinish(): Unit = {
        println(s"$name: onDownstreamFinish")
        super.onDownstreamFinish()
      }
    })
  }
}

object Example {

  implicit val actorSystem = ActorSystem()
  implicit val ec = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()

  def main(args: Array[String]): Unit = {
    val source = Source.empty[Int]
    val sink = Sink.last[Int]

    val (killSwitch, last) =
      source
        .idleTimeout(10.second)
        .completionTimeout(20.seconds)
        .viaMat(KillSwitches.single)(Keep.right)
        .via(new BrokenStage("after"))
        .toMat(sink)(Keep.both)
        .run()

    Thread.sleep(1000)
    killSwitch.abort(new RuntimeException("boom!"))

    Await.result(last, 25.second)
    sys.exit(-1)
  }

} 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to