Re: [akka-user] Delay within GraphStageLogic

2016-08-27 Thread Endre Varga
On Sat, Aug 27, 2016 at 12:44 AM, Gary Struthers 
wrote:

> Sorry, my onTimer() doesn't fire.
>

That would be surprising given that all of the built-in time based
operators are implemented in terms of this.


> It looks like all I need to do is 1. use TimerGraphStageLogic 2. call
> scheduleOnce 3. override onTimer. I am misusing Resume for retries but I've
> tried scheduleOnce in other places and it still doesn't fire. Here's an
> example,
>

Is this a Source? You omitted the shape... Anyway, it might be that your
stage is shut down earlier than the timer for some reason (for example
downstream cancelled). Override postStop() and print something there to see.

Here are the implementations of various built-in time based stages for
examples:
https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/impl/Timers.scala

-Endre


>   override def createLogic(inheritedAttributes: Attributes):
> GraphStageLogic = {
>
> new TimerGraphStageLogic(shape) {
>
>
>   private def decider = inheritedAttributes.get[SupervisionStrategy].
> map(_.decider).
>
>   getOrElse(Supervision.stoppingDecider)
>
>
>   var retries = 1
>
>   var duration = 100
>
>
>
>   def myHandler(): Unit = {
>
>   try {
>
> if(testException != null) throw testException
>
> if(iter.hasNext) {
>
>   push(out, iter.next())
>
> }
>
>   } catch {
>
> case NonFatal(e) => decider(e) match {
>
>   case Supervision.Stop => {
>
> failStage(e)
>
>   }
>
>   case Supervision.Resume => {
>
> if(retries > 0) {
>
>   logger.debug("before scheduleOnce retries {} duration
> {}", retries, duration)
>
>   scheduleOnce(None, FiniteDuration(duration, MILLISECONDS
> ))
>
> } else {
>
>   failStage(e) // too many retries
>
> }
>
>   }
>
> }
>
>   }
>
>   }
>
>   setHandler(out, new OutHandler {
>
> override def onPull(): Unit = {
>
>   myHandler()
>
> }
>
>   })
>
>
>   override protected def onTimer(timerKey: Any): Unit = {
>
> retries -= 1
>
> duration *= 2
>
> myHandler()
>
>   }
>
> }
>
>   }
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/c
> urrent/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Delay within GraphStageLogic

2016-08-27 Thread Viktor Klang
I'd probably start by implementing the behavior through the built in
combinators before venturing into creating custom stages.

-- 
Cheers,
√

On Aug 27, 2016 9:23 AM, "Endre Varga"  wrote:

>
>
> On Sat, Aug 27, 2016 at 12:44 AM, Gary Struthers 
> wrote:
>
>> Sorry, my onTimer() doesn't fire.
>>
>
> That would be surprising given that all of the built-in time based
> operators are implemented in terms of this.
>
>
>> It looks like all I need to do is 1. use TimerGraphStageLogic 2. call
>> scheduleOnce 3. override onTimer. I am misusing Resume for retries but I've
>> tried scheduleOnce in other places and it still doesn't fire. Here's an
>> example,
>>
>
> Is this a Source? You omitted the shape... Anyway, it might be that your
> stage is shut down earlier than the timer for some reason (for example
> downstream cancelled). Override postStop() and print something there to see.
>
> Here are the implementations of various built-in time based stages for
> examples: https://github.com/akka/akka/blob/master/akka-
> stream/src/main/scala/akka/stream/impl/Timers.scala
>
> -Endre
>
>
>>   override def createLogic(inheritedAttributes: Attributes):
>> GraphStageLogic = {
>>
>> new TimerGraphStageLogic(shape) {
>>
>>
>>   private def decider = inheritedAttributes.get[SupervisionStrategy].
>> map(_.decider).
>>
>>   getOrElse(Supervision.stoppingDecider)
>>
>>
>>   var retries = 1
>>
>>   var duration = 100
>>
>>
>>
>>   def myHandler(): Unit = {
>>
>>   try {
>>
>> if(testException != null) throw testException
>>
>> if(iter.hasNext) {
>>
>>   push(out, iter.next())
>>
>> }
>>
>>   } catch {
>>
>> case NonFatal(e) => decider(e) match {
>>
>>   case Supervision.Stop => {
>>
>> failStage(e)
>>
>>   }
>>
>>   case Supervision.Resume => {
>>
>> if(retries > 0) {
>>
>>   logger.debug("before scheduleOnce retries {} duration
>> {}", retries, duration)
>>
>>   scheduleOnce(None, FiniteDuration(duration,
>> MILLISECONDS))
>>
>> } else {
>>
>>   failStage(e) // too many retries
>>
>> }
>>
>>   }
>>
>> }
>>
>>   }
>>
>>   }
>>
>>   setHandler(out, new OutHandler {
>>
>> override def onPull(): Unit = {
>>
>>   myHandler()
>>
>> }
>>
>>   })
>>
>>
>>   override protected def onTimer(timerKey: Any): Unit = {
>>
>> retries -= 1
>>
>> duration *= 2
>>
>> myHandler()
>>
>>   }
>>
>> }
>>
>>   }
>>
>> --
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] akka connection refused with a shared actorRef

2016-08-27 Thread Loïc Girault


I'm not sure if my problem relies on akka or on network setup.


I'm writting a program that uses a server to connect different clients 
together. The server and the clients are actor system.

The problem is that in some contexts, when the clients receives the 
actorRef of the other clients send by the server and try to use it, I have 
a connection refused error.


I've done different test using a server and two clients to diagnose the 
problem :


1) If I run the three process on the same computer with 
akka.remote.netty.tcp.hostname=127.0.0.1 everything works perfectly the 
clients can communicate together


