I think I've moved one step closer: I think I know how to weld a flow 
breaker into my graph:

  def watch(key: String, waitIndex: Option[Int] = None, recursive: 
Option[Boolean] = None, quorum: Option[Boolean] = None): 
Source[EtcdResponse, Cancellable] = {
    case class WatchRequest(key: String, waitIndex: Option[Int], recursive: 
Option[Boolean], quorum: Option[Boolean])
    val init = WatchRequest(key, waitIndex, recursive, quorum)
    val breaker: Graph[FlowShape[WatchRequest, WatchRequest], Cancellable] 
= ???

    Source[EtcdResponse, Cancellable](breaker) { implicit b =>
      import FlowGraph.Implicits._

      val initReq = b.add(Source.single(init))
      val reqMerge = b.add(Merge[WatchRequest](2))
      val runWait = b.add(Flow[WatchRequest].mapAsync { req =>
        this.wait(req.key, req.waitIndex, req.recursive, req.quorum).map { 
resp =>
          (req.copy(waitIndex = Some(resp.node.modifiedIndex + 1)), resp)
        }
      })
      val respUnzip = b.add(Unzip[WatchRequest, EtcdResponse]())

      initReq ~> reqMerge.in(0)
      reqMerge ~> runWait
      runWait ~> respUnzip.in

      breaker => {
        respUnzip.out0 ~> breaker.inlet
        breaker.outlet ~> reqMerge.in(1)
        respUnzip.out1
      }
    }
  }

The question now remains, how do I fabricate a Graph[FlowShape[T, T], 
Cancellable] that will generate an instance of Cancellable on each 
materialization connected to a PushPullStage, in such way that cancel() 
would "blow the fuse" and terminate the stream?

Cheers,
Rafał

W dniu piątek, 24 kwietnia 2015 11:53:31 UTC+2 użytkownik Rafał Krzewski 
napisał:
>
> Hi,
>
> I've decided to dive into the streams ;) and implemented a client for 
> etcd[1] using akka http client. It worked really well, and was a lot of fun!
>
> However, I'm missing one final piece that I wasn't able to figure out: The 
> client offers a watch function that returns a stream of EtcdResponses [2]
> Right now the actual returned type is Source[EtcdResponse, Unit], but 
> actually I'd like the materialized value to be akka.actor.Cancellable
> that would allow the client to shut down the updates stream and release 
> it's resources. I think a custom PushPullStage exposing Cancellable 
> interface, inserted into the flow's feedback loop could do the job of 
> shutting down the stream, but I couldn't find a way to expose the 
> materialized value
> from the FlowGraph construction block. I was looking at TCP streams and 
> TickSource that do return interesting materialized values, but they use
> low level private [stream] APIs so I couldn't adapt any of that to my 
> high-level client code. Hints will be appreciated :)
>
> Cheers,
> Rafał
>
> [1] https://github.com/coreos/etcd
> [2] 
> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/etcd/EtcdClient.scala#L57
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to