We are planning to use Akka Streams for the next version of our network monitoring application. Basically the application connects to a number of configured network devices (through a variety of protocols). For each device we typically periodically write data to it (e.g. triggered by a TickSource) and receive responses as well as other unsolicited data which we scan and process in a number of ways. Essentially we have an inbound and an outbound flow for each device.
The application needs to be able to dynamically stop monitoring devices (e.g. based on time of day or changes to configuration). This means that we need to be able to tear down flows, thereby closing TCP connections and freeing other resources. However it is not clear how we should do this in the general case... I can see how it might be done for custom sources and sinks using ActorPublisher and ActorSubscriber, but I was hoping that there would be a way to insert some kind of cancellable 'circuit breaker' into the flow that would do this and allow use of the 'standard' Source and Sink implementations (much like takeWithin but controlled externally rather than being time based). I did try to create such a circuit breaker by zipping a source with an ActorPublisher source that emits an infinite series of elements (see below), but the resulting zipped source does not complete when cancelled until (it seems) the main source emits another element. (Surprised the zip does not complete when one of its input branches does.) Any advice/pointers would be greatly appreciated. Thanks, Jeremy Stone object SourceUtils { def cancellable[T](source: Source[T])(implicit system: ActorSystem): (Source[T], Cancellable) = { val (inf, cancellable) = cancellableInfiniteSource (source.zipWith(inf).map(_._1), cancellable) } private def cancellableInfiniteSource(implicit system: ActorSystem): (Source[Unit], Cancellable) = { case object Stop class CancellableSource extends ActorPublisher[Unit] with ActorLogging { var stopped = false def receive = LoggingReceive { case _: Request => sendAll case Stop => sendAll onComplete() } def sendAll = for (_ <- 1L to totalDemand) onNext(()) } val ref = system.actorOf(Props(new CancellableSource)) val cancellableSource = Source(ActorPublisher[Unit](ref)) (cancellableSource, new Cancellable { val cancelled = new AtomicBoolean(false) def cancel = { if (!cancelled.getAndSet(true)) { ref ! Stop true } else false } def isCancelled = cancelled.get }) } implicit class MoreSourceOps[T](source: Source[T]) { def zipWith[U](other: Source[U]): (Source[(T, U)]) = { Source() { implicit b: FlowGraphBuilder => import FlowGraphImplicits._ val out = UndefinedSink[(T, U)] val zip = Zip[T, U] source ~> zip.left other ~> zip.right zip.out ~> out out } } } } -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.