2) Now my computer is behind a physical router. I configured some port to 
be redirected toward some port of my computer.


If I run the server on a distant machine (not on the local network), I 
configure the clients as follow


akka.remote.netty.tcp {
  hostname = "my public ip"
  bind-hostname = "my local ip"
  port = "manually configured port to be redirected"
  bind-port = "manually configured port to be redirected"
}

then everything still works perfectly, the clients can communicate together.


3) Now the software is a game so I don't expect users to know how to 
correctly configure their network, hence I use the following lines to 
programatically generate a configuration :

object ConnectionHelper {

  def findIP4(addresses : util.Enumeration[InetAddress]) : Option[String] =
if(!addresses.hasMoreElements) None
else {
  val address = addresses.nextElement()
  if(address.isInstanceOf[Inet4Address]) Some(address.getHostAddress)
  else findIP4(addresses)
}

  def findLocalIP : Option[String] = {
def aux(interfaces : util.Enumeration[NetworkInterface]) : Option[String] =
  if(!interfaces.hasMoreElements) None
  else {
val interface : NetworkInterface = interfaces.nextElement()
if(interface.isLoopback) aux(interfaces)
else findIP4(interface.getInetAddresses) match {
  case None => aux(interfaces)
  case sa => sa
}
  }
aux(NetworkInterface.getNetworkInterfaces)
  }

  def findPort() : Int = {
val s = new ServerSocket(0)
val p = s.getLocalPort
s.close()
p
  }

  def publicIp = {
val whatismyip = new URL("http://checkip.amazonaws.com";)
val in = new BufferedReader(new InputStreamReader(whatismyip.openStream()))
val ip = in.readLine() //you get the IP as a String
in.close()
ip
  }

  def myConfig(hostname : String, port : Int,
   bindHostname : String, bindPort : Int) =
ConfigFactory.parseString("akka.remote.netty.tcp {\n" +
  s"hostname=$hostname\n"  +
  s"bind-hostname=$bindHostname\n" +
  s"port=$port\n" +
  s"bind-port=$bindPort\n" +
  "}"
)


  def conf(confFileName : String) : Config = {

val port = findPort()

val regularConfig = ConfigFactory.load(confFileName)
val pIp = publicIp
val lIp = findLocalIP getOrElse error("cannot find local ip")
val combined = myConfig(pIp, port, lIp, port).withFallback(regularConfig)

ConfigFactory.load(combined)
  }
}

