> I'm trying to implement the pause/resume in akka stream.
> The current implementation, is buffering all elements into an List if the 
> valve is in mode "close", if not we are pushing elements like the default 
> behavior.
> When we are resuming the flow by calling the materialize value 
>, we change the mode to open and pushing all buffered elements.
> Currently im suspecting this code to be the problem
> val switch = new ValveSwitch {
>   override def open: Unit = {
>     mode = ValveMode.Open
>     println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>     bufferedElement.foreach(push(out, _))
>     bufferedElement = Option.empty
>   }
> ...
> }
> In my unit test when the switch is changing close to open, the Sink never 
> receive the elements
> "A closed valve" should "emit only 3 elements after it has been open" in {
>   val (valve, probe) = Source(1 to 5)
>     .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>     .toMat(TestSink.probe[Int])(Keep.both)
>     .run()
>   probe.request(2)
>   probe.expectNoMsg()
> // we are pushing the buffered elements
>   probe.expectNext shouldEqual 1 // this assert is failing !
>   probe.expectNext shouldEqual 2
> }
> Any help would be really appreciated
> Here
> Le jeudi 13 octobre 2016 12:52:58 UTC-4, regis leray a écrit :
>> Hi, 
>> I'm trying to implements a way to control a flow (start/stop), nothing 
>> was implemented yet in the core of akka-stream
>> My current implementation looks like this.
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>   override val shape = FlowShape(Inlet[A](""), 
>> Outlet[A]("valve.out"))
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>>     val logic = new ValveGraphStageLogic(shape, mode)
>>     (logic, logic.switch)
>>   }
>>   private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>>     import shape._
>>     var bufferedElement = List.empty[A]
>>     val switch = new ValveSwitch {
>>       override def open: Unit = {
>>         mode = ValveMode.Open
>>         println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>         bufferedElement.foreach(push(out, _))
>>         bufferedElement = List.empty
>>       }
>>       override def close: Unit = {
>>         mode = ValveMode.Closed
>>       }
>>     }
>>     setHandler(in, new InHandler {
>>       override def onPush(): Unit = {
>>         val element = grab(in) //acquires the element that has been 
>> received during an onPush
>>         println(s"${mode} on push called with $element")
>>         if (mode == ValveMode.Open) {
>>           push(out, element) //push directly the element on the out port
>>         } else {
>>           bufferedElement = bufferedElement :+ element
>>         }
>>       }
>>     })
>>     setHandler(out, new OutHandler {
>>       override def onPull(): Unit = {
>>         println("on pull called")
>>         pull(in) //request the next element on in port
>>       }
>>     })
>>   }
>> }
>> trait ValveMode
>> object ValveMode {
>>   case object Open extends ValveMode
>>   case object Closed extends ValveMode
>> }
>> ====
>> My current unit test is failing. due to the fact when i open the valve, i 
>> never received the previous message.
>> It seems even if i push the element through ( ) the sink 
>> never receive the element
>> class ValveSpec extends FlatSpec {
>>   implicit val system = ActorSystem()
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = materializer.executionContext
>>   "A closed valve" should "emit only 3 elements after it has been open" 
>> in {
>>     val (valve, probe) = Source(1 to 3)
>>       .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>>       .toMat(TestSink.probe[Int])(Keep.both)
>>       .run()
>>     probe.request(1)
>>     probe.expectNoMsg()
>>     probe.expectNext(1)
>>     probe.request(2)
>>     probe.expectNext(2, 3)
>>     probe.expectComplete()
>>   }
>> }
>> Here the gist 

Reply via email to