Ramin
In your graph, val flow = Flow[Unit] is the perfect place to plug in the
breaker:
def doRun(): Cancellable = {
  // The breaker materializes a Cancellable that terminates the stream on demand.
  val breaker = FlowBreaker[Unit]
  val stages =
    serialization.atop(handshake).atop(process).joinMat(breaker)(Keep.right)
  val connection = Tcp().outgoingConnection(host, port)
  // Keep.both retains the connection future and the breaker's Cancellable.
  val graph = connection.joinMat(stages)(Keep.both)
  val (outgoingConnection, cancellable) = graph.run()
  outgoingConnection onComplete {
    case Success(_) ⇒
      log.info("graph initialized successfully")
    case Failure(e) ⇒
      log.error("graph failed to initialize: {}", e.getMessage)
  }
  cancellable
}
If the process element were represented as a Flow[ProprietaryProtocol,
ProprietaryProtocol, NotUsed], you could do the following instead:
val breaker = FlowBreaker[ProprietaryProtocol]
val stages =
  serialization.atop(handshake).joinMat(process.viaMat(breaker)(Keep.right))(Keep.right)
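For readers on a newer Akka release: as far as I know, later versions ship a
built-in akka.stream.KillSwitches that plays the same role as FlowBreaker. It
is not part of the setup discussed in this thread, so treat the following as a
minimal sketch under that assumption rather than a drop-in replacement. The
names KillSwitchSketch, demo and codec are made up for illustration; codec is
a toy stand-in for serialization.atop(handshake).atop(process), and the tick
source stands in for the infinite TCP stream.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{BidiFlow, Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical, self-contained demo; none of these names come from the thread.
object KillSwitchSketch {
  def demo(): Done = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
    // Deprecated on recent Akka releases, where the implicit ActorSystem suffices.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Toy BidiFlow: String -> Int on the way out, Int -> String on the way back.
      val codec = BidiFlow.fromFunctions((s: String) => s.length, (n: Int) => n.toString)

      // Same pattern as joinMat(breaker)(Keep.right): close the BidiFlow with the
      // kill switch and keep the switch as the materialized value.
      val stages: Flow[String, String, UniqueKillSwitch] =
        codec.joinMat(KillSwitches.single[Int])(Keep.right)

      // Infinite source standing in for the TCP connection.
      val (killSwitch, done) = Source
        .tick(0.seconds, 100.millis, "tick")
        .viaMat(stages)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()

      Thread.sleep(500)      // let a few elements flow through
      killSwitch.shutdown()  // cancels upstream and completes downstream
      Await.result(done, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The stream can be materialized again later by calling run() on the same
blueprint, which yields a fresh kill switch for each run.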
Hope that helps :)
Rafał
On Friday, 26 February 2016 at 17:05:20 UTC+1, Ramin Alidousti
wrote:
>
> Thank you, Rafal. Yes, I'd gone through EtcdClientImpl yesterday, and while
> I wanted to plug in your breaker implementation, I had a hard time lining up
> the types properly (my own lack of understanding). Just in case you are
> still interested in helping further, this is the crux of what I'm doing now:
>
> I have a TCP connection that, aside from the initial handshake of a
> proprietary protocol, only receives an infinite stream of bytes as its
> source. This is what I have:
>
> def serialization: BidiFlow[ByteString, ProprietaryProtocol,
>   ProprietaryProtocol, ByteString, Unit] = ???
> def handshake: BidiFlow[ProprietaryProtocol, ProprietaryProtocol,
>   ProprietaryProtocol, ProprietaryProtocol, Unit] = ???
> def process: BidiFlow[ProprietaryProtocol, Unit, Unit,
>   ProprietaryProtocol, Unit] = ???
> def doRun(): Unit = {
>   val flow = Flow[Unit]
>   val stages = serialization.atop(handshake).atop(process).join(flow)
>   val connection = Tcp().outgoingConnection(host, port)
>   val graph = connection.join(stages)
>   graph.run() onComplete {
>     case Success(_) ⇒
>       log.info("graph initialized successfully")
>     case Failure(e) ⇒
>       log.error("graph failed to initialize: {}", e.getMessage)
>   }
> }
>
>
> Now it is not clear to me where to poke in the breaker.
>
> Again, thanks for the help.
>
> Ramin
>
> On Thursday, February 25, 2016 at 4:31:37 PM UTC-5, Rafał Krzewski wrote:
>>
>> If you're still interested here's an example of usage:
>>
>> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/etcd/EtcdClientImpl.scala#L98
>>
>> In lines 78-97 an infinite source is constructed from a graph that has
>> an internal feedback loop. viaMat(FlowBreaker[EtcdResponse])(Keep.right)
>> enriches the source with asynchronous termination capability.
>> Materializing the stream with runWith(someSink) will produce a
>> Cancellable value that you can use to terminate it at an arbitrary time.
>>
>> Shutting down the Materializer is a coarse-grained solution, but if you
>> are running only a single stream at a time, it will work just fine.
>>
>> Cheers,
>> Rafał
>>
>> On Thursday, 25 February 2016 at 19:16:08 UTC+1, Ramin Alidousti
>> wrote:
>>>
>>> Thanks, Rafal. I need to digest your code and see how I can plug it into
>>> my graph. As I said, I'm new to Akka Streams.
>>>
>>> On Thursday, February 25, 2016 at 8:03:40 AM UTC-5, Rafał Krzewski wrote:
>>>>
>>>> You can plug in an intermediate graph stage that allows cancelling the
>>>> upstream flow and completing the downstream flow asynchronously.
>>>>
>>>>
>>>> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala
>>>>
>>>> Cheers,
>>>> Rafał
>>>>
>>>> On Wednesday, 24 February 2016 at 21:23:02 UTC+1, Ramin Alidousti
>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm new to akka-stream. I have a BidiFlow that has been materialized.
>>>>> Now, in certain cases, I need to terminate the stream on demand and
>>>>> maybe recreate it later. The way I'm doing that now is to call
>>>>> terminate() on the implicit system. But I was wondering if there is a
>>>>> more appropriate way of "stopping/terminating" the flow, since the call
>>>>> to terminate() is too intrusive: the system is also serving other
>>>>> actor-related functionality.
>>>>>
>>>>> Best,
>>>>> Ramin
>>>>>
>>>>