Please read this:
- http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis
- and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html

Specifically, your trigger should be implemented as an async callback: it
comes from outside the stage, but it has to "wake up" the stage so that the
stage can push data again.
In your current setup the stage never "wakes up", because all pulls and pushes
have already been processed; the stage has no idea it should do anything once
you have called open.
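
A minimal sketch of what that could look like, assuming Akka Streams 2.4 and its GraphStage API. The names AsyncValve, startOpen, flip, held and drain are made up for illustration, and ValveSwitch is redeclared here with parentheses so the block stands on its own. It is not a drop-in replacement for the code quoted below, just one way to route open/close through getAsyncCallback:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

// Redeclared for this sketch (hypothetical variant of the trait from the question).
trait ValveSwitch {
  def open(): Unit
  def close(): Unit
}

// Hypothetical valve: holds back at most one in-flight element while closed.
final class AsyncValve[A](startOpen: Boolean)
    extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  val in: Inlet[A] = Inlet[A]("valve.in")
  val out: Outlet[A] = Outlet[A]("valve.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {

    val logic = new GraphStageLogic(shape) with ValveSwitch {
      private var isOpen = startOpen
      private var held: Option[A] = None // element that arrived while closed

      // The handler runs inside the stage, so it may touch state and ports.
      private val flip = getAsyncCallback[Boolean] { nowOpen =>
        isOpen = nowOpen
        if (isOpen) drain()
      }

      // Called from outside the stream: only send a signal, never push here.
      override def open(): Unit = flip.invoke(true)
      override def close(): Unit = flip.invoke(false)

      // Serve pending downstream demand: emit the held element or pull a new one.
      private def drain(): Unit =
        if (isAvailable(out)) held match {
          case Some(element) =>
            held = None
            push(out, element)
            if (isClosed(in)) completeStage()
          case None =>
            if (!hasBeenPulled(in) && !isClosed(in)) pull(in)
        }

      setHandler(in, new InHandler {
        override def onPush(): Unit =
          if (isOpen) push(out, grab(in)) else held = Some(grab(in))

        // Do not complete while an element is still waiting to be emitted.
        override def onUpstreamFinish(): Unit =
          if (held.isEmpty) completeStage()
      })

      setHandler(out, new OutHandler {
        // While closed, the demand is simply remembered by the port state.
        override def onPull(): Unit = if (isOpen) drain()
      })
    }

    (logic, logic)
  }
}
```

With this shape, open() no longer calls push from the caller's thread; it only hands an event to the stage, which then reacts to the demand it already received. Depending on the Akka version, invoking the callback before the stage has actually started may not be safe, so please check the documentation for the version you use.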

-- Konrad

On Friday, 14 October 2016 09:03:54 UTC+2, regis leray wrote:
>
> Hi,
>
> I'm currently trying to implement a valve graph stage to manage pause/resume.
> We can control the behavior of the graph through the materialized value:
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
>
> Current implementation of the valve
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
>     val logic = new ValveGraphStageLogic(shape, mode)
>     (logic, logic.switch)
>   }
>
> }
>
>
> The current implementation is pretty simple: each time we receive an onPull,
> we request the next element by calling pull(in).
> When an onPush is received, we check the current state:
> - if Open, we do the default behavior and call push(out, element)
> - if Closed, we put the element into a queue
>
> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){
>   import shape._
>
>   var bufferedElement = List.empty[A]
>
>   val switch = new ValveSwitch {
>     override def open: Unit = {
>       mode = ValveMode.Open
>       println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>       bufferedElement.foreach(push(out, _))
>       bufferedElement = List.empty
>     }
>
>     override def close: Unit = {
>       mode = ValveMode.Closed
>     }
>   }
>
>   setHandler(in, new InHandler {
>     override def onPush(): Unit = {
>       val element = grab(in) //acquires the element that has been received during an onPush
>       println(s"${mode} on push called with $element")
>       if (mode == ValveMode.Open) {
>         push(out, element) //push directly the element on the out port
>       } else {
>         bufferedElement = bufferedElement :+ element
>       }
>     }
>   })
>
>   setHandler(out, new OutHandler {
>     override def onPull(): Unit = {
>       println("on pull called")
>       pull(in) //request the next element on in port
>     }
>   })
> }
>
>
> When we resume the valve by calling switch.open, we push the buffered
> elements:
>
> override def open: Unit = {
>
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
>
> The current test is failing:
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>   
>     val (valve, probe) = Source(1 to 5)
>     .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the valve starts closed here and doesn't push any message
>     .toMat(TestSink.probe[Int])(Keep.both)
>     .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open //opening the valve should emit the previously buffered elements
>
>
>   probe.expectNext shouldEqual 1 //we never receive the element
>   probe.expectNext shouldEqual 2
>
>   probe.request(3)
>   probe.expectNext shouldEqual 3
>   probe.expectNext shouldEqual 4
>   probe.expectNext shouldEqual 5
>
>   probe.expectComplete()
> }
>
>
> Here is the console log:
>
> on pull called
> Closed on push called with 1
> pushing Some(1), out is available ? true
>
> Expected OnNext(_), yet no element signaled during 3 seconds
> java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
> at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
> at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
> at com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
> at com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
>
>
> I suspect the current code has an issue when we resume the valve; it
> doesn't seem that the push really works:
>
> val switch = new ValveSwitch {
>     override def open: Unit = {
>       mode = ValveMode.Open
>       println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>       bufferedElement.foreach(push(out, _))
>       bufferedElement = Option.empty
>     }
>
>     override def close: Unit = {
>       mode = ValveMode.Closed
>     }
>   }
>
>
> There is definitely something I'm not getting; if anyone could help me
> see the light...
>
> Here is the gist:
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>
> Any help would be appreciated
>
