[akka-user] An equivalent of ReactiveX `PublishSubject` in akka-streams

2016-03-05 Thread Oleg Ilyenko
Hi everybody,

I have a question regarding an equivalent of `Subject` from the ReactiveX
family of libraries. I tried to find an equivalent of `PublishSubject`
(actually any type of `Subject`) in akka-streams, but unfortunately haven't
found any suitable alternative.

My use case is pretty simple: I have an akka-http based HTTP server where
you can post actions which create events on the server side. The stream of
events is exposed as a `Publisher`, and remote clients can subscribe to this
stream of events with SSE (Server Sent Events).

On a first iteration I would like to find a simple implementation of 
`Processor` (from reactive-streams) where I can push events from one side 
(with `onNext`, `onError` and `onComplete`) and subscribe to this stream of 
events on the other side (each connected client will make a subscription). 
With ReactiveX I would normally use `PublishSubject` for this, which will 
allow me to do exactly this kind of pub-sub.

I tried many different approaches with akka-streams, but nothing seems to 
work. As one of my most recent attempts, I tried to implement a simple 
`ActorPublisher[String]` which just publishes (with `onNext`) all received 
string messages:

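import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

class EventStore extends ActorPublisher[String] {
  // Events received but not yet delivered downstream
  var events = Vector.empty[String]

  def receive = {
    case "end" ⇒
      onComplete()
    case event: String ⇒
      events = events :+ event
      deliverEvents()
    case Request(_) ⇒ deliverEvents()
    case Cancel ⇒ context.stop(self)
  }

  def deliverEvents(): Unit = {
    if (isActive && totalDemand > 0) {
      // Cap at Int.MaxValue so a very large demand cannot overflow `toInt`
      val (use, keep) = events.splitAt(math.min(totalDemand, Int.MaxValue).toInt)
      events = keep
      use foreach onNext
    }
  }
}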

But I am still not 100% sure how I can correctly have multiple dynamic
subscribers. One thing that kind of worked is to implement
`Subscriber[String]` myself:

val ref = system.actorOf(Props[EventStore])
val actorPublisher = ActorPublisher[String](ref)

// Using `fanout = true` here in order to be able to subscribe more than once
val publisher =
  Source.fromPublisher(actorPublisher).runWith(Sink.asPublisher(fanout = true))

publisher.subscribe(new Subscriber[String] {
  override def onError(t: Throwable) =
    t.printStackTrace()

  override def onSubscribe(s: Subscription) =
    s.request(100)

  override def onComplete() =
    println("Complete")

  override def onNext(t: String) =
    println(s"next $t")
})

ref ! "event1"
ref ! "event2"
ref ! "event3"
ref ! "end"

This works with multiple subscriptions as well, but I really would like to
work with the high-level akka-streams DSL here. Creating a `Subscriber`
manually feels like using a very low-level API. So I tried things like these:

Source.fromPublisher(publisher).runForeach(i ⇒ "Got " + i).foreach(_ ⇒ println("Done"))

Flow[String].to(Sink.foreach(i ⇒ "Got " + i)).runWith(Source.fromPublisher(publisher))

But nothing happens: no errors and nothing is printed to the console. 

I can't find any documentation on this particular topic, so I would highly
appreciate any help. It would also help a lot if somebody could explain or
point me to the documentation which describes how to subscribe multiple
dynamic `Flow`s or `Sink`s to a `Publisher` or `Source`. The closest thing I
could find is the `Broadcast` graph stage in the Graph DSL. But all of the
examples have no more than 2 static sinks, which is not really suitable in my
scenario.
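
For readers on a later Akka release, the pub-sub shape described above might be
sketched with dynamic hubs. This is only a sketch under assumptions: it assumes
an Akka version that ships `MergeHub` and `BroadcastHub` (they are not part of
the release discussed in this thread), and the object name, buffer sizes and
event strings are made up for illustration.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

object PubSubSketch {
  // Publishes three events through a hub and returns what one subscriber saw.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("pub-sub-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // MergeHub yields a Sink that any number of producers can attach to;
      // BroadcastHub yields a Source that any number of consumers can attach to.
      // (Assumption: both hubs exist in the Akka version in use.)
      val (publishSink, eventSource) =
        MergeHub.source[String](perProducerBufferSize = 16)
          .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both)
          .run()

      // A subscriber written with the high-level DSL; more can be attached later.
      val received = eventSource.take(3).runWith(Sink.seq[String])

      // A producer pushing events into the hub.
      Source(List("event1", "event2", "event3")).runWith(publishSink)

      Await.result(received, 5.seconds)
    } finally system.terminate()
  }
}
```

Each SSE client would run its own copy of `eventSource`. A subscriber that
attaches late only sees events published after it joined, which is also how
`PublishSubject` behaves.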

Thank you in advance.

Kind regards,
Oleg

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Thanks.

I tried to put together an example using the same approach:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.left)

val result = runnableGraph.run()

def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem ⇒
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ ⇒ new Counter {
    override def get = internalCounter.get
  })
}

The result should give me a tuple holding the count (from the Flow) and the
sum (from the Sink), since I am using Keep.both, but it is giving only
Future[Int].

What is the difference between the materialized value and "out"?

Thanks 
Arun



On Saturday, March 5, 2016 at 6:46:12 PM UTC-6, Rafał Krzewski wrote:
>
> Hi,
> there are a few ways of doing that. Probably the simplest one is using 
> Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts 
> the elements that pass through it and makes the current count available 
> through a "side channel":
>
>   trait Counter {
>     def get: Long
>   }
>
>   def counter[T]: Flow[T, T, Counter] = {
>     val internalCounter = new AtomicLong(0)
>     Flow[T].map { elem ⇒
>       internalCounter.incrementAndGet()
>       elem
>     }.mapMaterializedValue(_ ⇒ new Counter {
>       override def get = internalCounter.get
>     })
>   }
>
> Another way is using a GraphStageWithMaterializedValue while building a 
> custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like 
> an ordinary GraphStage, you return a pair of GraphStageLogic and the 
> materialized value.
>
> Cheers,
> Rafał
>
> On Sunday, 6 March 2016 at 01:02:56 UTC+1, Arun Sethia wrote:
>>
>> Hi,
>>
>> can some explain what does it mean of materialized value ? I have see 
>> documentation at 
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>>  
>>
>> I am not sure how Flow can define materialize type, for example the 
>> following code has Input - Tweet, output - Int but Mat is Unit. I would 
>> like to see how someone can define Mat as Int or any example where Flow or 
>> source is defining Mat other than Unit.
>>
>> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>>
>>
>>
>> It is quite confusing for me to understand difference between "out"  and 
>> "Mat".
>>
>>
>> Thanks 
>>
>> As
>>
>>



[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Given the same code:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val flow: Flow[Int, Int, Unit] = Flow[Int].map(x => x * 2)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(flow).toMat(sink)(Keep.both)

I am not sure what the use of Keep.both versus Keep.left is. I thought that if
I used Keep.both, I would be able to get the values for the flow and the sink
as a tuple.




On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>



[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
One more question:

For Source[+Out, +Mat], Flow[-In, +Out, +Mat] and Sink[-In, +Mat], what is the
+Mat type in each case, and how can I define one myself? Any example would be
great.


On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>



[akka-user] Re: how to define materialized value

2016-03-05 Thread Arun Sethia
Thanks Rafal.

Based on this I tried to write some sample code in which I would like to count
the number of elements being processed and compute their sum:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)

val result = runnableGraph.run()


def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem ⇒
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ ⇒ new Counter {
    override def get = internalCounter.get
  })
}



1. Using Keep.both, the result should be able to return both the count and the
sum, but it does not. Why?

2. How are materialized values different from "out"? I am not able to
visualize the difference between the two.

Thanks 
Arun



On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>



Re: [akka-user] Re: Problem with akka sharding 2.3.4

2016-03-05 Thread Need
Thanks for your reply. I will try updating to the latest version of akka
sharding and let you know the result.

Need



Re: [akka-user] Thread interrupt during message send

2016-03-05 Thread hbf

On Friday, February 19, 2016 at 12:25:36 AM UTC-8, rkuhn wrote:
>
> Hi Hbf,
>
> (which is a bit funny because I always read «Hauptbahnhof» :-) )
>

I can imagine :)

 

> there are no guarantees, we only promise at-most-once messaging. That 
> said, local message sends are hard to drop without killing the JVM in the 
> process. But interrupting threads may result in undefined behavior (in 
> particular the interrupt and/or the message may get lost, depending on the 
> mailbox and dispatcher implementations).
>

Thanks for explaining in detail!
 

> May I ask why you intend to interrupt Actor threads?
>

I am calling a legacy API which is blocking and can only be stopped via
interruption. The exception I posted happened during a unit test.

There's hope I can abandon that API soon, so I think it's all fine.

Thanks!
Hbf

> On 19 Feb 2016, at 06:45, hbf wrote:
>
> Hi everybody,
>
> If a thread gets interrupted during an actor.tell(msg, sender), is it 
> guaranteed that the message still gets sent?
>
> In my test case, the actor probe confirms that the message is indeed sent 
> but having read that there are minor differences between the dispatcher 
> used for tests and the "real" one, I'd like to confirm this.
>
> Thanks!
> Hbf
>
> [ERROR] [02/18/2016 21:39:03.400] [Thread-3] [akka://default/system/testProbe-2] interrupted during message send
> java.lang.InterruptedException
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1245)
> at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:442)
> at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:275)
> at akka.testkit.CallingThreadDispatcher.dispatch(CallingThreadDispatcher.scala:208)
> at akka.actor.dungeon.Dispatch$class.sendMessage(Dispatch.scala:132)
> at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
> at akka.actor.Cell$class.sendMessage(ActorCell.scala:295)
> at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
> at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:169)
> at akka.actor.ActorRef.tell(ActorRef.scala:128)
> at org.foo.lambda$run$3(Bar.java:62)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.foo.run(Bar.java:57)
>
>
>
>
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe  – Reactive apps on the JVM.
> twitter: @rolandkuhn
> 
>
>



Re: [akka-user] [Akka Stream] Exceptions in AbstractInHandler and AbstractOutHandler

2016-03-05 Thread hbf
Thanks, √!

On Friday, February 12, 2016 at 12:19:23 AM UTC-8, √ wrote:
>
> They will fail the stage if they throw
>
> -- 
> Cheers,
> √
> On Feb 12, 2016 8:54 AM, "hbf" wrote:
>
>> Hey everybody,
>>
>> The documentation doesn't say how exceptions in a GraphStage's handlers (
>> AbstractInHandler and AbstractOutHandler) are treated. Do they cause 
>> undefined behavior or will they implicitly call failStage()?
>>
>> Thanks!
>> Hbf
>>
>>
>
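
The behaviour confirmed in this thread can be checked with a small experiment.
The following is only a sketch: `ThrowingStage` and `HandlerFailureSketch` are
hypothetical names, the stage is written against the Scala `InHandler` /
`OutHandler` traits rather than the Java `AbstractInHandler` /
`AbstractOutHandler`, and the five-second timeout is arbitrary.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

object HandlerFailureSketch {
  // Hypothetical stage whose input handler always throws.
  final class ThrowingStage extends GraphStage[FlowShape[Int, Int]] {
    val in: Inlet[Int] = Inlet[Int]("ThrowingStage.in")
    val out: Outlet[Int] = Outlet[Int]("ThrowingStage.out")
    override val shape: FlowShape[Int, Int] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          // No explicit failStage() call here: the exception alone fails the stage.
          override def onPush(): Unit = throw new IllegalStateException("boom")
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
  }

  // Runs a stream through the stage and returns how the stream ended.
  def run(): Try[Int] = {
    implicit val system: ActorSystem = ActorSystem("handler-failure-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 3).via(new ThrowingStage).runWith(Sink.head[Int])
      Await.ready(result, 5.seconds).value.get
    } finally system.terminate()
  }
}
```

The returned `Try` is a `Failure` carrying the exception thrown in `onPush`,
which is what downstream sees when the stage fails.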



[akka-user] Re: Websockets - Receiving a stream of messages after sending initial message

2016-03-05 Thread Rafał Krzewski
Hi Brandon,

I guess you are using the following method to respond to the WebSocket
handshake:

UpgradeToWebSocket.handleMessagesWith(handlerFlow: Graph[FlowShape[Message, Message], _]): HttpResponse


In this case, the flow is indeed one-to-one. For each incoming message only
a single outgoing message is produced. However, there are other variants
that take a pair of streams:

UpgradeToWebSocket.handleMessagesWith(inSink: Graph[SinkShape[Message], _], outSource: Graph[SourceShape[Message], _]): HttpResponse


This allows you to implement a many-to-one, one-to-many, unidirectional
reader / writer or any arbitrary communication scheme you can think of. You
could implement the logic in an Actor and then use a pair of ActorSubscriber
+ ActorPublisher helpers to complete the required plumbing.
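
The idea of a handler built from an independent sink and source can be tried
with plain akka-stream. This is a minimal sketch, not the akka-http API itself:
it uses `String` as a stand-in for the WebSocket `Message` type, and the object
name and message texts are made up.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object SinkSourcePairSketch {
  // Sends one message in and returns everything that came back out.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sink-source-pair-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Incoming side: consumes whatever the peer sends.
      val inSink = Sink.foreach[String](msg => println(s"received: $msg"))

      // Outgoing side: emits on its own, independent of the incoming messages.
      val outSource = Source(List("update-1", "update-2", "update-3"))

      // The two halves form one Flow without any element-wise coupling.
      val handler = Flow.fromSinkAndSource(inSink, outSource)

      val replies = Source.single("subscribe").via(handler).runWith(Sink.seq[String])
      Await.result(replies, 5.seconds)
    } finally system.terminate()
  }
}
```

One incoming message yields three outgoing ones here, because the output side
is driven by `outSource` rather than by the input.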

Here's an example of WebSocket usage in my toy project:
https://github.com/rkrzewski/akka-cluster-etcd/tree/master/examples/cluster-monitor/src/main/scala/pl/caltha/akka/cluster/monitor/frontend
(here the input and output channels serve different purposes and are
independent of one another)

Cheers,
Rafał 

On Saturday, 5 March 2016 at 18:24:43 UTC+1, Brandon Bradley wrote:
>
> Hello,
>
> I have a websockets connection that continuously sends messages after I 
> send a particular message. I've followed the websockets client example. 
> But, I only get one message back. I believe this is because streams are 
> one-to-one. Is there a way to process messages received after sending an 
> initial message with akka-http? Or is this the wrong tool for the job.
>
> Cheers!
> Brandon Bradley
>



[akka-user] Re: how to define materialized value

2016-03-05 Thread Rafał Krzewski
Hi,
there are a few ways of doing that. Probably the simplest one is using 
Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts 
the elements that pass through it and makes the current count available 
through a "side channel":

  trait Counter {
    def get: Long
  }

  def counter[T]: Flow[T, T, Counter] = {
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem ⇒
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ ⇒ new Counter {
      override def get = internalCounter.get
    })
  }
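
A usage sketch may help to see how the materialized value surfaces when the
stream is run. It is only an illustration: the object name, the sample numbers
and the timeout are assumptions, and `counter` is repeated so the block stands
on its own.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

object MatValueSketch {
  trait Counter { def get: Long }

  def counter[T]: Flow[T, T, Counter] = {
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem =>
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ => new Counter {
      override def get: Long = internalCounter.get
    })
  }

  // Returns (number of elements seen by the flow, sum computed by the sink).
  def run(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("mat-value-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source(1 to 5).filter(_ % 2 == 0) // emits 2 and 4
      val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // viaMat(...)(Keep.right) keeps the flow's Counter instead of the source's
      // value; toMat(...)(Keep.both) then pairs it with the sink's Future.
      val (count, sum): (Counter, Future[Int]) =
        source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both).run()

      val total = Await.result(sum, 5.seconds)
      (count.get, total)
    } finally system.terminate()
  }
}
```

Plain `via` keeps the materialized value of the left side (the source), so the
flow's `Counter` would be dropped before `toMat` is reached. The elements (the
"out" side) travel through the stream, while the materialized value is handed
back to the caller of `run()`.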

Another way is using a GraphStageWithMaterializedValue while building a 
custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like 
an ordinary GraphStage, you return a pair of GraphStageLogic and the 
materialized value.
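
The second approach could look roughly like the sketch below. `CountingStage`
and `CountingStageSketch` are hypothetical names, and the stage only passes
elements through while counting them; treat it as an illustration rather than a
reference implementation.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

object CountingStageSketch {
  trait Counter { def get: Long }

  final class CountingStage[T]
      extends GraphStageWithMaterializedValue[FlowShape[T, T], Counter] {
    val in: Inlet[T] = Inlet[T]("CountingStage.in")
    val out: Outlet[T] = Outlet[T]("CountingStage.out")
    override val shape: FlowShape[T, T] = FlowShape(in, out)

    override def createLogicAndMaterializedValue(
        inheritedAttributes: Attributes): (GraphStageLogic, Counter) = {
      // Created once per materialization, so every run gets a fresh counter.
      val count = new AtomicLong(0)
      val logic = new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          override def onPush(): Unit = {
            count.incrementAndGet()
            push(out, grab(in))
          }
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
      (logic, new Counter { override def get: Long = count.get })
    }
  }

  // Runs five elements through the stage and returns the final count.
  def run(): Long = {
    implicit val system: ActorSystem = ActorSystem("counting-stage-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (counter, sum) = Source(1 to 5)
        .viaMat(new CountingStage[Int])(Keep.right)
        .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
        .run()
      Await.result(sum, 5.seconds)
      counter.get
    } finally system.terminate()
  }
}
```

One difference from the `mapMaterializedValue` version: there the `AtomicLong`
is created when `counter` is called, so running the same `Flow` value twice
shares one count, whereas this stage allocates its counter per materialization.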

Cheers,
Rafał

On Sunday, 6 March 2016 at 01:02:56 UTC+1, Arun Sethia wrote:
>
> Hi,
>
> can some explain what does it mean of materialized value ? I have see 
> documentation at 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams
>  
>
> I am not sure how Flow can define materialize type, for example the 
> following code has Input - Tweet, output - Int but Mat is Unit. I would 
> like to see how someone can define Mat as Int or any example where Flow or 
> source is defining Mat other than Unit.
>
> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
>
>
>
> It is quite confusing for me to understand difference between "out"  and 
> "Mat".
>
>
> Thanks 
>
> As
>
>



[akka-user] Re: akka-http complete either directive with status code parameter

2016-03-05 Thread Rafał Krzewski
Hi Constantin,

you got it _almost_ right! Just change the type of the implicit from
ToResponseMarshaller to ToEntityMarshaller and you are good to go.

RouteDirectives.complete expects a (m: ⇒ ToResponseMarshallable) but you
are providing a (StatusCode, A) pair.
PredefinedToResponseMarshallers.fromStatusCodeAndValue can fix that, but it
needs a ToEntityMarshaller[A] to fill the gap.
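
Applied to the directive from the question, the change might look like the
sketch below. The `RequestError` hierarchy is not shown in the thread, so the
one here (`BadRequest`, `Unexpected`) is an assumption made only to keep the
block compilable.

```scala
import scala.concurrent.Future
import akka.http.scaladsl.marshalling.ToEntityMarshaller
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object CompleteEitherSketch {
  // Assumed error hierarchy; the original definitions are not part of the thread.
  sealed trait RequestError
  final case class BadRequest(message: String) extends RequestError
  case object Unexpected extends RequestError

  def completeEither[A](successStatusCode: StatusCodes.Success,
                        future: => Future[Either[RequestError, A]])
                       (implicit m: ToEntityMarshaller[A]): Route =
    onSuccess(future) {
      case Right(result) =>
        // (StatusCode, A) is marshalled via fromStatusCodeAndValue,
        // which needs the ToEntityMarshaller[A] supplied above.
        complete(successStatusCode -> result)
      case Left(BadRequest(message)) =>
        complete(StatusCodes.BadRequest -> message)
      case Left(_) =>
        complete(StatusCodes.InternalServerError)
    }
}
```

A call such as `completeEither(StatusCodes.Created, Future.successful(Right("ok")))`
should then resolve the predefined `String` entity marshaller implicitly.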

cheers,
Rafał

On Friday, 4 March 2016 at 14:01:50 UTC+1, Constantin Gerstberger wrote:
>
> Hi,
>
> I'm trying to build a custom directive which completes a Future of 
> Either. However, the following only works if I omit the status code 
> when completing the Right case (i.e. complete(result)).
>
> def completeEither[A](successStatusCode: StatusCodes.Success,
>     future: => Future[Either[RequestError, A]])(implicit m: ToResponseMarshaller[A]) =
>   onSuccess(future) {
>     case Right(result) =>
>       complete(successStatusCode -> result)
>     case Left(BadRequest(message)) =>
>       complete(StatusCodes.BadRequest -> message)
>     case Left(_) =>
>       complete(StatusCodes.InternalServerError)
>   }
>
>
> The compile error I get is: Type mismatch, expected: 
> ToResponseMarshallable, actual: (StatusCodes.Success, A)
> I understand what the error message means but not where it comes from. 
> Based on this post and due to the implicit parameter, I'd assume that it 
> should work.
>
> Could anyone explain what i'm doing wrong?
>
> Thanks & regards
> Constantin
>
>
>
>
>
>
>



[akka-user] how to define materialized value

2016-03-05 Thread Arun Sethia
Hi,

Can someone explain what a materialized value is? I have seen the
documentation at
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams

I am not sure how a Flow can define its materialized type. For example, the
following code has input Tweet and output Int, but Mat is Unit. I would
like to see how someone can define Mat as Int, or any example where a Flow or
Source defines a Mat other than Unit.

- val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)



I find it quite hard to understand the difference between "out" and
"Mat".


Thanks 

As



Re: [akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Viktor Klang
Since we have `unfold` and `unfoldAsync` for Source, I'd say it'd be more
than symmetric to have `fold` and `foldAsync` on Sink :)
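
For illustration, this is how such an operator would be used. The sketch
assumes a later Akka release in which `Sink.foldAsync` is available (it is not
in the version discussed here); the `step` function and the numbers are made
up.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object FoldAsyncSketch {
  // Folds 1..5 with an asynchronous step function and returns the result.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("fold-async-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Hypothetical asynchronous step: f: (U, T) => Future[U]
      def step(acc: Int, elem: Int): Future[Int] = Future.successful(acc + elem)

      // Assumption: Sink.foldAsync exists in the Akka version in use.
      val result: Future[Int] =
        Source(1 to 5).runWith(Sink.foldAsync[Int, Int](0)(step))

      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

The signature matches the `foldM` asked for in this thread:
`(zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]`.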

On Sun, Mar 6, 2016 at 12:28 AM, Giovanni Alberto Caporaletti <
paradi...@gmail.com> wrote:

> Hi Roland,
> you're right, my solution was a bit naive. I came up with this, I'm pretty
> sure it can be done in a better way, looking forward to seeing your
> solution :)
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] =
>   Sink.fromGraph {
>     val sink = Sink.fold(zero)(Keep.right[U, U])
>
>     GraphDSL.create(sink) { implicit b => sink =>
>       import GraphDSL.Implicits._
>       val zip = b.add(ZipWith(f))
>       val bcast = b.add(Broadcast[U](2))
>       val merge = b.add(Merge[U](2))
>       val z = Source.single(zero)
>
>       z ~> merge
>            merge ~> zip.in0
>                     zip.out.mapAsync(1)(identity) ~> bcast ~> sink
>            merge <~ bcast
>
>       SinkShape(zip.in1)
>     }
>   }
>
>
> On Saturday, 5 March 2016 21:52:06 UTC+1, rkuhn wrote:
>>
>> Unfortunately these solutions create unbounded amounts of futures without
>> back pressure, so I'd recommend against this approach. But it is late and
>> I'm on the phone so cannot suggest a proper solution.
>>
>> Regards,
>>
>> Roland
>>
>> Sent from my iPhone
>>
>> On 05 Mar 2016, at 17:41, Giovanni Alberto Caporaletti 
>> wrote:
>>
>> how about this:
>>
>> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>>   Sink
>>     .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>>     .mapMaterializedValue(_ flatMap identity)
>> }
>>
>> or this:
>>
>> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>>   Flow[T]
>>     .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>>     .mapAsync(1)(identity)
>>     .toMat(Sink.head)(Keep.right)
>> }
>>
>>
>>
>> On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote:
>>>
>>> Hi! There is
>>>
>>> f: (U, T) ⇒ Future[U]
>>>
>>> rather than
>>>
>>> f: (U, T) ⇒ U
>>>
>>> in hands. How to create
>>>
>>> Sink[T, Future[U]]
>>>
>>> ?
>>>



-- 
Cheers,
√



Re: [akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Giovanni Alberto Caporaletti
Hi Roland,
you're right, my solution was a bit naive. I came up with this, I'm pretty 
sure it can be done in a better way, looking forward to seeing your 
solution :)

def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] =
  Sink.fromGraph {
    val sink = Sink.fold(zero)(Keep.right[U, U])

    GraphDSL.create(sink) { implicit b => sink =>
      import GraphDSL.Implicits._
      val zip = b.add(ZipWith(f))
      val bcast = b.add(Broadcast[U](2))
      val merge = b.add(Merge[U](2))
      val z = Source.single(zero)

      z ~> merge
           merge ~> zip.in0
                    zip.out.mapAsync(1)(identity) ~> bcast ~> sink
           merge <~ bcast

      SinkShape(zip.in1)
    }
  }


On Saturday, 5 March 2016 21:52:06 UTC+1, rkuhn wrote:
>
> Unfortunately these solutions create unbounded amounts of futures without 
> back pressure, so I'd recommend against this approach. But it is late and 
> I'm on the phone so cannot suggest a proper solution.
>
> Regards,
>
> Roland 
>
> Sent from my iPhone
>
> On 05 Mar 2016, at 17:41, Giovanni Alberto Caporaletti wrote:
>
> how about this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Sink
>     .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapMaterializedValue(_ flatMap identity)
> }
>
> or this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Flow[T]
>     .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapAsync(1)(identity)
>     .toMat(Sink.head)(Keep.right)
> }
>
>
>
> On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote:
>>
>> Hi! There is 
>>
>> f: (U, T) ⇒ Future[U] 
>>
>> rather than 
>>
>> f: (U, T) ⇒ U
>>
>> in hands. How to create 
>>
>> Sink[T, Future[U]]
>>
>> ?
>>


[akka-user] Restarting FSMs, and avoiding undesired side-effects

2016-03-05 Thread Spencer Judge
Hello all,

I haven't used Akka before, and I'm considering using it to replace some 
code my team recently wrote that we're finding is too complex to understand 
well, and I'd like us to use a more formal structure. Akka fits our use 
case very well (I'd use Erlang, but we depend on too many Java libraries) 
but there is one question I'm left with after spending some time today 
reading through documentation. 

To give a simple example: our system would perform its side-effecting work 
at the leaf nodes of a supervision tree, which would be (or be supervised 
by) FSM actors. A concrete example is an FSM whose job is to start, send an 
HTTP request somewhere, poll for the status of that request repeatedly 
until some condition is reached, and then terminate.

Let's say that FSM dies while in the polling state. I don't want to send 
the initial request again (because presumably the job is still in progress 
somewhere), I want to go straight to the polling state and start polling 
again. 

From my reading of the docs, persistent actors are restarted by replaying 
all messages back to the actor, which is perfect if you're not performing 
side effects but seems problematic in this case. After googling around I 
have a vague idea that you're able to inspect replayed messages, and maybe 
I could somehow know to throw out the ones I don't want to redo, but I'm 
not sure if that's idiomatic.

I've got to imagine this is a pretty common issue, but I couldn't find 
anything that really rang true as the "canonical" way to handle this sort 
of case.

Your responses are appreciated,
Spencer
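
A note on the replay concern above: with akka-persistence the usual split is that side effects run only while handling live commands, and the journal stores events describing what already happened. Recovery replays those events to rebuild state and does not call out anywhere. Below is a minimal sketch of that idea, not a definitive implementation. The names PollingJob, Start, Poll, RequestSent and JobFinished, the one-second poll interval, and the two placeholder methods are all made up for illustration, and a journal plugin is assumed to be configured.

```scala
import akka.persistence.{PersistentActor, RecoveryCompleted}
import scala.concurrent.duration._

object PollingJob {
  // Commands (hypothetical protocol for this sketch)
  case object Start
  case object Poll
  // Events written to the journal
  case object RequestSent
  case object JobFinished

  def idFor(jobId: String): String = s"polling-job-$jobId"
}

class PollingJob(jobId: String) extends PersistentActor {
  import PollingJob._

  override def persistenceId: String = idFor(jobId)

  private var requestSent = false
  private var finished = false

  // Recovery only rebuilds state from events; it performs no HTTP calls.
  override def receiveRecover: Receive = {
    case RequestSent ⇒ requestSent = true
    case JobFinished ⇒ finished = true
    case RecoveryCompleted ⇒
      // The request went out before the crash: go straight back to polling.
      if (requestSent && !finished) self ! Poll
  }

  // Side effects happen only while handling live commands.
  override def receiveCommand: Receive = {
    case Start if !requestSent ⇒
      sendInitialRequest()
      persist(RequestSent) { _ ⇒
        requestSent = true
        self ! Poll
      }
    case Poll if requestSent && !finished ⇒
      if (jobIsDone()) persist(JobFinished) { _ ⇒
        finished = true
        context.stop(self)
      } else {
        import context.dispatcher
        context.system.scheduler.scheduleOnce(1.second, self, Poll)
      }
  }

  // Placeholders for the real HTTP calls (assumptions of this sketch).
  private def sendInitialRequest(): Unit = ()
  private def jobIsDone(): Boolean = true
}
```

One trade-off to be aware of: if the actor dies after sendInitialRequest() but before RequestSent is stored, the request is sent again on the next Start. Persisting first and sending afterwards flips that into "possibly never sent", so which order fits depends on whether the remote side tolerates duplicates.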



Re: [akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Andrew Gaydenko
Roland, hi!

Thanks for pointing the problem out! 
So we'll stay with hope for Future[ProperSolution] :)

Another approach I have tried is to use the same folding function as in the 
first suggestion above, but to unwrap the extra Future via 
flatMap(identity) on the stream's result after run. This is sometimes 
inconvenient, since it limits how the code parts can interact. In 
any case, is this approach also affected by the issue?

On Saturday, March 5, 2016 at 11:52:06 PM UTC+3, rkuhn wrote:
>
> Unfortunately these solutions create unbounded amounts of futures without 
> back pressure, so I'd recommend against this approach. But it is late and 
> I'm on the phone so cannot suggest a proper solution.
>
> Regards,
>
> Roland 
>
> Sent from my iPhone
>
> On 05 Mar 2016, at 17:41, Giovanni Alberto Caporaletti wrote:
>
> how about this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Sink
>     .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapMaterializedValue(_ flatMap identity)
> }
>
> or this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Flow[T]
>     .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapAsync(1)(identity)
>     .toMat(Sink.head)(Keep.right)
> }
>
>
>
> On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote:
>>
>> Hi! There is 
>>
>> f: (U, T) ⇒ Future[U] 
>>
>> rather than 
>>
>> f: (U, T) ⇒ U
>>
>> in hands. How to create 
>>
>> Sink[T, Future[U]]
>>
>> ?
>>


Re: [akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Andrew Gaydenko
On Saturday, March 5, 2016 at 11:52:06 PM UTC+3, rkuhn wrote:
>
> Unfortunately these solutions create unbounded amounts of futures without 
> back pressure, so I'd recommend against this approach. But it is late and 
> I'm on the phone so cannot suggest a proper solution.
>

Roland, hi!

Thanks for pointing the problem out! 
So we'll stay with hope for Future[ProperSolution] :)



Re: [akka-user] Re: Problem with akka sharding 2.3.4

2016-03-05 Thread Konrad Malawski
As mentioned by Guido,
we did loads of fixes in the cluster / sharding during the 2.3.x timeline, 
so you'll definitely want to upgrade to the latest 2.3.

Also, a reminder that we made a specific effort to keep 2.3 and 2.4 binary 
compatible (sic!), so you can upgrade to the latest 2.4 as well!
One thing about 2.4 is that it requires JDK 8 though, so that's pretty much the 
only difference requirements-wise.

Please upgrade and let us know!

-- 
Cheers,
Konrad 'ktoso’ Malawski
Akka @ Lightbend

On 5 March 2016 at 21:37:49, Guido Medina (oxyg...@gmail.com) wrote:

If you are stuck with 2.3.x why don't you just upgrade to 2.3.14?
If that was the problem for 2.3.4 I'm quite sure someone will come here and 
tell you: That was issue XYZ and was solved at version 2.3.x where X > 4

Hope that helps.

Guido.

On Friday, March 4, 2016 at 11:56:54 PM UTC, Need wrote:
I have added more information

> There is no rebalance for Shard [73] before it reallocated on node 
> "akka.tcp://application@192.168.38.116:2770"


Re: [akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Roland Kuhn
Unfortunately these solutions create unbounded amounts of futures without back 
pressure, so I'd recommend against this approach. But it is late and I'm on the 
phone so cannot suggest a proper solution.

Regards,

Roland 

Sent from my iPhone
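
A note on the back-pressure concern above: folding into a chain of Futures lets the sink keep pulling elements while earlier steps are still running. Later Akka Streams releases ship a Sink.foldAsync operator with exactly the requested signature, which waits for each Future before pulling the next element. As far as I know it is not available in 2.4.2, so treat the following as a minimal sketch that assumes a newer version. The object name FoldAsyncSketch and the step function are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FoldAsyncSketch {
  // Assumption: a newer Akka Streams release that provides Sink.foldAsync.
  def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] =
    Sink.foldAsync[U, T](zero)(f)

  // Runs a small stream through foldM and returns the folded result.
  def run(): Int = {
    implicit val system = ActorSystem("fold-async-sketch")
    implicit val mat = ActorMaterializer()
    import system.dispatcher

    // Hypothetical asynchronous step: add the element to the running total.
    def step(acc: Int, elem: Int): Future[Int] = Future(acc + elem)

    try Await.result(Source(1 to 100).runWith(foldM[Int, Int](0)(step)), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Since only one Future is in flight at a time, upstream is not asked for the next element until the previous step has completed.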

> On 05 Mar 2016, at 17:41, Giovanni Alberto Caporaletti wrote:
>
> how about this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Sink
>     .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapMaterializedValue(_ flatMap identity)
> }
>
> or this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Flow[T]
>     .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapAsync(1)(identity)
>     .toMat(Sink.head)(Keep.right)
> }
> 
> 
>> On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote:
>> Hi! There is 
>> 
>> f: (U, T) ⇒ Future[U] 
>> 
>> rather than 
>> 
>> f: (U, T) ⇒ U
>> 
>> in hands. How to create 
>> 
>> Sink[T, Future[U]]
>> 
>> ?
> 


[akka-user] Re: Problem with akka sharding 2.3.4

2016-03-05 Thread Guido Medina
If you are stuck with 2.3.x why don't you just upgrade to 2.3.14?
If that was the problem for 2.3.4 I'm quite sure someone will come here and 
tell you: That was issue XYZ and was solved at version 2.3.x where X > 4

Hope that helps.

Guido.

On Friday, March 4, 2016 at 11:56:54 PM UTC, Need wrote:
>
> I have added more information
>
> > There is no rebalance for Shard [73] before it reallocated on node 
> "akka.tcp://application@192.168.38.116:2770"
>



[akka-user] [2.5-Scala] How to cleanup Sink/Source?

2016-03-05 Thread Andrew Gaydenko
Hi!

I'm playing with Play projects, migrating them to v2.5.0. Play's Iteratee 
and Enumerator have some means to clean up streams (Input.EOF and 
onDoneEnumerating() respectively). So, what is the obvious way :) to 
clean up sources and sinks in Akka Streams?
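
One option for the question above is watchTermination, which materializes a Future[Done] that completes when the stream finishes, fails, or is cancelled, so cleanup can be attached to that Future. This is a minimal sketch assuming an Akka Streams version that has watchTermination. The AtomicBoolean stands in for a real resource, and the names CleanupSketch and released are made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CleanupSketch {
  // Runs a tiny stream and reports whether the cleanup hook fired.
  def run(): Boolean = {
    implicit val system = ActorSystem("cleanup-sketch")
    implicit val mat = ActorMaterializer()
    import system.dispatcher

    // Stands in for a real resource (file handle, connection, ...).
    val released = new AtomicBoolean(false)

    val terminated: Future[Done] =
      Source(1 to 3)
        .watchTermination()(Keep.right) // keep the termination Future
        .to(Sink.foreach[Int](println))
        .run()

    // Runs on success and on failure alike.
    val cleanedUp = terminated.andThen { case _ ⇒ released.set(true) }

    try { Await.ready(cleanedUp, 5.seconds); released.get }
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The same call works on a Flow or in front of a Sink, so the hook can sit next to whichever stage owns the resource.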



[akka-user] Websockets - Receiving a stream of messages after sending initial message

2016-03-05 Thread Brandon Bradley
Hello,

I have a WebSocket connection that continuously sends messages after I 
send a particular message. I've followed the WebSocket client example, 
but I only get one message back. I believe this is because streams are 
one-to-one. Is there a way to process messages received after sending an 
initial message with akka-http, or is this the wrong tool for the job?

Cheers!
Brandon Bradley
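
A possible explanation, going by the akka-http WebSocket docs: the connection is closed once either direction of the stream completes, and Source.single completes right after emitting its one element. Keeping the outgoing side open with Source.maybe avoids that. This is a minimal sketch under that assumption. The endpoint URL, the "subscribe" payload, and the names KeepOpenClient, outgoing and keepOpen are made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Future, Promise}

object KeepOpenClient {
  // One initial message, then a source that stays open until the promise is completed.
  def outgoing(first: String): Source[Message, Promise[Option[Message]]] =
    Source.single[Message](TextMessage(first))
      .concatMat(Source.maybe[Message])(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("ws-client")
    implicit val mat = ActorMaterializer()

    val incoming: Sink[Message, Future[Done]] = Sink.foreach[Message] {
      case m: TextMessage.Strict ⇒ println(m.text)
      case _ ⇒ // streamed or binary messages are ignored in this sketch
    }

    val flow = Flow.fromSinkAndSourceMat(incoming, outgoing("subscribe"))(Keep.both)

    // Hypothetical endpoint; replace with the real server address.
    val (upgrade, (closed, keepOpen)) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:9001/stats"), flow)

    // Calling keepOpen.success(None) later completes the outgoing side,
    // which lets the connection close.
  }
}
```

A real client would also drain streamed messages rather than ignore them, otherwise the incoming side can stall.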



[akka-user] Re: Question about akka-http websocket client

2016-03-05 Thread Brandon Bradley
This is a good question! I was wondering the same thing.

On Wednesday, March 2, 2016 at 7:07:31 PM UTC-6, Filipp Eritsian wrote:
>
> Hello all,
>
> Hoping someone can shed some light on the following!
>
> I am using the example provided with the documentation:
>
> object WebSocketClient extends App {
>
>   implicit val system = ActorSystem("test")
>   implicit val fm = ActorMaterializer()
>
>   import system.dispatcher
>
>   val incoming: Sink[Message, Future[Done]] =
>     Sink.foreach[Message] {
>       case message: TextMessage.Strict =>
>         println(message.text)
>     }
>
>   val outgoing = Source.single(TextMessage(""))
>
>   val webSocketFlow =
>     Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9001/stats"))
>
>   val (upgradeResponse, closed) = outgoing
>     .viaMat(webSocketFlow)(Keep.right)
>     .toMat(incoming)(Keep.both)
>     .run()
>
>   val connected = upgradeResponse.flatMap { upgrade =>
>     if (upgrade.response.status == StatusCodes.OK)
>       Future.successful(Done)
>     else if (upgrade.response.status == StatusCodes.SwitchingProtocols)
>       Future.successful(Done)
>     else
>       throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
>   }
>
>   connected.onComplete(println)
>   closed.foreach(_ => println("*** closed ***"))
> }
>
>
> The web socket client seems to close the http connection after about 6 
> seconds. Any way to make the connection stay open? The server I am using 
> just publishes stats every second, continuously.
>
> Server log:
> [DEBUG] [03/03/2016 08:52:30.343] 
> [websockets-akka.actor.default-dispatcher-3] 
> [akka://websockets/system/IO-TCP/selectors/$a/0] New connection accepted
> [DEBUG] [03/03/2016 08:52:36.534] 
> [websockets-akka.actor.default-dispatcher-2] 
> [akka://websockets/system/IO-TCP/selectors/$a/1] Closing connection due to 
> IO error java.io.IOException: Broken pipe
>
> Cheers,
> Filipp
>



[akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Andrew Gaydenko
Giovanni, hi!

I used an approach similar to the first suggestion, but wasn't sure that 
wrapping zero in a Future and rewriting the folding function is an elegant way.
So, I'll experiment with both approaches once some benchmarks are ready. And I 
need some time to dig into the second suggestion :)

Thanks!!

On Saturday, March 5, 2016 at 7:41:18 PM UTC+3, Giovanni Alberto 
Caporaletti wrote:
>
> how about this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Sink
>     .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapMaterializedValue(_ flatMap identity)
> }
>
> or this:
>
> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
>   Flow[T]
>     .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
>     .mapAsync(1)(identity)
>     .toMat(Sink.head)(Keep.right)
> }
>
>
>
> On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote:
>>
>> Hi! There is 
>>
>> f: (U, T) ⇒ Future[U] 
>>
>> rather than 
>>
>> f: (U, T) ⇒ U
>>
>> in hands. How to create 
>>
>> Sink[T, Future[U]]
>>
>> ?
>>
>



[akka-user] Re: How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Giovanni Alberto Caporaletti
how about this:

def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
  Sink
    .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
    .mapMaterializedValue(_ flatMap identity)
}

or this:

def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
  Flow[T]
    .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t)))
    .mapAsync(1)(identity)
    .toMat(Sink.head)(Keep.right)
}



On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote:
>
> Hi! There is 
>
> f: (U, T) ⇒ Future[U] 
>
> rather than 
>
> f: (U, T) ⇒ U
>
> in hands. How to create 
>
> Sink[T, Future[U]]
>
> ?
>



[akka-user] How to? - Sink.foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]]

2016-03-05 Thread Andrew Gaydenko
Hi! There is 

f: (U, T) ⇒ Future[U] 

rather than 

f: (U, T) ⇒ U

in hands. How to create 

Sink[T, Future[U]]

?



[akka-user] Re: [akka-http-2.4.2] connection timeouts under load

2016-03-05 Thread Giovanni Alberto Caporaletti
Let me add that using finch on the same project with default settings 
(the exact same code apart from the very first layer: HTTP request/response 
parsing and routing) I get *0* timeouts/drops and more or less the same 
performance.
The performance is not an issue (it's getting better and better with every 
release, and it's basically already on par with spray/finagle), but dropping 
connections / timing out is.

Luckily this won't go into production for a while, so I have some time to 
solve this issue. I don't want to change the HTTP frontend: I really hate 
the Twitter APIs, and my business layer is written entirely using futures 
and streams.

I don't know exactly what to look into; I'm open to suggestions.

thank you!

On Saturday, 5 March 2016 13:03:57 UTC+1, Giovanni Alberto Caporaletti 
wrote:
>
> Hi,
> I'll try to explain what I'm experiencing in my akka-http app.
> (I found this issue but it's not been updated for almost a year and I'm 
> not sure it's relevant: https://github.com/akka/akka/issues/17395)
>
> I noticed that under load a lot of connections (~1-2%) were dropped or 
> timed out. I started investigating, tuning os and akka params and trimming 
> down my sample app until I got this:
>
>
> //N.B.: this is a test
>
> implicit val system = ActorSystem()
> implicit val mat: ActorMaterializer = ActorMaterializer()
> implicit val ec = system.dispatcher
>
> val binding: Future[ServerBinding] = Http().bind("0.0.0.0", 1104).map { conn ⇒
>   val promise = Promise[Unit]()
>   // I don't even wait for the end of the flow
>   val handler = Flow[HttpRequest].map { _ ⇒ promise.success(()); HttpResponse() }
>
>   // to be sure it's not a mapAsync(1) problem I use map and block here, same result
>   val t0 = System.currentTimeMillis()
>   println(s"${Thread.currentThread().getName} start")
>
>   conn handleWith handler
>
>   Await.result(promise.future, 10.seconds)
>   println(s"${Thread.currentThread().getName} end ${System.currentTimeMillis() - t0}ms");
> }.to(Sink.ignore).run()
>
> Await.result(binding, 10.seconds)
>
>
>
> When I run a small test using ab with something like "-c 1000" concurrent 
> connections or more (even if I'm handling one at a time here), some of the 
> requests immediately start getting unusual delays:
>
> default-akka.actor.default-dispatcher-3 start
> default-akka.actor.default-dispatcher-3 end 2015ms -> gets bigger
>
> This keeps getting worse. After a while I can kill ab, wait some minutes 
> and make a single request and it either gets refused or times out. The 
> server is basically *dead*
>
>
> *I get the exact same result with this, if you're wondering why I did all 
> that blocking and printing stuff above:*
>
> val handler = Flow[HttpRequest].map(_ ⇒ HttpResponse()).alsoToMat(Sink.ignore)(Keep.right)
>
> val binding: Future[ServerBinding] = Http().bind("0.0.0.0", 1104).mapAsync(1) { conn ⇒
>   conn handleWith handler
> }.to(Sink.ignore).run()
>
> and the same happens if I use bindAndHandle with a simple route. 
>
>
> In my standard setup (bindAndHandle, any number of concurrent connections 
> (1k to 10k tried) and keepalive for the requests) I see a number of 
> connections between 1 and 3% failing.
> This is what I get calling a simple route with  bindAndHandle, 
> MaxConnections(1) and connection keepalive enabled on the client: 
> lots of timeouts after just 10k calls already:
>
> Concurrency Level:  4000
> Time taken for tests:   60.605 seconds
> Complete requests:  1
> Failed requests:261
>(Connect: 0, Receive: 87, Length: 87, Exceptions: 87)
> Keep-Alive requests:9913
> ...
>
> Connection Times (ms)
>               min  mean[+/-sd] median   max
> Connect:        0    7   31.3      0     191
> Processing:     0  241 2780.8      5   60396
> *Waiting*:      0   92 1270.8      5   *60396*
> Total:          0  248 2783.5      5   60459
>
> Percentage of the requests served within a certain time (ms)
> ...
>   90% 13
>   95%255
>   98%   2061
>   99%   3911
>  100%  60459 (longest request) 
>
> It looks like it does the same on my local machine (mac) but I'm not 100% 
> sure. I'm doing the tests on an ubuntu 8-core 24GB ram vm
> I really don't know what to do, I'm trying every possible combination of 
> system parameters and akka config but I keep getting the same result.  
> Basically everything I tried (changing /etc/security/limits.conf, changing 
> sysctl params, changing akka concurrent connections, backlog, dispatchers 
> etc) led to the same result, that is: *connections doing nothing and 
> timing out.* As if the execution were queued somehow
>
>
> Is there something I'm missing? Some tuning parameter/config/something 
> else? 
> It looks like the piece of code that times out is 'conn handleWith 
> handler', even if 'handler' does nothing, and it keeps doing so even after 
> the load stops. I.e. the connection is established correctly, but the 
> processing is stuck.
>
>
> this is my ulimit -a:
> core file size  

[akka-user] Re: [akka-http-2.4.2] connection timeouts under load

2016-03-05 Thread Kyrylo Stokoz
Hi,

I have similar observations: after a while the server keeps accepting 
requests, but they all time out and nothing is returned in response.
I'm using akka-http 2.4.2 and streams to create a simple server which 
handles requests and returns files from S3.

In my case I don't even need high load; making requests one after 
another is enough to hang the server. I played with the max connections 
parameter, and increasing it makes the app process more requests, but 
eventually it gets stuck anyway.

From my observation the issue is in the HTTP connection pool, which is not 
properly releasing connections: when a new request comes in, a runnable 
graph is created, but sinks and sources are not properly connected to start 
the flow.

During my tests I see (not sure this is related though):

ERROR [datastore-rest-api-akka.actor.default-dispatcher-5] - Error in stage 
[One2OneBidi]: Inner stream finished before inputs completed. Outputs might 
have been truncated.
akka.http.impl.util.One2OneBidiFlow$OutputTruncationException$: Inner 
stream finished before inputs completed. Outputs might have been truncated.
ERROR [datastore-rest-api-akka.actor.default-dispatcher-5] - Error in stage 
[One2OneBidi]: Inner stream finished before inputs completed. Outputs might 
have been truncated.
akka.http.impl.util.One2OneBidiFlow$OutputTruncationException$: Inner 
stream finished before inputs completed. Outputs might have been truncated.
ERROR [datastore-rest-api-akka.actor.default-dispatcher-5] - Error in stage 
[One2OneBidi]: Inner stream finished before inputs completed. Outputs might 
have been truncated.
akka.http.impl.util.One2OneBidiFlow$OutputTruncationException$: Inner 
stream finished before inputs completed. Outputs might have been truncated.
ERROR [datastore-rest-api-akka.actor.default-dispatcher-5] - Error in stage 
[One2OneBidi]: Inner stream finished before inputs completed. Outputs might 
have been truncated.
akka.http.impl.util.One2OneBidiFlow$OutputTruncationException$: Inner 
stream finished before inputs completed. Outputs might have been truncated.
 INFO [datastore-rest-api-akka.actor.default-dispatcher-5] - Message 
[akka.io.Tcp$ResumeReading$] from 
Actor[akka://datastore-rest-api/user/StreamSupervisor-0/$$a#1262265379] to 
Actor[akka://datastore-rest-api/system/IO-TCP/selectors/$a/3#-1262857800] 
was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
 INFO [datastore-rest-api-akka.actor.default-dispatcher-5] - Message 
[akka.io.Tcp$ResumeReading$] from 
Actor[akka://datastore-rest-api/user/StreamSupervisor-0/$$e#585879533] to 
Actor[akka://datastore-rest-api/system/IO-TCP/selectors/$a/7#1750981790] 
was not delivered. [2] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.

  


On Saturday, March 5, 2016 at 6:03:57 AM UTC-6, Giovanni Alberto 
Caporaletti wrote:
>
> Hi,
> I'll try to explain what I'm experiencing in my akka-http app.
> (I found this issue but it's not been updated for almost a year and I'm 
> not sure it's relevant: https://github.com/akka/akka/issues/17395)
>
> I noticed that under load a lot of connections (~1-2%) were dropped or 
> timed out. I started investigating, tuning os and akka params and trimming 
> down my sample app until I got this:
>
>
> //N.B.: this is a test
>
> implicit val system = ActorSystem()
> implicit val mat: ActorMaterializer = ActorMaterializer()
> implicit val ec = system.dispatcher
>
> val binding: Future[ServerBinding] = Http().bind("0.0.0.0", 1104).map { conn ⇒
>   val promise = Promise[Unit]()
>   // I don't even wait for the end of the flow
>   val handler = Flow[HttpRequest].map { _ ⇒ promise.success(()); HttpResponse() }
>
>   // to be sure it's not a mapAsync(1) problem I use map and block here, same result
>   val t0 = System.currentTimeMillis()
>   println(s"${Thread.currentThread().getName} start")
>
>   conn handleWith handler
>
>   Await.result(promise.future, 10.seconds)
>   println(s"${Thread.currentThread().getName} end ${System.currentTimeMillis() - t0}ms");
> }.to(Sink.ignore).run()
>
> Await.result(binding, 10.seconds)
>
>
>
> When I run a small test using ab with something like "-c 1000" concurrent 
> connections or more (even if I'm handling one at a time here), some of the 
> requests immediately start getting unusual delays:
>
> default-akka.actor.default-dispatcher-3 start
> default-akka.actor.default-dispatcher-3 end 2015ms -> gets bigger
>
> This keeps getting worse. After a while I can kill ab, wait some minutes 
> and make a single request and it either gets refused or times out. The 
> server is basically *dead*
>
>
> *I get the exact same result with this, if you're wondering why I did all 
> that blocking and printing stuff above:*
>
> val handler = Flow[HttpRequest].map(_ ⇒ 
>

[akka-user] Re: Override configuration with system environment variables

2016-03-05 Thread Pierre
Found the reason: the "." character is not allowed in environment variable 
names on Linux, so this would only work on Windows.

Cheers
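
One possible way around the "." restriction is to pick a naming convention that is legal on Linux and translate it into config paths before the values reach the config library. The sketch below is only an illustration: the `CONF_` prefix, the rule that "__" stands for "-" and "_" stands for ".", and the names `EnvOverrides` and `toConfigPaths` are all assumptions made up here, not something from the thread or from the library.

```scala
object EnvOverrides {
  // Hypothetical convention (an assumption, not from the thread):
  //   - only variables starting with CONF_ are treated as overrides
  //   - "__" stands for "-" and a single "_" stands for "."
  val Prefix = "CONF_"

  /** Turn matching environment variables into (config path -> value) pairs. */
  def toConfigPaths(env: Map[String, String]): Map[String, String] =
    env.collect {
      case (name, value) if name.startsWith(Prefix) =>
        val path = name
          .stripPrefix(Prefix)
          .toLowerCase
          .replace("__", "-") // must run before the single-underscore rule
          .replace("_", ".")
        path -> value
    }
}
```

With this convention, `CONF_SPRAY_CAN_SERVER_REGISTRATION__TIMEOUT=5s` would become the path `spray.can.server.registration-timeout`. The resulting map could then be converted to a `java.util.Map` and passed to `ConfigFactory.parseMap`, the call used in the answer this message replies to.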


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Override configuration with system environment variables

2016-03-05 Thread Pierre
Hello,

Here is the answer to my own question:

  import com.typesafe.config.ConfigFactory

  val default = ConfigFactory.parseString(
    """
      |a {
      |  b = "foo"
      |}
    """.stripMargin)

  import collection.JavaConversions._
  // parseMap treats each key as a path, so "a.b" ends up as a { b = ... }
  val conf = ConfigFactory.parseMap(System.getenv(), "env variables")
    .withFallback(default)
    .withFallback(ConfigFactory.load())

  // will display the content of env variable a.b if set, foo otherwise
  println(conf.getString("a.b"))

The issue was that ConfigFactory.systemEnvironment() does not, for some 
reason, do any parsing, so "a.b=x" is not parsed as a { b = x }.

Note that ConfigFactory.systemProperties() *does* parse the properties, 
which explains the different result. I can't explain the inconsistency and 
this looks like a bug to me.
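
To make the distinction concrete, here is a small library-free sketch of what "parsing a key as a path" means: the flat key "a.b" is split on dots and turned into nested objects. It is only an illustration of the idea; `PathExpansion` and `expand` are hypothetical names, and this is not how the config library is implemented (it ignores quoting and conflicting keys, for instance).

```scala
object PathExpansion {
  // Place `value` at the nested position described by the path segments.
  private def insert(tree: Map[String, Any], path: List[String], value: String): Map[String, Any] =
    path match {
      case Nil        => tree
      case key :: Nil => tree.updated(key, value)
      case key :: rest =>
        // Reuse an existing nested object if there is one, otherwise start empty.
        val child = tree.get(key) match {
          case Some(m: Map[_, _]) => m.asInstanceOf[Map[String, Any]]
          case _                  => Map.empty[String, Any]
        }
        tree.updated(key, insert(child, rest, value))
    }

  /** Expand flat dotted keys ("a.b" -> "x") into nested maps (a { b = x }). */
  def expand(flat: Map[String, String]): Map[String, Any] =
    flat.foldLeft(Map.empty[String, Any]) { case (tree, (key, value)) =>
      insert(tree, key.split('.').toList, value)
    }
}
```

Without this step the entry stays a single key literally named "a.b", which is why a lookup of the path a.b falls through to the fallback value.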

Thanks for nothing ;-)

Pierre



On Thursday, March 3, 2016 at 7:00:01 PM UTC+1, Pierre wrote:
>
> Hello,
>
> I am using amazon beanstalk, which allows me to define app parameters 
> using system environment variables.
>
> If there were only a few well-defined variables I wanted to override, 
> the following would be fine:
>
> http {
>  port = 8080
>  port = ${?HTTP_PORT}
> }
>
>
> But there are a lot of parameters to override, and moreover I don't want 
> to have to explicitly define them beforehand.
>
> What I'd like would be to use system environment variables just like java 
> properties, meaning to have:
> set spray.can.server.registration-timeout=5s
> java -jar application.jar
> work exactly like:
> java -Dspray.can.server.registration-timeout=5s -jar application.jar
>
> I tried the following:
>
> val default = ConfigFactory.parseString(
>   """a {
>     |  b = "foo"
>     |}
>   """.stripMargin)
>
> val conf = ConfigFactory.systemEnvironment().withFallback(default)
>
> println(conf.getString("a.b"))
>
>
> which kept printing "foo" even if I defined the environment variable a.b 
> to "bar". I think it doesn't work because withFallback won't override 
> values that are already defined.
>
> Do you guys have any idea on how to solve this?
>
> Thanks,
>
> Pierre
>



[akka-user] [akka-http-2.4.2] connection timeouts under load

2016-03-05 Thread Giovanni Alberto Caporaletti
Hi,
I'll try to explain what I'm experiencing in my akka-http app.
(I found this issue but it's not been updated for almost a year and I'm not 
sure it's relevant: https://github.com/akka/akka/issues/17395)

I noticed that under load a lot of connections (~1-2%) were dropped or 
timed out. I started investigating, tuning OS and Akka params and trimming 
down my sample app until I got this:


//N.B.: this is a test

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

val binding: Future[ServerBinding] = Http().bind("0.0.0.0", 1104).map { conn ⇒
  val promise = Promise[Unit]()
  // I don't even wait for the end of the flow
  val handler = Flow[HttpRequest].map { _ ⇒ promise.success(()); HttpResponse() }

  // to be sure it's not a mapAsync(1) problem I use map and block here, same result
  val t0 = System.currentTimeMillis()
  println(s"${Thread.currentThread().getName} start")

  conn handleWith handler

  Await.result(promise.future, 10.seconds)
  println(s"${Thread.currentThread().getName} end ${System.currentTimeMillis() - t0}ms")
}.to(Sink.ignore).run()

Await.result(binding, 10.seconds)



When I run a small test using ab with something like "-c 1000" concurrent 
connections or more (even if I'm handling one at a time here), some of the 
requests immediately start getting unusual delays:

default-akka.actor.default-dispatcher-3 start
default-akka.actor.default-dispatcher-3 end 2015ms -> gets bigger

This keeps getting worse. After a while I can kill ab, wait some minutes 
and make a single request, and it either gets refused or times out. The 
server is basically *dead*.


*I get the exact same result with this, if you're wondering why I did all 
that blocking and printing stuff above:*

val handler = Flow[HttpRequest].map(_ ⇒ HttpResponse()).alsoToMat(Sink.ignore)(Keep.right)

val binding: Future[ServerBinding] = Http().bind("0.0.0.0", 1104).mapAsync(1) { conn ⇒
  conn handleWith handler
}.to(Sink.ignore).run()

The same happens if I use bindAndHandle with a simple route.


In my standard setup (bindAndHandle, any number of concurrent connections 
(1k to 10k tried) and keepalive for the requests) I see between 1 and 3% of 
connections failing.
This is what I get calling a simple route with bindAndHandle, 
MaxConnections(1) and connection keepalive enabled on the client: lots 
of timeouts after just 10k calls:

Concurrency Level:  4000
Time taken for tests:   60.605 seconds
Complete requests:  1
Failed requests:261
   (Connect: 0, Receive: 87, Length: 87, Exceptions: 87)
Keep-Alive requests:9913
...

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    7   31.3      0     191
Processing:     0  241 2780.8      5   60396
*Waiting*:      0   92 1270.8      5   *60396*
Total:          0  248 2783.5      5   60459

Percentage of the requests served within a certain time (ms)
...
  90%     13
  95%    255
  98%   2061
  99%   3911
 100%  60459 (longest request)

It looks like it does the same on my local machine (Mac) but I'm not 100% 
sure. I'm doing the tests on an Ubuntu 8-core 24GB RAM VM.
I really don't know what to do. I'm trying every possible combination of 
system parameters and Akka config but I keep getting the same result. 
Basically everything I tried (changing /etc/security/limits.conf, changing 
sysctl params, changing Akka concurrent connections, backlog, dispatchers 
etc.) led to the same result, that is: *connections doing nothing and timing 
out.* As if the execution were queued somehow.
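
For readers following along, the Akka-side knobs mentioned above (concurrent connections, backlog, dispatchers) live under configuration keys like the ones below. This is only a sketch of where they sit in application.conf: the values are placeholders chosen for illustration, not the ones used in these tests, and nothing here is claimed to fix the problem.

```hocon
# Placeholder values, for illustration only
akka.http.server {
  max-connections = 10000   # upper bound on concurrently accepted connections
  backlog = 1024            # size of the TCP accept backlog
}

akka.actor.default-dispatcher.fork-join-executor {
  parallelism-min = 8
  parallelism-max = 64
}
```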


Is there something I'm missing? Some tuning parameter/config/something 
else? 
It looks like the piece of code that times out is conn handleWith handler, even 
if 'handler' does nothing, and it keeps doing it even after the load 
stops. I.e. the connection is established correctly, but the processing is 
stuck.


This is my ulimit -a:
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 96360
max locked memory       (kbytes, -l) unlimited
max memory size         (kbytes, -m) unlimited
open files                      (-n) 10
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 32768
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

vm.swappiness = 0


Cheers


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message beca