Re: [akka-user] Can an ActorPublisher put back pressure on the sender?

2015-05-26 Thread Jeroen Rosenberg
Ok, that makes sense or at least is consistent

On Tuesday, May 26, 2015 at 12:08:54 PM UTC+2, Patrik Nordwall wrote:
>
>
>
> On Tue, May 26, 2015 at 11:56 AM, Jeroen Rosenberg wrote:
>
>> Thnx! What will happen when I use Source.actorRef (as you suggested) with 
>> OverflowStrategy.backpressure?
>>
>
> An IllegalArgument("Backpressure overflowStrategy not supported") will be 
> thrown.
>
> I think that is missing from the documentation.

Re: [akka-user] Can an ActorPublisher put back pressure on the sender?

2015-05-26 Thread Patrik Nordwall
On Tue, May 26, 2015 at 11:56 AM, Jeroen Rosenberg <
jeroen.rosenb...@gmail.com> wrote:

> Thnx! What will happen when I use Source.actorRef (as you suggested) with
> OverflowStrategy.backpressure?
>

An IllegalArgument("Backpressure overflowStrategy not supported") will be
thrown.

I think that is missing from the documentation.
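The failure mode can be sketched as follows. This is a hypothetical snippet against the 1.0-era API used in this thread; only the exception message comes from the reply above, and the buffer size and dropHead strategy are made-up examples:

```scala
// Hypothetical sketch (Akka Streams 1.0-era API assumed). Source.actorRef
// cannot honour backpressure: elements arrive via plain `!` tells, which
// carry no demand signal back to the sender.
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Rejected with IllegalArgument("Backpressure overflowStrategy not supported"):
Source.actorRef[ByteString](bufferSize = 1000, OverflowStrategy.backpressure)

// A dropping strategy, e.g. dropHead, is accepted:
Source.actorRef[ByteString](bufferSize = 1000, OverflowStrategy.dropHead)
```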



Re: [akka-user] Can an ActorPublisher put back pressure on the sender?

2015-05-26 Thread Jeroen Rosenberg
Thnx! What will happen when I use Source.actorRef (as you suggested) with 
OverflowStrategy.backpressure?


Re: [akka-user] Can an ActorPublisher put back pressure on the sender?

2015-05-22 Thread Patrik Nordwall
On Thu, May 21, 2015 at 10:33 AM, Jeroen Rosenberg <
jeroen.rosenb...@gmail.com> wrote:

> This works fine; however, in StreamProcessor (the ActorPublisher) it seems
> that if I'm getting more data than I demand, the only thing I can do is drop
> the messages. Can I apply backpressure here to the sender / upstream?
>

Then you have to use ordinary actor messages to implement your own flow
control, but it would be better if you could stay within the streams domain
and let it handle the backpressure.
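Hand-rolled flow control with ordinary messages boils down to an ack/credit protocol. The following is a minimal sketch of that protocol using plain Scala classes in place of real actors, so it runs without an ActorSystem; all names (Producer, Consumer, credits) are invented for illustration:

```scala
// Ack-based flow control, modeled with plain classes instead of actors.
// The consumer hands out "credits" (demand); the producer only sends while
// it holds credits, queueing the rest instead of dropping anything.
import scala.collection.mutable

final class Consumer(initialCredits: Int) {
  val processed = mutable.Buffer.empty[String]
  private var credits = initialCredits

  // Like ActorPublisher's request(n): transfer accumulated demand upstream.
  def takeCredits(): Int = { val c = credits; credits = 0; c }

  // Process one element and ack it, creating one new credit.
  def receive(msg: String): Unit = { processed += msg; credits += 1 }
}

final class Producer(consumer: Consumer) {
  private val pending = mutable.Queue.empty[String]
  private var demand = 0

  def offer(msg: String): Unit = { pending.enqueue(msg); drain() }
  def pollDemand(): Unit = { demand += consumer.takeCredits(); drain() }
  def queued: Int = pending.size

  // Send only while demand exists -- this is the backpressure.
  private def drain(): Unit =
    while (demand > 0 && pending.nonEmpty) {
      demand -= 1
      consumer.receive(pending.dequeue())
    }
}

val consumer = new Consumer(initialCredits = 2)
val producer = new Producer(consumer)
producer.pollDemand()                            // learn the initial demand
(1 to 5).foreach(i => producer.offer(s"msg-$i"))
// Only msg-1 and msg-2 are delivered; msg-3..5 wait in the queue.
```

In real actors the acks would be pushed back as messages rather than polled, but the invariant is the same: never onNext (here: `receive`) more elements than the demand you have been granted.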

By the way, let's say that you wanted something like what you implemented here
with the StreamProcessor ActorPublisher, which drops messages when there
is no demand from downstream. Then you can instead use Source.actorRef (see
api docs).

/Patrik
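
Patrik's Source.actorRef suggestion might look roughly like the sketch below, written against the 1.0-era API used in this thread. The buffer size, the dropHead strategy, and Sink.ignore are illustrative assumptions; the materialized ActorRef takes the place of the hand-written StreamProcessor:

```scala
// Sketch only (Akka Streams 1.0-era API assumed). Source.actorRef
// materializes an ActorRef; elements sent to it with `!` go into a bounded
// buffer. Plain tells cannot be backpressured, so a dropping strategy is
// chosen here -- OverflowStrategy.backpressure is not supported.
import akka.actor.{ ActorRef, ActorSystem }
import akka.stream.{ ActorFlowMaterializer, OverflowStrategy }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

implicit val system = ActorSystem("Demo")
implicit val materializer = ActorFlowMaterializer()

val streamProcessor: ActorRef =
  Source.actorRef[ByteString](bufferSize = 1000, OverflowStrategy.dropHead)
    .to(Sink.ignore)               // stand-in for the real downstream graph
    .run()

// The client can now forward chunks with a plain tell:
streamProcessor ! ByteString("chunk")   // buffered; oldest dropped when full
```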



[akka-user] Can an ActorPublisher put back pressure on the sender?

2015-05-21 Thread Jeroen Rosenberg
I'm using Akka-Http 1.0-RC2 and I'm building a simple streaming server 
(Chunked HTTP) and client using reactive streams / flow graphs. 

My server looks like this (simplified version):

object Server extends App {

  implicit val system = ActorSystem("Server")
  implicit val ec = system.dispatcher
  val (address, port) = ("127.0.0.1", 6000)

  implicit val materializer =
    ActorFlowMaterializer(ActorFlowMaterializerSettings(system))

  val publisher = Source.actorPublisher(Props(new MyAwesomePublisher))

  val handler = Sink.foreach[Http.IncomingConnection] { con =>
    con handleWith Flow[HttpRequest].map { req =>
      HttpResponse(200).withEntity(Chunked(`application/json`, publisher))
    }
  }

  (Http() bind (address, port) to handler run)
}

I can now consume this stream with my akka http client implementation and 
'slow down the stream' by applying backpressure. I deliberately slow down 
my client-side processing to trigger backpressure. Here's a 
simplified version:

class Client(processor: ActorRef) extends Actor {

  private implicit val executionContext = context.system.dispatcher
  private implicit val flowMaterializer: FlowMaterializer =
    ActorFlowMaterializer(ActorFlowMaterializerSettings(context.system))

  val client =
    Http(context.system).outgoingConnection(host, port, settings =
      ClientConnectionSettings(context.system))

  val decompress = Flow[ByteString].map { data => gunzip(data.toArray) }

  val buff = Flow[ByteString].buffer(1000, OverflowStrategy.backpressure)

  val slowFlow = Flow[ByteString].mapAsync(1) { x =>
    after(20 millis, context.system.scheduler)(Future.successful(x))
  }

  val consumer = Flow[HttpResponse].map { data =>
    FlowGraph.closed() { implicit b =>
      import FlowGraph.Implicits._
      data.entity.dataBytes ~> slowFlow ~> buff ~> Sink.ignore
    }.run()
  }

  override def receive: Receive = {
    case query: String =>
      val req = HttpRequest(GET, "http://localhost:6000/api")
        .withHeaders(
          Connection("Keep-Alive")
        )
      Source.single(req).via(client).via(consumer).to(Sink.onComplete {
        case Success(_) => println("Success!")
        case Failure(e) => println(s"Error: $e")
      }).run()
  }
}

Because of 'slowFlow', I can see that my server 'slows down the stream' 
(i.e. less throughput for this connected client). So, great!

However, I wanted to handle the flow processing in another Actor, so I used an 
ActorPublisher and piped the stream to it using akka.pattern.pipe:

class Client(processor: ActorRef) extends Actor {
  ...

  override def receive: Receive = {
    case query: String =>
      val req = HttpRequest(GET, endpoint)
        .withHeaders(
          `Accept-Encoding`(gzip),
          Connection("Keep-Alive")
        ) ~> authorize
      Source.single(req).via(client).runWith(Sink.head) pipeTo self
    case response: HttpResponse =>
      response.entity.dataBytes.map { dataByte =>
        processor ! dataByte
      }.to(Sink.ignore).run()
  }
}

class StreamProcessor extends ActorPublisher[ByteString] with Actor {
  override def receive: Actor.Receive = {
    case data: ByteString =>
      if (isActive && totalDemand > 0)
        onNext(data)
  }
}

...
// elsewhere I'm consuming this publisher


val src = Source(ActorPublisher[ByteString](streamProcessor))

FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._

  val decompress = Flow[ByteString].map { data => gunzip(data.toArray) }

  val buff = Flow[ByteString].buffer(1000, OverflowStrategy.backpressure)

  val slowFlow = Flow[ByteString].mapAsync(1) { x =>
    after(20 millis, context.system.scheduler)(Future.successful(x))
  }

  src ~> slowFlow ~> buff ~> Sink.ignore
}.run()


This works fine; however, in StreamProcessor (the ActorPublisher) it seems that 
if I'm getting more data than I demand, the only thing I can do is drop the 
messages. Can I apply backpressure here to the sender / upstream?

Thnx for any pointers!



-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.