We are planning to use Akka Streams for the next version of our network 
monitoring application. The application connects to a number of configured 
network devices over a variety of protocols. For each device we 
periodically write data to it (e.g. triggered by a TickSource) and receive 
responses, along with other unsolicited data, which we scan and process in 
a number of ways. In effect we have an inbound and an outbound flow for 
each device.

The application needs to be able to dynamically stop monitoring devices 
(e.g. based on time of day or changes to configuration). This means that we 
need to be able to tear down flows, thereby closing TCP connections and 
freeing other resources.
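
To make the requirement concrete, here is a rough plain-Scala sketch of 
the bookkeeping I have in mind. It is not Akka code: StopHandle and 
MonitorRegistry are hypothetical names, and what stop() actually does to a 
running flow is exactly the open question.

```scala
import scala.collection.mutable

// Hypothetical sketch: StopHandle and MonitorRegistry are illustrative names,
// not part of Akka or of our application.
trait StopHandle {
  // Expected to tear down the device's flows and close its connection.
  def stop(): Unit
}

final class MonitorRegistry {
  private val active = mutable.Map.empty[String, StopHandle]

  // Registers a running monitor; a previous monitor for the same device is stopped first.
  def started(deviceId: String, handle: StopHandle): Unit = synchronized {
    active.put(deviceId, handle).foreach(_.stop())
  }

  // Stops monitoring one device; returns false if it was not being monitored.
  def stop(deviceId: String): Boolean = synchronized {
    active.remove(deviceId) match {
      case Some(handle) =>
        handle.stop()
        true
      case None =>
        false
    }
  }

  // Snapshot of the device ids currently being monitored.
  def monitored: Set[String] = synchronized(active.keySet.toSet)
}
```

A configuration change or time-of-day rule would then only need to call 
stop(deviceId) for each device that should no longer be monitored.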

However it is not clear how we should do this in the general case...

I can see how it might be done for custom sources and sinks using 
ActorPublisher and ActorSubscriber, but I was hoping that there would be a 
way to insert some kind of cancellable 'circuit breaker' into the flow that 
would do this and allow use of the 'standard' Source and Sink 
implementations (much like takeWithin but controlled externally rather than 
being time based).

I did try to create such a circuit breaker by zipping a source with an 
ActorPublisher source that emits an infinite series of elements (see the 
code at the end of this message). However, when cancelled, the resulting 
zipped source does not complete until (it seems) the main source emits 
another element. (I was surprised that the zip does not complete as soon as 
one of its input branches does.)
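
The behaviour I am seeing resembles the following plain-Scala analogy. 
This is not Akka Streams code and says nothing about how Zip is actually 
implemented; Breaker and guarded are made-up names. The point is only that 
a breaker which is checked per element cannot end the stream until the 
main source produces one more element.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Plain-Scala analogy only; Breaker and guarded are hypothetical names.
final class Breaker {
  private val tripped = new AtomicBoolean(false)
  // Returns true only on the first call.
  def cancel(): Boolean = !tripped.getAndSet(true)
  def isCancelled: Boolean = tripped.get
}

object BreakerDemo {
  // Wraps an iterator so that it ends once the breaker has been tripped.
  // The flag is consulted only after the upstream has produced an element,
  // so a quiet upstream delays the end of the guarded iterator.
  def guarded[T](upstream: Iterator[T], breaker: Breaker): Iterator[T] =
    upstream.takeWhile(_ => !breaker.isCancelled)

  def run(): List[Int] = {
    val breaker = new Breaker
    val it = guarded(Iterator.from(1), breaker)
    val first = List(it.next(), it.next(), it.next())
    breaker.cancel()
    // The end is noticed only here, after one more upstream element
    // has been pulled and then discarded.
    first ++ it.toList
  }
}
```

What I am after is a breaker that completes the downstream immediately on 
cancel, without waiting for the main source.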

Any advice/pointers would be greatly appreciated.

Thanks,

Jeremy Stone

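// Imports assume the preview Akka Streams API used in this post
// (Source with a single type parameter, FlowGraphBuilder, UndefinedSink).
import java.util.concurrent.atomic.AtomicBoolean

import akka.actor.{ActorLogging, ActorSystem, Cancellable, Props}
import akka.event.LoggingReceive
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.scaladsl.{FlowGraphBuilder, FlowGraphImplicits, Source, UndefinedSink, Zip}

object SourceUtils {
  // Pairs the given source with a handle intended to complete it on demand.
  def cancellable[T](source: Source[T])(implicit system: ActorSystem): (Source[T], Cancellable) = {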

    val (inf, cancellable) = cancellableInfiniteSource

    (source.zipWith(inf).map(_._1), cancellable)
  }

  private def cancellableInfiniteSource(implicit system: ActorSystem): (Source[Unit], Cancellable) = {

    case object Stop

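    class CancellableSource extends ActorPublisher[Unit] with ActorLogging {
      def receive = LoggingReceive {
        case _: Request => sendAll()
        case Stop =>
          // Satisfy any outstanding demand, then complete the stream and
          // stop the actor so that it does not linger after completion.
          sendAll()
          onComplete()
          context.stop(self)
      }

      // Emits one element per unit of currently outstanding demand.
      def sendAll(): Unit = for (_ <- 1L to totalDemand) onNext(())
    }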

    val ref = system.actorOf(Props(new CancellableSource))

    val cancellableSource = Source(ActorPublisher[Unit](ref))

    (cancellableSource, new Cancellable {
      private val cancelled = new AtomicBoolean(false)
      // Only the first call sends Stop and returns true.
      def cancel(): Boolean =
        if (!cancelled.getAndSet(true)) {
          ref ! Stop
          true
        } else false
      def isCancelled: Boolean = cancelled.get
    })
  }

  implicit class MoreSourceOps[T](source: Source[T]) {
    // Zips this source with another, emitting pairs of their elements.
    def zipWith[U](other: Source[U]): Source[(T, U)] = {

      Source() { implicit b: FlowGraphBuilder =>
        import FlowGraphImplicits._

        val out = UndefinedSink[(T, U)]
        val zip = Zip[T, U]

        source ~> zip.left
        other ~> zip.right

        zip.out ~> out
        out
      }
    }
  }
}