And then the problem described above appears. What I really don't 
understand is that to begin with, the client successfully communicate with 
the server, they exchange message, hence it doesn't look like there is a 
firewall or a port redirection problem ... But then I got this connection 
refused error when clients are trying to communicate together using the 
same actorRef that was used to exchange message with the server.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Method to block Migration of Cluster Sharded Actors in a Certain State

2016-08-27 Thread kraythe
I have a cluster shared actor that basically has two states; IDLE and BUSY. 
The actor is cluster shared because I need the behavior of having only a 
single one of these in the cluster per user and  I want all of the messages 
that the actor to handle to be serialized per user. The actor mostly sits 
on IDLE but when it gets a certain message it becomes BUSY. During the busy 
state, it is collecting information from several other actors through 
tell() calls and waiting on the response messages. During the BUSY state I 
do NOT want that actor changing nodes. If it did then it would basically 
have to start all over processing the message it was holding in the busy 
state. Furthermore, persisting the actor's state is a non-starter as the 
data it is collecting is quite large, other actors are sending it 
references to immutable data in the same VM, after it gets all the data it 
does some analysis, writes a record, and is done. 

So the question is, if a node falls out of the cluster (or a new node 
joins) while the actor is BUSY and the rebalancing of the system starts to 
happen, can I prevent the actor from moving to the new node until after it 
goes back to IDLE. 

Thanks a bunch. 

-- Robert

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Cannot configure mailbox for Routees in RoundRobinPool

2016-08-27 Thread Mehmet Cem Güntürkün



Hello everyone,

When I try to set a bounded mailbox to routees of RoundRobinPool, Akka, 
kind of ignores the configuration parameter.


Here is the sample for configuration:


bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1
  mailbox-push-timeout-time = 1s
}

akka.actor.deployment {
  /singletestactor {
mailbox = bounded-mailbox
  }

  /groupedtestactor {
mailbox = bounded-mailbox

router = round-robin-pool
nr-of-instances = 5
  }
}


and Here is the test code:


object MailboxTest {
  def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem()
val singleTestActor = actorSystem.actorOf(Props[TestActor], 
"singletestactor")
for (i <- 1 to 10) {
  singleTestActor ! Hello(i)
}


val groupedTestActor = 
actorSystem.actorOf(Props[TestActor].withRouter(FromConfig, 
"groupedtestactor")
for (i <- 1 to 1000) {
  groupedTestActor ! Hello(i)
}
  }
}

class TestActor extends Actor {
  def receive = {
case Hello(i) => {
  println(s"Hello($i) - begin!")
  Thread.sleep(1)
  println(s"Hello($i) - end!")
}
  }
}

case class Hello(i: Int)



Am I doing something wrong or there is no way to do that?
Mehmet - 

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Can't execute more than 4 requests in parallel

2016-08-27 Thread Leonti Bielski
Hi! I'm trying to load test my application using Akka Http client but 
failing to execute more than 4 requests in parallel.
The steps of the test:
1. Create a user
2. Authenticate the user
3. Prepare a request and create a sequence of it to execute in parallel.

Here is the code of the function doing the upload:


