[akka-user] Re: Fault tolerance on unhandled events in an FSM

2015-08-30 Thread Jasper Visser
Update: I got it to work as expected (expected by me, at least):
whenUnhandled {
  case event @ Event(message, _) =>
    unhandled(message)
    stop(Failure(s"Unhandled message $message in state $stateName"))
}

and the test:
it should "not handle X in state Y" in {
  system.eventStream.subscribe(testActor, classOf[UnhandledMessage])
  actor.setState(Y, Z)
  actor ! X
  expectMsgType[UnhandledMessage](1.second)
}

In my case the *stop()* is a bit redundant, since the supervisor will 
restart the actor anyway on an *UnhandledMessage*. But hey, a *stay()* is 
equally weird.

It would be nice if the default behavior of FSM were *unhandled(message); 
stay()*. That seems to me to integrate better with actor supervisor 
strategies. My 2 cents. =)
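
For anyone wanting this behaviour in several FSM actors, here is a minimal sketch of how the snippet above could be packaged as a mix-in. The trait name and the stop-on-unhandled policy are my own choices, not anything built into Akka:

import akka.actor.FSM

// Hypothetical mix-in: any FSM that mixes this in will publish an
// UnhandledMessage and stop with a Failure whenever an event is not handled.
trait StopOnUnhandled[S, D] extends FSM[S, D] {
  whenUnhandled {
    case Event(message, _) =>
      // unhandled() publishes an UnhandledMessage on the event stream,
      // exactly as a plain Actor would for a message its receive misses.
      unhandled(message)
      // Stopping with a Failure gives a supervising parent something to react to.
      stop(FSM.Failure(s"Unhandled message $message in state $stateName"))
  }
}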

Yours,

Jasper

On Sunday, August 30, 2015 at 12:46:33 AM UTC+2, Jasper Visser wrote:

 Hi,

 I have a parent actor and some child actors; most are subclasses of 
 *akka.actor.FSM*, some are simply *akka.actor.Actor*. The parent actor 
 has an *AllForOneStrategy* to restart all the child actors on any 
 unhandled exception. This seems to work as advertised.

 I would like to have it so that when a child actor receives an *unhandled 
 event*, this will also trigger the supervisor strategy. Basically, any 
 unhandled event would result in a restart of all the children. It's 
 probably a sledgehammer approach, but I can always fine-tune it later.

 Unfortunately (for what I have in mind, at least) the default behavior of 
 FSM on unhandled events is to *log a warning* and *stay()*. On the other 
 hand, the default behavior of Actor is to publish an *UnhandledMessage* or 
 throw *DeathPactException*.

 Is there a simple way to get FSM to behave the same way as Actor?

 I tried this approach:
 whenUnhandled {
   case event @ Event(_, _) =>
     log.error(s"Unhandled event $event")
     unhandled(event)
     stay()
 }
 But that doesn't seem to do anything. The parent never receives the 
 *UnhandledMessage* message.

 I can trigger the supervisor strategy by simply throwing an exception:
 whenUnhandled {
   case event @ Event(_, _) =>
     log.error(s"Unhandled event $event")
     throw new RuntimeException(s"Unhandled event $event")
 }
 But in that scenario I have no idea how to unit test the case of an 
 unhandled event.

 For that matter I can find very few examples of testing unhandled 
 messages/events. Closest thing I've found is:
 http://doc.akka.io/docs/akka/2.0.5/scala/testing.html
 import akka.testkit.TestActorRef
 system.eventStream.subscribe(testActor, classOf[UnhandledMessage])
 val ref = TestActorRef[MyActor]
 ref.receive(Unknown)
 expectMsg(1 second, UnhandledMessage(Unknown, system.deadLetters, ref))

 So basically my questions are:

1. What is a good way to use a supervisor strategy in a system that 
includes FSM actors?
2. What is a good way to unit test unhandled events on FSM actors?

 Yours,

 Jasper




[akka-user] Fault tolerance on unhandled events in an FSM

2015-08-29 Thread Jasper Visser
Hi,

I have a parent actor and some child actors; most are subclasses of 
*akka.actor.FSM*, some are simply *akka.actor.Actor*. The parent actor has 
an *AllForOneStrategy* to restart all the child actors on any unhandled 
exception. This seems to work as advertised.

I would like to have it so that when a child actor receives an *unhandled 
event*, this will also trigger the supervisor strategy. Basically, any 
unhandled event would result in a restart of all the children. It's 
probably a sledgehammer approach, but I can always fine-tune it later.

Unfortunately (for what I have in mind, at least) the default behavior of 
FSM on unhandled events is to *log a warning* and *stay()*. On the other 
hand, the default behavior of Actor is to publish an *UnhandledMessage* or 
throw *DeathPactException*.

Is there a simple way to get FSM to behave the same way as Actor?

I tried this approach:
whenUnhandled {
  case event @ Event(_, _) =>
    log.error(s"Unhandled event $event")
    unhandled(event)
    stay()
}
But that doesn't seem to do anything. The parent never receives the 
*UnhandledMessage* message.

I can trigger the supervisor strategy by simply throwing an exception:
whenUnhandled {
  case event @ Event(_, _) =>
    log.error(s"Unhandled event $event")
    throw new RuntimeException(s"Unhandled event $event")
}
But in that scenario I have no idea how to unit test the case of an 
unhandled event.

For that matter I can find very few examples of testing unhandled 
messages/events. Closest thing I've found is:
http://doc.akka.io/docs/akka/2.0.5/scala/testing.html
import akka.testkit.TestActorRef
system.eventStream.subscribe(testActor, classOf[UnhandledMessage])
val ref = TestActorRef[MyActor]
ref.receive(Unknown)
expectMsg(1 second, UnhandledMessage(Unknown, system.deadLetters, ref))

So basically my questions are:

   1. What is a good way to use a supervisor strategy in a system that 
   includes FSM actors?
   2. What is a good way to unit test unhandled events on FSM actors?

Yours,

Jasper



Re: [akka-user] Re: Actors behaving unexpectedly with Circuit breaker

2014-08-12 Thread Jasper
Nope, doesn't change anything.

On Monday, August 11, 2014 at 18:41:05 UTC+2, Patrik Nordwall wrote:

 Does it change anything if you use another (default) dispatcher for the 
 circuit breaker and the futures?

 /Patrik
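
For reference, a minimal sketch of what that suggestion looks like in the processor's preStart, assuming the breaker field from the snippets later in this thread; the only change is handing the breaker the system's default dispatcher instead of the actor's pinned context.dispatcher:

import scala.concurrent.duration._
import akka.pattern.CircuitBreaker

override def preStart(): Unit = {
  // Run the breaker's timeouts and state transitions on the default dispatcher,
  // so the pinned (blocking) thread of this actor is not used for them.
  breaker = new CircuitBreaker(
    context.system.scheduler,
    maxFailures  = 1,
    callTimeout  = 30.seconds,
    resetTimeout = 10.seconds)(context.system.dispatcher)
}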

 On 11 Aug 2014, at 17:56, Leonard Meyer lme...@excilys.com wrote:

 Anything blocking I put on a pinned dispatcher. So each FileProcessor and 
 the Controller (which sometimes does JDBC).  Each FileProcessor also has a 
 CircuitBreaker which uses the same pinned dispatcher as said FileProcessor. 

 Is there something wrong with this ?


 2014-08-11 17:34 GMT+02:00 Patrik Nordwall patrik@gmail.com:

 What dispatcher do you use for what?
 You mentioned pinned dispatcher, but you also have context.dispatcher for 
 the circuit breaker. Pinned dispatcher there is probably not a good idea 
 since the actor is blocking (the thread is busy).
 You also have futures, e.g. mapTo.

 /Patrik


 On Mon, Aug 11, 2014 at 4:42 PM, Leonard Meyer lme...@excilys.com wrote:

 Yes, all I have is at the beginning where it's basically saying who's 
 supervising who. And I have :

 [DEBUG] [08/11/2014 16:36:41.977] [akka.actor.default-dispatcher-2] 
 [akka:///user] now supervising 
 Actor[akka://x/user/connectionFactory#608280561]

 connectionFactory is the name of my datasource actor.


 2014-08-11 16:26 GMT+02:00 √iktor Ҡlang viktor...@gmail.com:

 Did you enable lifecycle logging?


  On Mon, Aug 11, 2014 at 4:22 PM, Jasper lme...@excilys.com wrote:

 Well, given what I said before, pretty sure yes. And yes, the sender() 
 doesn't receive anything, so it times out and the circuit breaker keeps on 
 doing the Open/HalfOpen cycle. Here are some sample logs 
 http://pastebin.com/TUPVMNNi

 This is annoying, I don't know if I'm doing something wrong with Akka, 
 Hikari or PostgreSQL. 

 On Monday, August 11, 2014 at 15:55:19 UTC+2, √ wrote:

 Hi Jasper,

 It wasn't dumb at all.
 So are you sure that it doesn't blow up, because that would mean that 
 the sender() of GetConnection never gets his/her connection. Right?


 On Mon, Aug 11, 2014 at 3:51 PM, Jasper lme...@excilys.com wrote:

 Quick and dirty modification but indeed, that was dumb. 

 On Monday, August 11, 2014 at 15:47:58 UTC+2, √ wrote:

 Unrelated but why the complication of

 sender() ! datasource.map(ds => ds.getConnection).get

 iso

 sender() ! datasource.get.getConnection


 On Mon, Aug 11, 2014 at 3:44 PM, Jasper lme...@excilys.com wrote:

 Spent some time today on this... Actually when I turn on 
 autocommit and crash the DB, processors do block but they ALL go into 
 Open 
 state, which is good. But for some reason my actor holding the 
 datasource 
 isn't responding anymore and I end up with this when asking for a 
 connection :

 akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka:
 //catalinaSys/user/connectionFactory#-1136872742]] after [1 
 ms]

 It isn't blowing up either because I don't have my hook log on 
 restart (supervisor is the guardian here). What bugs me is HikariCP 
 log :

 After cleanup pool stats HikariPool-0 (total=19, inUse=1, 
 avail=18, waiting=0)


 So there are connections available but I get timeouts instead :( 
 Here's how I do it :

 case GetConnection => sender() ! datasource.map(ds => ds.getConnection).get

 datasource is an Option[HikariDataSource], and if it isn't there then 
 it should just blow up and restart.





 -- 
 Cheers,
 √
  




 -- 
 Cheers,
 √
  

[akka-user] Re: Actors behaving unexpectedly with Circuit breaker

2014-08-11 Thread Jasper
Spent some time today on this... Actually when I turn on autocommit and 
crash the DB, processors do block but they ALL go into Open state, which is 
good. But for some reason my actor holding the datasource isn't responding 
anymore and I end up with this when asking for a connection :

akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://catalinaSys/user/connectionFactory#-1136872742]] 
after [1 ms]

It isn't blowing up either because I don't have my hook log on restart 
(supervisor is the guardian here). What bugs me is the HikariCP log:

After cleanup pool stats HikariPool-0 (total=19, inUse=1, avail=18, 
 waiting=0)


So there are connections available but I get timeouts instead :( Here's how 
I do it :

case GetConnection => sender() ! datasource.map(ds => ds.getConnection).get

datasource is an Option[HikariDataSource], and if it isn't there then it 
should just blow up and restart.



Re: [akka-user] Re: Actors behaving unexpectedly with Circuit breaker

2014-08-11 Thread Jasper
Quick and dirty modification but indeed, that was dumb. 

 On Monday, August 11, 2014 at 15:47:58 UTC+2, √ wrote:

 Unrelated but why the complication of

 sender() ! datasource.map(ds => ds.getConnection).get

 iso

 sender() ! datasource.get.getConnection


 On Mon, Aug 11, 2014 at 3:44 PM, Jasper lme...@excilys.com wrote:

 Spent some time today on this... Actually when I turn on autocommit and 
 crash the DB, processors do block but they ALL go into Open state, which is 
 good. But for some reason my actor holding the datasource isn't responding 
 anymore and I end up with this when asking for a connection :

 akka.pattern.AskTimeoutException: Ask timed out on 
 [Actor[akka://catalinaSys/user/connectionFactory#-1136872742]] 
 after [1 ms]

 It isn't blowing up either because I don't have my hook log on restart 
 (supervisor is the guardian here). What bugs me is HikariCP log :

 After cleanup pool stats HikariPool-0 (total=19, inUse=1, avail=18, 
 waiting=0)


 So there are connections available but I get timeouts instead :( Here's 
 how I do it :

 case GetConnection => sender() ! datasource.map(ds => ds.getConnection).get

 datasource is an Option[HikariDataSource], and if it isn't there then it 
 should just blow up and restart.





 -- 
 Cheers,
 √
  



[akka-user] Re: Actors behaving unexpectedly with Circuit breaker

2014-08-07 Thread Jasper
I tried it and this looks very promising, since all processors now go 
into Open state. However, without the reader I'm deep in encoding hell 
because my files are in us-ascii and my DB is in UTF-8:

invalid byte sequence for encoding "UTF8": 0x00
And I can't just sanitize the files beforehand... Anyway, I'm aware it's not 
really the place for this, so unless anyone has the solution, thanks for 
your help!


On Thursday, August 7, 2014 at 07:27:04 UTC+2, Brett Wooldridge wrote:

 It appears that you are using the PostgreSQL CopyManager, correct? 
  Looking at QueryExecutorImpl it appears that rollback() is trying to 
 obtain a lock that was not released by the CopyManager.  I recommend using 
 the CopyManager.copyIn() method that returns a CopyIn object, rather than 
 using the convenience method that takes a reader.  Use the writeToCopy() to 
 pump the data in, and be sure to catch SQLException.  If you get an 
 SQLException, call cancelCopy() and retry or whatever your recovery 
 scenario is, otherwise call endCopy().  I would have expected PostgreSQL 
 to handle the severing of a Connection in the middle of a bulk copy better, 
 but that is probably a question for the PostgreSQL group.

 Just my armchair diagnosis.

 On Wednesday, August 6, 2014 11:04:13 PM UTC+9, Jasper wrote:


 Sys-akka.actor.pinned-dispatcher-6 [WAITING]
 java.lang.Object.wait()Object.java:503
 org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl.
 java:91
 org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, 
 ResultHandler, int, int, int)QueryExecutorImpl.java:228
 org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(
 Query)AbstractJdbc2Connection.java:808
 org.postgresql.jdbc2.AbstractJdbc2Connection.rollback()
 AbstractJdbc2Connection.java:861
 com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState()
 ConnectionProxy.java:192
 com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305
 java.lang.reflect.Method.invoke(Object, Object[])Method.java:606
 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14
 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
 apply$mcV$sp()FileProcessor.scala:75
 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
 apply()FileProcessor.scala:56
 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
 apply()FileProcessor.scala:56
 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
 CircuitBreaker.scala:135
 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
 CircuitBreaker.scala:135
 akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, 
 Function0)CircuitBreaker.scala:296
 akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker.
 scala:345
 akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala
 :354
 akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker.
 scala:113
 akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)
 CircuitBreaker.scala:135
 xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, 
 Function1)FileProcessor.scala:55
 akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor
 .scala:465
 ...





[akka-user] Re: Actors behaving unexpectedly with Circuit breaker

2014-08-07 Thread Jasper
Alright I fixed it, it was stupid. But actually it didn't solve anything... 
Here's how I did it :

doJDBCStuff() :

val cpManager = conn.unwrap(classOf[PGConnection]).getCopyAPI
val stringBytes: Array[Byte] = batchStrings.toString().map(_.toByte).toArray
val copy = cpManager.copyIn(s"COPY $tableName FROM STDIN WITH CSV")
try {
  copy.writeToCopy(stringBytes, 0, stringBytes.length)
  copy.endCopy()
} finally {
  // If writeToCopy/endCopy failed, the copy is still active: cancel it so the lock is released.
  if (copy.isActive) {
    copy.cancelCopy()
  }
}
conn.commit()

The potential exception should be caught higher in the first snippet I 
showed (to resend the message). By the way, I noticed that If I turn on 
autoCommit, absolutely all processors are blocking... 

On Thursday, August 7, 2014 at 12:02:48 UTC+2, Brett Wooldridge wrote:

 That error indicates that one of your us-ascii files has a NUL byte 
 (0x00) in it somewhere, which is never valid in UTF-8.  You have two 
 options: figure out why there is a NUL character in the file, or at least 
 sanitize the bytes on-the-fly as you read chunks from the file, either 
 redacting the NUL bytes completely or replacing them with, for example, a 
 space (0x20).
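
A minimal sketch of the on-the-fly sanitization applied to the Scala snippet earlier in this thread (variable names follow that snippet; replacing NUL bytes with spaces is just one of the two options mentioned above):

// Replace NUL (0x00) bytes with spaces (0x20) before streaming the data to COPY.
val sanitizedBytes: Array[Byte] =
  stringBytes.map(b => if (b == 0x00) 0x20.toByte else b)

copy.writeToCopy(sanitizedBytes, 0, sanitizedBytes.length)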


 On Thursday, August 7, 2014 6:10:30 PM UTC+9, Jasper wrote:

 I tried it and this looks very promising as since all processors now go 
 into Open state. However without the reader I'm deep into encoding hell 
 because my files are in us-ascii and my db in UTF-8 :

 invalid byte sequence for encoding UTF8: 0x00
 And I can't just sanitize the files beforehand... Anyway I'm aware it's 
 not really the place for this so unless anyone have the solution, thanks 
 for your help !


 On Thursday, August 7, 2014 at 07:27:04 UTC+2, Brett Wooldridge wrote:

 It appears that you are using the PostgreSQL CopyManager, correct? 
  Looking at QueryExecutorImpl it appears that rollback() is trying to 
 obtain a lock that was not released by the CopyManager.  I recommend using 
 the CopyManager.copyIn() method that returns a CopyIn object, rather 
 than using the convenience method that takes a reader.  Use the 
 writeToCopy() to pump the data in, and be sure to catch SQLException. 
  If you get an SQLException, call cancelCopy() and retry or whatever 
 your recovery scenario is, otherwise call endCopy().  I would have 
 expected PostgreSQL to handle the severing of a Connection in the middle of 
 a bulk copy better, but that is probably a question for the PostgreSQL 
 group.

 Just my armchair diagnosis.

 On Wednesday, August 6, 2014 11:04:13 PM UTC+9, Jasper wrote:


 Sys-akka.actor.pinned-dispatcher-6 [WAITING]
 java.lang.Object.wait()Object.java:503
 org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl.
 java:91
 org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, 
 ResultHandler, int, int, int)QueryExecutorImpl.java:228
 org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(
 Query)AbstractJdbc2Connection.java:808
 org.postgresql.jdbc2.AbstractJdbc2Connection.rollback()
 AbstractJdbc2Connection.java:861
 com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState()
 ConnectionProxy.java:192
 com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305
 java.lang.reflect.Method.invoke(Object, Object[])Method.java:606
 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14
 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
 apply$mcV$sp()FileProcessor.scala:75
 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
 apply()FileProcessor.scala:56
 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
 apply()FileProcessor.scala:56
 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
 CircuitBreaker.scala:135
 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
 CircuitBreaker.scala:135
 akka.pattern.CircuitBreaker$State$class.callThrough(
 CircuitBreaker$State, Function0)CircuitBreaker.scala:296
 akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)
 CircuitBreaker.scala:345
 akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.
 scala:354
 akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker
 .scala:113
 akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)
 CircuitBreaker.scala:135
 xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, 
 Function1)FileProcessor.scala:55
 akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)
 Actor.scala:465
 ...




[akka-user] Actors behaving unexpectedly with Circuit breaker

2014-08-06 Thread Jasper
Hi,

I have a problem in my current project and I cannot pinpoint the exact 
cause. I implemented an ETL and what I have is a controller actor 
dispatching files to process to a router (SmallestMailboxPool with one file 
being one message), which dispatches them to file processors. In the end, the 
processors notify the controller of the outcome. My datasource 
(HikariCP) is held within an actor attached to the guardian so it restarts 
when necessary.
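
As a point of reference, a minimal sketch of that wiring inside the controller; the names (FileProcessor, the dispatcher id, the pool size of eight mentioned later in this thread) are assumptions, not taken from the actual code:

import akka.actor.Props
import akka.routing.SmallestMailboxPool

// Eight file processors behind a smallest-mailbox router, each running on the
// pinned dispatcher declared in application.conf.
val processors = context.actorOf(
  SmallestMailboxPool(8).props(
    Props[FileProcessor].withDispatcher("file-processor-pinned-dispatcher")),
  name = "fileProcessors")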
My controller and processor actors are attached to a *pinned dispatcher* 
because they use JDBC to insert file data into a DB. What I'm trying to do 
is handle database failures for the file processors. I used a 
Circuit Breaker like so:

File processors (simplified and without logging) :

override def preStart(): Unit = {
  breaker = new CircuitBreaker(context.system.scheduler,
    maxFailures = 1,
    callTimeout = 30.seconds,
    resetTimeout = 10.seconds)(context.dispatcher)
}

[...]

case Forwarded(file) =>
  try {
    breaker.withSyncCircuitBreaker {
      val parsedFile = doParsingAndValidation()
      parsedFile match {
        case Success(parsedFile) =>
          using((connectionFactory ? GetConnection).mapTo[Connection].waitForResult) { conn =>
            conn.setAutoCommit(false)

            doJDBCStuff()

            conn.commit()
            controller ! Processed(parsedFile)
          }
        case Failure(err) =>
          controller ! Malformed(file, err)
      }
    }
  } catch {
    case ex: Exception =>
      context.system.scheduler.scheduleOnce(5.seconds, self, Forwarded(file))(context.dispatcher)
  }


So basically when the database is down the breaker is going to switch to Open 
at the first exception (therefore resending the message to self). After 10 seconds 
it's going to be in HalfOpen state and allow only one file through (while all 
others will trigger the CircuitBreakerOpenException and get resent). If the 
allowed one works it's all good, otherwise the breaker is restored to Open 
state again.
Anyway, the logic works in itself: when I stop the database it loops 
through the messages every 5 seconds, failing each time and resending them. *My 
real problem is that some files aren't processed!*
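
To make that resend logic explicit, here is a minimal sketch of the catch block with the two cases separated; processFile is a hypothetical stand-in for the parsing + JDBC block above, and the current code simply catches Exception for both cases:

import akka.pattern.CircuitBreakerOpenException
import scala.concurrent.duration._

case Forwarded(file) =>
  try {
    breaker.withSyncCircuitBreaker {
      processFile(file)
    }
  } catch {
    // The breaker is Open (or the HalfOpen probe slot is taken): the call was
    // rejected without touching the database, so just resend the file later.
    case _: CircuitBreakerOpenException =>
      context.system.scheduler.scheduleOnce(5.seconds, self, Forwarded(file))(context.dispatcher)
    // A real failure: this is what actually trips the breaker; resend as well.
    case ex: Exception =>
      context.system.scheduler.scheduleOnce(5.seconds, self, Forwarded(file))(context.dispatcher)
  }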

I looked at my logs and ran YourKit to find out that processors process 
files normally until a crash occurs, at which point only some of them 
(seemingly at random) switch into Open state. Eventually these switch to 
Closed and resume their processing, but the ones that didn't go into Open 
state just keep waiting! What I get from this is that some actors aren't 
crashing (because they never go into Open state and I don't have any logs 
for them) and are keeping files in their mailbox without doing anything. 
Here's a sample log file http://pastebin.com/HGciueNq (here only one 
processor switched state).

So I guess my logic is flawed and I missed something ? Thanks for any help.



[akka-user] Re: Actors behaving unexpectedly with Circuit breaker

2014-08-06 Thread Jasper
Just noticed something interesting with YourKit... Here's an example of an 
actor behaving properly (when all files are processed) :

Sys-akka.actor.pinned-dispatcher-7 [RUNNABLE, IN_NATIVE]
java.net.SocketInputStream.read(byte[], int, int)SocketInputStream.java:122
org.postgresql.core.VisibleBufferedInputStream.readMore(int)
VisibleBufferedInputStream.java:143
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
VisibleBufferedInputStream.java:112
org.postgresql.core.VisibleBufferedInputStream.read()
VisibleBufferedInputStream.java:71
org.postgresql.core.PGStream.ReceiveChar()PGStream.java:269
org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(
CopyOperationImpl, boolean)QueryExecutorImpl.java:930
org.postgresql.core.v3.QueryExecutorImpl.endCopy(CopyInImpl)
QueryExecutorImpl.java:828
org.postgresql.core.v3.CopyInImpl.endCopy()CopyInImpl.java:59
org.postgresql.copy.CopyManager.copyIn(String, Reader, int)CopyManager.java:
145
org.postgresql.copy.CopyManager.copyIn(String, Reader)CopyManager.java:124
xx..actors.
FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8
.apply(Seq)FileProcessor.scala:98
xx..actors.
FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8
.apply(Object)FileProcessor.scala:84
scala.collection.Iterator$class.foreach(Iterator, Function1)Iterator.scala:
743
scala.collection.AbstractIterator.foreach(Function1)Iterator.scala:1174
xx..actors.
FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7
.apply(PushbackReader)FileProcessor.scala:84
xx..actors.
FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7
.apply(Object)FileProcessor.scala:82
xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:12
xx..actors.
FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1
.apply(Connection)FileProcessor.scala:82
xx..actors.
FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1
.apply(Object)FileProcessor.scala:75
xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:12
xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
apply$mcV$sp()FileProcessor.scala:75
xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply
()FileProcessor.scala:56
xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply
()FileProcessor.scala:56
akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
CircuitBreaker.scala:135
akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
CircuitBreaker.scala:135
akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, 
Function0)CircuitBreaker.scala:296
akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker.
scala:345
akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala:
354
akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker.
scala:113
akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)CircuitBreaker.
scala:135
xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, 
Function1)FileProcessor.scala:55
akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor.
scala:465
xx..actors.FileProcessor.aroundReceive(PartialFunction, Object)
FileProcessor.scala:27
akka.actor.ActorCell.receiveMessage(Object)ActorCell.scala:516
akka.actor.ActorCell.invoke(Envelope)ActorCell.scala:487
akka.dispatch.Mailbox.processMailbox(int, long)Mailbox.scala:238
akka.dispatch.Mailbox.run()Mailbox.scala:220
java.lang.Thread.run()Thread.java:744

Now one that does not:

Sys-akka.actor.pinned-dispatcher-6 [WAITING]
java.lang.Object.wait()Object.java:503
org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl.java:
91
org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, 
ResultHandler, int, int, int)QueryExecutorImpl.java:228
org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query
)AbstractJdbc2Connection.java:808
org.postgresql.jdbc2.AbstractJdbc2Connection.rollback()
AbstractJdbc2Connection.java:861
com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState()
ConnectionProxy.java:192
com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305
java.lang.reflect.Method.invoke(Object, Object[])Method.java:606
xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14
xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
apply$mcV$sp()FileProcessor.scala:75
xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply
()FileProcessor.scala:56
xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply
()FileProcessor.scala:56
akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
CircuitBreaker.scala:135

[akka-user] Circuit Breaker and database failures issue

2014-08-04 Thread Jasper
Hi,

I'm currently trying to implement some kind of supervision in my app. Some 
context first: I have a controller actor dispatching files to parse whenever a 
processor actor is done with its previous file. Of course I need to 
bootstrap the processing chain when the application starts. My file processors 
are doing JDBC calls to insert the files' content through a connection 
acquired from a datasource holder actor (using HikariCP with 
initializationFailFast enabled). Note that my processors currently number *eight* 
and they are, as well as the controller, attached to a pinned dispatcher. 

What I need to do is ensure that when the DB is down (or anything bad 
happens while we're at it), all messages are buffered/kept until it's up 
again, and then resume processing.

I tried to implement something with Circuit breaker and PeekMailbox (not 
stashing because I don't see the benefit, though I already tried it and got 
similar errors) but for some reason my system is blocked in the HalfOpen 
state. This is probably because of my logic but I can't figure out why. 
Here are some simplified snippets:

Controller actor

case Processed(pfile) =>
  logger.debug(s"Controller received good processing of file [${pfile.fileName}]")
  fileHandler ! Processed(pfile)
  processFiles(1)

case Unprocessed => processFiles(1) // sends a file to the processors through a router


File processor actor(s)


val breaker =
  CircuitBreaker(context.system.scheduler,
    maxFailures = 1,
    callTimeout = 30.seconds,
    resetTimeout = 10.seconds)

breaker.onOpen {
  logger.info("Switching to Open state")
}.onHalfOpen {
  logger.info("Switching to HalfOpen state")
}.onClose {
  logger.info("Switching to Closed state")
}

case Forwarded(file) =>
  logger.debug(s"FileParser${hashCode()} detected file [${file.getName}]")
  try {
    breaker.withSyncCircuitBreaker {
      val parsedFile = doParsingAndValidation()
      parsedFile match {
        case Success(parsedFile) =>
          using((connectionFactory ? GetConnection).mapTo[Connection].waitForResult) { conn =>
            conn.setAutoCommit(false)

            doJdbcStuff(conn)

            controller ! Processed(parsedFile)
            PeekMailboxExtension.ack()
          }

        case Failure(err) => controller ! Malformed(file, err)
      }
    }
  } catch {
    case ex: Exception =>
      controller ! Unprocessed
      logger.error(s"Couldn't process file [${file.getName}]. Will try again later.")
  }


*Here is a link to the logs http://pastebin.com/dQavVcr3*. You can see 
it's stuck in the HalfOpen state because for some reason it's not receiving 
messages anymore. I've also got some weird stuff going on in my logs, like:

16:43:06.161 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR 
fr.catalina.actors.FileProcessor - Couldn't process file [MS7347RP.114]. 
Will try again later.
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG 
fr.catalina.actors.FileProcessor - FileParser1834490077 detected file 
[MS0134RP.114]
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR 
fr.catalina.actors.FileProcessor - Couldn't process file [MS0134RP.114]. 
Will try again later.
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG 
fr.catalina.actors.FileProcessor - FileParser1834490077 detected file 
[MS0134RP.114]
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR 
fr.catalina.actors.FileProcessor - Couldn't process file [MS0134RP.114]. 
Will try again later.

Why twice ?

Anyway, I'd appreciate any tips or advice pointing me in the right 
direction. Thanks!








[akka-user] Circuit breaker and database failures issue

2014-08-04 Thread Jasper
Hi,

I'm currently trying to implement some kind of supervision in my app. Some 
context first: I have a controller actor dispatching files to parse whenever a 
processor actor is done with its previous file. Of course I need to 
bootstrap the processing chain when the application starts. My file processors 
are doing JDBC calls to insert the files' content through a connection 
acquired from a datasource holder actor (using HikariCP with 
initializationFailFast enabled). Note that my processors currently number eight and 
they are, as well as the controller, attached to a pinned dispatcher.

What I need to do is ensure that when the DB is down (or anything bad 
happens while we're at it), all messages are buffered/kept until it's up 
again, and then resume processing.

I tried to implement something with Circuit breaker and PeekMailbox (not 
stashing because I don't see the benefit, though I already tried it and got 
similar errors) but for some reason my system is blocked in the HalfOpen 
state. This is probably because of my logic but I can't figure out why. 
Here are some simplified snippets:

Controller actor

case Processed(pfile) =>
  logger.debug(s"Controller received good processing of file [${pfile.fileName}]")
  fileHandler ! Processed(pfile)
  processFiles(1)

case Unprocessed => processFiles(1) // sends a file to the processors through a router




File processor actor(s)


val breaker =
  CircuitBreaker(context.system.scheduler,
    maxFailures = 1,
    callTimeout = 30.seconds,
    resetTimeout = 10.seconds)

breaker.onOpen {
  logger.info("Switching to Open state")
}.onHalfOpen {
  logger.info("Switching to HalfOpen state")
}.onClose {
  logger.info("Switching to Closed state")
}

case Forwarded(file) =>
  logger.debug(s"FileParser${hashCode()} detected file [${file.getName}]")
  try {
    breaker.withSyncCircuitBreaker {
      val parsedFile = doParsingAndValidation()
      parsedFile match {
        case Success(parsedFile) =>
          using((connectionFactory ? GetConnection).mapTo[Connection].waitForResult) { conn =>
            conn.setAutoCommit(false)

            doJdbcStuff(conn)

            controller ! Processed(parsedFile)
            PeekMailboxExtension.ack()
          }

        case Failure(err) => controller ! Malformed(file, err)
      }
    }
  } catch {
    case ex: Exception =>
      controller ! Unprocessed
      logger.error(s"Couldn't process file [${file.getName}]. Will try again later.")
  }




Here is a link to the logs http://pastebin.com/6kne5aMp (will die in 1 
week). You can see it's stuck in the HalfOpen state because for some reason 
it's not receiving messages anymore. I've also got some weird stuff going 
on in my logs, like:

16:43:06.161 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR 
fr.catalina.actors.FileProcessor - Couldn't process file [file1]. Will try 
again later.
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG 
fr.catalina.actors.FileProcessor - FileParser1834490077 detected file 
[file1]
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR 
fr.catalina.actors.FileProcessor - Couldn't process file [file1]. Will try 
again later.
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG 
fr.catalina.actors.FileProcessor - FileParser1834490077 detected file 
[file1]
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR 
fr.catalina.actors.FileProcessor - Couldn't process file [file1]. Will try 
again later.

Why twice ?

Anyway, I'd appreciate any tips or advice pointing me in the right 
direction. Thanks!



[akka-user] Sending a message after Future(s) completion

2014-06-27 Thread Jasper
Hello,

Beginner here, I'm experiencing some issues with a particular portion of my 
code :

  val paths = ListBuffer.empty[List[File]]
  futureFiles.foreach {
    _.onComplete {
      case Success(list: List[File]) =>
        paths += list
      case Failure(ex) =>
        log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
    }
  }
  sender() ! Detected(paths.flatten.toIndexedSeq)

futureFiles is a List[Future[List[File]]]. So I attach a callback to each 
of them and construct a new list from the results. My problem is that because 
this is asynchronous code, sometimes the tell happens before the paths list is 
filled up, thus sending nothing. I figured I'd try something like this:

  Future.sequence(futureFiles).onComplete {
    case Success(lists) =>
      sender() ! Detected(lists.flatten.toIndexedSeq)
    case Failure(ex) =>
      log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
  }

But doing so I lose some failure messages because of the sequencing, and it 
doesn't work anyway because the message is going straight to dead letters. 
So how can I achieve this ?

Thanks
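
The dead letters come from evaluating sender() inside the asynchronous callback, where it no longer refers to the original requester. A minimal sketch of one way around that, capturing the sender first and piping the combined result back (essentially the approach in the replies below):

import akka.pattern.pipe
import context.dispatcher

val replyTo = sender() // capture before the Future callback runs on another thread

Future.sequence(futureFiles)
  .map(lists => Detected(lists.flatten.toIndexedSeq))
  .pipeTo(replyTo)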



Re: [akka-user] Sending a message after Future(s) completion

2014-06-27 Thread Jasper
I'd missed wrapping the Nil in a Future, but it works perfectly, thanks!
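
For completeness, a minimal sketch of the snippet below with that fix applied; the recoverWith branch has to return a Future, hence the wrapping (assumes an implicit ExecutionContext and import akka.pattern.pipe are in scope):

Future.sequence(
  futureFiles map {
    _ recoverWith { case ex =>
      log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
      Future.successful(Nil) // recoverWith must return a Future
    }
  }
).map(lists => Detected(lists.flatten.toIndexedSeq)) pipeTo sender()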

On Friday, June 27, 2014 at 12:00:18 UTC+2, √ wrote:

 WARNING: Did not try to compile this

 Future.sequence(
   futureFiles map { _ recoverWith { case ex => log.info("Retrieval of files failed : {}", ex.getLocalizedMessage); Nil } }
 ).map(lists => Detected(lists.flatten.toIndexedSeq)) pipeTo sender


 On Fri, Jun 27, 2014 at 11:22 AM, Jasper lme...@excilys.com wrote:

 Hello,

 Beginner here, I'm experiencing some issues with a particular portion of 
 my code :

   val paths = ListBuffer.empty[List[File]]
   futureFiles.foreach {
     _.onComplete {
       case Success(list: List[File]) =>
         paths += list
       case Failure(ex) =>
         log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
     }
   }
   sender() ! Detected(paths.flatten.toIndexedSeq)

 futureFiles is a List[Future[List[File]]]. So I attach a callback on each 
 of them and construct a new list from it. My problem is because this is 
 asynchronous code, sometimes it will make the tell before the paths list is 
 filled up, thus sending nothing. I figured I'd try something like this :

   Future.sequence(futureFiles).onComplete {
     case Success(lists) =>
       sender() ! Detected(lists.flatten.toIndexedSeq)
     case Failure(ex) =>
       log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
   }

 But doing so I lose some failure messages because of the sequencing, and 
 it doesn't work anyway because the message is going straight to dead 
 letters. So how can I achieve this ?

 Thanks
  




 -- 
 Cheers,
 √
  



Re: [akka-user] Sending a message after Future(s) completion

2014-06-27 Thread Jasper
Oh I see, it looks better indeed. Thanks.
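
That is, the recovery branch can then return the value directly; a sketch of the final shape, under the same assumptions as the earlier sketch:

Future.sequence(
  futureFiles map {
    _ recover { case ex =>
      log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
      Nil // recover takes a plain value, no Future wrapping needed
    }
  }
).map(lists => Detected(lists.flatten.toIndexedSeq)) pipeTo sender()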

On Friday, June 27, 2014 at 12:15:38 UTC+2, √ wrote:

 Oh, it's even better, use recover instead of recoverWith.


 On Fri, Jun 27, 2014 at 12:14 PM, Jasper lme...@excilys.com wrote:

 Missed to wrap the Nil in a Future, but it works perfect, thanks !

  On Friday, June 27, 2014 at 12:00:18 UTC+2, √ wrote:

 WARNING: Did not try to compile this

  Future.sequence(
    futureFiles map { _ recoverWith { case ex => log.info("Retrieval of files failed : {}", ex.getLocalizedMessage); Nil } }
  ).map(lists => Detected(lists.flatten.toIndexedSeq)) pipeTo sender


 On Fri, Jun 27, 2014 at 11:22 AM, Jasper lme...@excilys.com wrote:

 Hello,

 Beginner here, I'm experiencing some issues with a particular portion 
 of my code :

    val paths = ListBuffer.empty[List[File]]
    futureFiles.foreach {
      _.onComplete {
        case Success(list: List[File]) =>
          paths += list
        case Failure(ex) =>
          log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
      }
    }
    sender() ! Detected(paths.flatten.toIndexedSeq)

 futureFiles is a List[Future[List[File]]]. So I attach a callback on 
 each of them and construct a new list from it. My problem is because this 
 is asynchronous code, sometimes it will make the tell before the paths 
 list 
 is filled up, thus sending nothing. I figured I'd try something like this :

    Future.sequence(futureFiles).onComplete {
      case Success(lists) =>
        sender() ! Detected(lists.flatten.toIndexedSeq)
      case Failure(ex) =>
        log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
    }

 But doing so I lose some failure messages because of the sequencing, 
 and it doesn't work anyway because the message is going straight to dead 
 letters. So how can I achieve this ?

 Thanks
  




 -- 
 Cheers,
 √
  




 -- 
 Cheers,
 √
  