def upload() = {
  val username = "ci_user_" + java.util.UUID.randomUUID()
  val createUserRequest = CreateUserRequest(username, "password")

  val toReceiptRequest: (MessageEntity, String, String) => HttpRequest = 
(requestEntity, userId, accessToken) => {
HttpRequest(method = HttpMethods.POST,
  uri = s"http://localhost:9000/user/${userId}/receipt";,
  entity = requestEntity,
  headers = List(Authorization(OAuth2BearerToken(accessToken
  }

  val uploadReceipt: (HttpRequest) => Future[StatusCode] = request => {
val start = System.currentTimeMillis()
println("Starting to upload receipt")

Http().singleRequest(request).map(response => {
  println(response.status)
  val end = System.currentTimeMillis()
  println(s"Receipt uploaded in ${(end - start)}ms")
  response.status
})
  }

  val requests: Future[Seq[HttpRequest]] = for {
userInfo: UserInfo <- createUser(createUserRequest)
accessToken: OAuth2AccessTokenResponse <- authenticateUser(userInfo)
requestEntity: MessageEntity <- createImageFileContent()
  } yield Seq.fill(10)(toReceiptRequest(requestEntity, userInfo.id, 
accessToken.accessToken))

  val result: Future[Seq[StatusCode]] = requests.flatMap(requests => 
Future.sequence(requests.map(request => uploadReceipt(request
  result
}


I'm doing this at the top of the class, so I should have 200 threads 
available:

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool
(200))



The full code is here if it might hold a clue:
http://pastebin.com/iYsyu0En

How can I make it execute requests in parallel?

Thanks!
Leonti

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Asynchronous processing in onReceive of Actor

2016-08-27 Thread Kalpak Gadre
Hi,

I am using Play Framework 2.5.x Java. I have a use case where I need to 
create a process that,

1. Polls Amazon SQS for message(s)
2. For each message, download some file from S3.
3. Update Mongo with new data as per message.
4. Delete the message from SQS.

I have already created non-blocking APIs for all the steps. The APIs 
produce a CompletionStage object. I am evaluating options modeling this 
processing using Akka Actors.

I am thinking about following options,

1. Scheduler:
 - A scheduler will trigger an actor to poll for messages.
 - I will use the CompletionStage returned by the receive message API to 
weave together further steps asynchronously.

2. Scheduler + Message Processing Actor
 - A scheduler will trigger an actor to poll for messages.
 - Each message will be pushed (tell) to an actor which processes the 
messages.
 - Message processing actor will use chaining of CompletionStage to process 
the message asynchronously. 

3. Scheduler + Message Processing Actor per message
 - A scheduler will trigger an actor to poll for messages.
 - Each message will be pushed (tell) to it's dedicated instance of an 
actor which processes the messages.
 - Message processing actor will use chaining of CompletionStage to process 
the message asynchronously. 

I have following questions,

1. As per scheduler's documentation, the scheduler will fire as per fixed 
schedule. It is possible that there are 2 polls of SQS happening at the 
same time in case the amount of time required by a single poll exceeds 
scheduler's interval. Ideally I need a construct which triggers another 
poll only after the first one is complete (closely related to a loop) What 
is the best way to achieve this? I have also thought about an actor 
messaging itself after it has performed 1 poll.

2. It is mentioned in the documentation that an actor should not process 
next message until the current message is processed. If I use chaining of 
CompletionStage, I will be returning from the onReceive() method of actor 
prematurely. Is this acceptable? If not, what is an alternative? How do I 
chain the CompletionStage to weave further operations on the result? I did 
read about pipe. I can pipe the CompletionStage to the same / another actor 
to perform the next step. Even if I use pipe, I am still allowing another 
message into onReceive before the previous one is actually completed 
processing?

Any suggestions how this process should be modeled for Akka?

Thanks,

Kalpak

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Asynchronous processing in onReceive of Actor

2016-08-27 Thread Viktor Klang
Akka Streams?

On Sat, Aug 27, 2016 at 7:53 PM, Kalpak Gadre  wrote:

> Hi,
>
> I am using Play Framework 2.5.x Java. I have a use case where I need to
> create a process that,
>
> 1. Polls Amazon SQS for message(s)
> 2. For each message, download some file from S3.
> 3. Update Mongo with new data as per message.
> 4. Delete the message from SQS.
>
> I have already created non-blocking APIs for all the steps. The APIs
> produce a CompletionStage object. I am evaluating options modeling this
> processing using Akka Actors.
>
> I am thinking about following options,
>
> 1. Scheduler:
>  - A scheduler will trigger an actor to poll for messages.
>  - I will use the CompletionStage returned by the receive message API to
> weave together further steps asynchronously.
>
> 2. Scheduler + Message Processing Actor
>  - A scheduler will trigger an actor to poll for messages.
>  - Each message will be pushed (tell) to an actor which processes the
> messages.
>  - Message processing actor will use chaining of CompletionStage to
> process the message asynchronously.
>
> 3. Scheduler + Message Processing Actor per message
>  - A scheduler will trigger an actor to poll for messages.
>  - Each message will be pushed (tell) to it's dedicated instance of an
> actor which processes the messages.
>  - Message processing actor will use chaining of CompletionStage to
> process the message asynchronously.
>
> I have following questions,
>
> 1. As per scheduler's documentation, the scheduler will fire as per fixed
> schedule. It is possible that there are 2 polls of SQS happening at the
> same time in case the amount of time required by a single poll exceeds
> scheduler's interval. Ideally I need a construct which triggers another
> poll only after the first one is complete (closely related to a loop) What
> is the best way to achieve this? I have also thought about an actor
> messaging itself after it has performed 1 poll.
>
> 2. It is mentioned in the documentation that an actor should not process
> next message until the current message is processed. If I use chaining of
> CompletionStage, I will be returning from the onReceive() method of actor
> prematurely. Is this acceptable? If not, what is an alternative? How do I
> chain the CompletionStage to weave further operations on the result? I did
> read about pipe. I can pipe the CompletionStage to the same / another actor
> to perform the next step. Even if I use pipe, I am still allowing another
> message into onReceive before the previous one is actually completed
> processing?
>
> Any suggestions how this process should be modeled for Akka?
>
> Thanks,
>
> Kalpak
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-http] what is actually happening on request timeout?

2016-08-27 Thread Kyrylo Stokoz
HI All,

I came across a strange issue happening with akka http on request timeout 
which i cannot understand, can some body help me with it?

Consider following code in akka 2.4.9:


import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Directives._
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

import scala.concurrent.Future

object Test extends App {
  implicit val actorSystem = ActorSystem()
  implicit val ec = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()

  def routes =
(path("test1") & get) {
  complete("result1")
} ~
(path("test2") & get) {
  complete {
Future {
  Thread.sleep(3)
  "result2"
}
  }
}

  def processRequest(route: Flow[HttpRequest, HttpResponse, NotUsed]): 
Flow[HttpRequest, HttpResponse, NotUsed] =
new ExtendedFlow(route).extend()

  val serverSource = Http().bindAndHandle(processRequest(routes), "0.0.0.0", 
port = 11011)
}

final class ExtendedFlow[A, B](originalFlow: Flow[A, B, NotUsed]) {

  def extend(): Flow[A, B, NotUsed] =
Flow.fromGraph {
  GraphDSL.create() { implicit builder =>

val in = builder.add(Flow[A].map { e => println("in " + e); e })

val broadcast = builder.add(Broadcast[A](2))
val zip   = builder.add(Zip[A, B]())

val out = builder.add(Flow[(A, B)].map { o => println("out " + o); o._2 
})

in ~> broadcast; broadcast.out(0) ~> zip.in0
 broadcast.out(1) ~> originalFlow ~> zip.in1; zip.out 
~> out

FlowShape(in.in, out.out)
  }
}
}


Now if i execute `curl -v "http://localhost:11011/test1"` i correctly see 'in' 
and 'out' print statements in console and "result1" sent to user.


My actual confusion is when i execute `curl -v "http://localhost:11011/test2"`.

In this case after 20s (default request timeout in akka http) 
HttpServerBluePrint sends 503 back to user with a message that server was not 
able to produce response in time.

Later, in 30s, future completes as well, result of it i guess is ignored as 
response from client was already handled. 

Question here is what actually happening to the extended flow in this 
situation? 


I don`t see any output from sent from zip.out, though i see 'in' statement 
printed for test2. Seeing this i would assume either:

1. Flow would stuck eventually or

2. Flow would produce wrong pairs in zip commit from next request elements.


>From my observations flow keeps working without any problems/exceptions and 
>produce correct pair in zip? 


Anyone can shed some light what is actually going on here?


Regards,

Kyrylo




-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Delay within GraphStageLogic

2016-08-27 Thread Gary Struthers

Thanks, It fires now. This was a MockSource just for working out how to do 
error handling. The problem was the tests completed before the timer fired.

Gary

On Saturday, August 27, 2016 at 12:23:59 AM UTC-7, drewhk wrote:
>
>
>
>>
> Is this a Source? You omitted the shape... Anyway, it might be that your 
> stage is shut down earlier than the timer for some reason (for example 
> downstream cancelled). Override postStop() and print something there to see.
>
>
> -Endre
>
>>
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.