[akka-user] Re: Fault tolerance on unhandled events in an FSM
Update: I got it to work as expected (expected by me, at least): whenUnhandled { case event@Event(message, _) = unhandled(message) stop(Failure(sUnhandled message $message in state $stateName)) } and the test: it should not handle X in state Y in { system.eventStream.subscribe(testActor, classOf[UnhandledMessage]) actor.setState(Y, Z) actor ! X expectMsgType[UnhandledMessage](1 seconds) } In my case the *stop()* is a bit redundant, since the supervisor will restart the actor anyway on an *UnhandledMessage*. But hey, a *stay()* is equally weird. It would be nice if the default behavior of FSM would be *unhandled(message); stay()*. That seems to me to integrate better with actor supervisor strategies. My 2 cents. =) Yours, Jasper On Sunday, August 30, 2015 at 12:46:33 AM UTC+2, Jasper Visser wrote: Hi, I have a parent actor and some child actors; most are subclasses of *akka.actor.FSM*, some are simply *akka.actor.Actor*. The parent actor has a *AllForOneStrategy* to restart all the child actors on any unhandled exception. This seems to work as advertised. I would like to have it so that when a child actor receives an *unhandled event*, this will also trigger the supervisor strategy. Basically, any unhandled event would result in a restart of all the children. It's probably a sledgehammer approach, but I can always finetune it later. Unfortunately (for what I have in mind, at least) the default behavior of FSM on unhandled events is to *log a warning *and* stay()*. On the other hand, the default behavior of Actor is to publish an *UnhandledMessage* or throw *DeathPactException*. Is there a simple way to get FSM to behave the same way as Actor? I tried this approach: whenUnhandled { case event@Event(_, _) = log.error(sUnhandled event $event) unhandled(event) stay() } But that doesn't seem to do anything. The parent never receives the *UnhandledMessage* message. I can trigger the supervisor strategy by simply throwing an exception: whenUnhandled { case event@Event(_, _) = log.error(sUnhandled event $event) throw new RuntimeException(sUnhandled event $event) } But in that scenario I have no idea how to unit test the case of an unhandled event. For that matter I can find very few examples of testing unhandled messages/events. Closest thing I've found is: http://doc.akka.io/docs/akka/2.0.5/scala/testing.html import akka.testkit.TestActorRef system.eventStream.subscribe(testActor, classOf[UnhandledMessage]) val ref = TestActorRef[MyActor] ref.receive(Unknown) expectMsg(1 second, UnhandledMessage(Unknown, system.deadLetters, ref)) So basically my questions are: 1. What is a good way to use a supervisor strategy in a system that includes FSM actors? 2. What is a good way to unit test unhandled events on FSM actors? Yours, Jasper -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Fault tolerance on unhandled events in an FSM
Hi, I have a parent actor and some child actors; most are subclasses of *akka.actor.FSM*, some are simply *akka.actor.Actor*. The parent actor has a *AllForOneStrategy* to restart all the child actors on any unhandled exception. This seems to work as advertised. I would like to have it so that when a child actor receives an *unhandled event*, this will also trigger the supervisor strategy. Basically, any unhandled event would result in a restart of all the children. It's probably a sledgehammer approach, but I can always finetune it later. Unfortunately (for what I have in mind, at least) the default behavior of FSM on unhandled events is to *log a warning *and* stay()*. On the other hand, the default behavior of Actor is to publish an *UnhandledMessage* or throw *DeathPactException*. Is there a simple way to get FSM to behave the same way as Actor? I tried this approach: whenUnhandled { case event@Event(_, _) = log.error(sUnhandled event $event) unhandled(event) stay() } But that doesn't seem to do anything. The parent never receives the *UnhandledMessage* message. I can trigger the supervisor strategy by simply throwing an exception: whenUnhandled { case event@Event(_, _) = log.error(sUnhandled event $event) throw new RuntimeException(sUnhandled event $event) } But in that scenario I have no idea how to unit test the case of an unhandled event. For that matter I can find very few examples of testing unhandled messages/events. Closest thing I've found is: http://doc.akka.io/docs/akka/2.0.5/scala/testing.html import akka.testkit.TestActorRef system.eventStream.subscribe(testActor, classOf[UnhandledMessage]) val ref = TestActorRef[MyActor] ref.receive(Unknown) expectMsg(1 second, UnhandledMessage(Unknown, system.deadLetters, ref)) So basically my questions are: 1. What is a good way to use a supervisor strategy in a system that includes FSM actors? 2. What is a good way to unit test unhandled events on FSM actors? Yours, Jasper -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Actors behaving unexpectedly with Circuit breaker
Nope, doesn't change anything. Le lundi 11 août 2014 18:41:05 UTC+2, Patrik Nordwall a écrit : Does it change anything if use another (default) dispatcher for the circuit breaker and the futures? /Patrik 11 aug 2014 kl. 17:56 skrev Leonard Meyer lme...@excilys.com javascript:: Anything blocking I put on a pinned dispatcher. So each FileProcessor and the Controller (which sometimes does JDBC). Each FileProcessor also has a CircuitBreaker which uses the same pinned dispatcher as said FileProcessor. Is there something wrong with this ? 2014-08-11 17:34 GMT+02:00 Patrik Nordwall patrik@gmail.com javascript:: What dispatcher do you use for what? You mentioned pinned dispatcher, but you also have context.dispatcher for the circuit breaker. Pinned dispatcher there is probably not a good idea since the actor is blocking (the thread is busy). You also have futures, e.g. mapTo. /Patrik On Mon, Aug 11, 2014 at 4:42 PM, Leonard Meyer lme...@excilys.com javascript: wrote: Yes, all I have is at the beginning where it's basically saying who's supervising who. And I have : [DEBUG] [08/11/2014 16:36:41.977] [akka.actor.default-dispatcher-2] [akka:///user] now supervising Actor[akka://x/user/connectionFactory#608280561] connectionFactory is the name of my datasource actor. 2014-08-11 16:26 GMT+02:00 √iktor Ҡlang viktor...@gmail.com javascript:: Did you enable lifecycle logging? On Mon, Aug 11, 2014 at 4:22 PM, Jasper lme...@excilys.com javascript: wrote: Well, given what I said before, pretty sure yes. And yes the sender() doesn't receive anything so it times out and the circuit breaker keep on doing the Open/HalfOpen circle. Here are some sample logs http://pastebin.com/TUPVMNNi This is annoying, I don't know if I'm doing something wrong with Akka, Hikari or PostgreSQL. Le lundi 11 août 2014 15:55:19 UTC+2, √ a écrit : Hi Jasper, It wasn't dumb at all. So are you sure that it doesn't blow up, because that would mean that the sender() of GetConnection never gets his/her connection. Right? On Mon, Aug 11, 2014 at 3:51 PM, Jasper lme...@excilys.com wrote: Quick and dirty modification but indeed, that was dumb. Le lundi 11 août 2014 15:47:58 UTC+2, √ a écrit : Unrelated but why the complication of sender() ! datasource.map(ds = ds.getConnection).get iso sender() ! datasource.get.getConnection On Mon, Aug 11, 2014 at 3:44 PM, Jasper lme...@excilys.com wrote: Spent some time today on this... Actually when I turn on autocommit and crash the DB, processors do block but they ALL go into Open state, which is good. But for some reason my actor holding the datasource isn't responding anymore and I end up with this when asking for a connection : akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka: //catalinaSys/user/connectionFactory#-1136872742]] after [1 ms] It isn't blowing up either because I don't have my hook log on restart (supervisor is the guardian here). What bugs me is HikariCP log : After cleanup pool stats HikariPool-0 (total=19, inUse=1, avail=18, waiting=0) So there are connections available but I get timeouts instead :( Here's how I do it : case GetConnection = sender() ! datasource.map(ds = ds. getConnection).get datasource is an Option[HikariDataSource], and if isn't there then it should just blow up and restart. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/c urrent/additional/faq.html Search the archives: https://groups.google.com/grou p/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from
[akka-user] Re: Actors behaving unexpectedly with Circuit breaker
Spent some time today on this... Actually when I turn on autocommit and crash the DB, processors do block but they ALL go into Open state, which is good. But for some reason my actor holding the datasource isn't responding anymore and I end up with this when asking for a connection : akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://catalinaSys/user/connectionFactory#-1136872742]] after [1 ms] It isn't blowing up either because I don't have my hook log on restart (supervisor is the guardian here). What bugs me is HikariCP log : After cleanup pool stats HikariPool-0 (total=19, inUse=1, avail=18, waiting=0) So there are connections available but I get timeouts instead :( Here's how I do it : case GetConnection = sender() ! datasource.map(ds = ds.getConnection).get datasource is an Option[HikariDataSource], and if isn't there then it should just blow up and restart. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Actors behaving unexpectedly with Circuit breaker
Quick and dirty modification but indeed, that was dumb. Le lundi 11 août 2014 15:47:58 UTC+2, √ a écrit : Unrelated but why the complication of sender() ! datasource.map(ds = ds.getConnection).get iso sender() ! datasource.get.getConnection On Mon, Aug 11, 2014 at 3:44 PM, Jasper lme...@excilys.com javascript: wrote: Spent some time today on this... Actually when I turn on autocommit and crash the DB, processors do block but they ALL go into Open state, which is good. But for some reason my actor holding the datasource isn't responding anymore and I end up with this when asking for a connection : akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://catalinaSys/user/connectionFactory#-1136872742]] after [1 ms] It isn't blowing up either because I don't have my hook log on restart (supervisor is the guardian here). What bugs me is HikariCP log : After cleanup pool stats HikariPool-0 (total=19, inUse=1, avail=18, waiting=0) So there are connections available but I get timeouts instead :( Here's how I do it : case GetConnection = sender() ! datasource.map(ds = ds.getConnection). get datasource is an Option[HikariDataSource], and if isn't there then it should just blow up and restart. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Actors behaving unexpectedly with Circuit breaker
I tried it and this looks very promising as since all processors now go into Open state. However without the reader I'm deep into encoding hell because my files are in us-ascii and my db in UTF-8 : invalid byte sequence for encoding UTF8: 0x00 And I can't just sanitize the files beforehand... Anyway I'm aware it's not really the place for this so unless anyone have the solution, thanks for your help ! Le jeudi 7 août 2014 07:27:04 UTC+2, Brett Wooldridge a écrit : It appears that you are using the PostgreSQL CopyManager, correct? Looking at QueryExecutorImpl it appears that rollback() is trying to obtain a lock that was not released by the CopyManager. I recommend using the CopyManager.copyIn() method that returns a CopyIn object, rather than using the convenience method that takes a reader. Use the writeToCopy() to pump the data in, and be sure to catch SQLException. If you get an SQLException, call cancelCopy() and retry or whatever your recovery scenario is, otherwise call endCopy(). I would have expected PostgreSQL to handle the severing of a Connection in the middle of a bulk copy better, but that is probably a question for the PostgreSQL group. Just my armchair diagnosis. On Wednesday, August 6, 2014 11:04:13 PM UTC+9, Jasper wrote: Sys-akka.actor.pinned-dispatcher-6 [WAITING] java.lang.Object.wait()Object.java:503 org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl. java:91 org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)QueryExecutorImpl.java:228 org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand( Query)AbstractJdbc2Connection.java:808 org.postgresql.jdbc2.AbstractJdbc2Connection.rollback() AbstractJdbc2Connection.java:861 com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState() ConnectionProxy.java:192 com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305 java.lang.reflect.Method.invoke(Object, Object[])Method.java:606 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply$mcV$sp()FileProcessor.scala:75 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply()FileProcessor.scala:56 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply()FileProcessor.scala:56 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, Function0)CircuitBreaker.scala:296 akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker. scala:345 akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala :354 akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker. scala:113 akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0) CircuitBreaker.scala:135 xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, Function1)FileProcessor.scala:55 akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor .scala:/spa ... -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Actors behaving unexpectedly with Circuit breaker
Alright I fixed it, it was stupid. But actually it didn't solve anything... Here's how I did it : doJDBCStuff() : val cpManager = conn.unwrap(classOf[PGConnection]).getCopyAPI val stringBytes: Array[Byte] = batchStrings.toString().map(_.toByte).toArray val copy = cpManager.copyIn(sCOPY tableName FROM STDIN WITH CSV) try { copy.writeToCopy(stringBytes, 0, stringBytes.length) copy.endCopy() } finally { if(copy.isActive){ copy.cancelCopy() } } conn.commit() The potential exception should be caught higher in the first snippet I showed (to resend the message). By the way, I noticed that If I turn on autoCommit, absolutely all processors are blocking... Le jeudi 7 août 2014 12:02:48 UTC+2, Brett Wooldridge a écrit : That error indicates that one of your us-ascii files has a NUL byte (0x00) in it somewhere, this is never valid in UTF-8. You have two options. Figure out why there is a NUL character the file, or at least sanitizing the bytes on-the-fly as you read chunks from the file. Either redacting the NUL bytes completely, or replacing them with, for example, a space (0x20). On Thursday, August 7, 2014 6:10:30 PM UTC+9, Jasper wrote: I tried it and this looks very promising as since all processors now go into Open state. However without the reader I'm deep into encoding hell because my files are in us-ascii and my db in UTF-8 : invalid byte sequence for encoding UTF8: 0x00 And I can't just sanitize the files beforehand... Anyway I'm aware it's not really the place for this so unless anyone have the solution, thanks for your help ! Le jeudi 7 août 2014 07:27:04 UTC+2, Brett Wooldridge a écrit : It appears that you are using the PostgreSQL CopyManager, correct? Looking at QueryExecutorImpl it appears that rollback() is trying to obtain a lock that was not released by the CopyManager. I recommend using the CopyManager.copyIn() method that returns a CopyIn object, rather than using the convenience method that takes a reader. Use the writeToCopy() to pump the data in, and be sure to catch SQLException. If you get an SQLException, call cancelCopy() and retry or whatever your recovery scenario is, otherwise call endCopy(). I would have expected PostgreSQL to handle the severing of a Connection in the middle of a bulk copy better, but that is probably a question for the PostgreSQL group. Just my armchair diagnosis. On Wednesday, August 6, 2014 11:04:13 PM UTC+9, Jasper wrote: Sys-akka.actor.pinned-dispatcher-6 [WAITING] java.lang.Object.wait()Object.java:503 org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl. java:91 org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)QueryExecutorImpl.java:228 org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand( Query)AbstractJdbc2Connection.java:808 org.postgresql.jdbc2.AbstractJdbc2Connection.rollback() AbstractJdbc2Connection.java:861 com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState() ConnectionProxy.java:192 com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305 java.lang.reflect.Method.invoke(Object, Object[])Method.java:606 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply$mcV$sp()FileProcessor.scala:75 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply()FileProcessor.scala:56 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply()FileProcessor.scala:56 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$State$class.callThrough( CircuitBreaker$State, Function0)CircuitBreaker.scala:296 akka.pattern.CircuitBreaker$Closed$.callThrough(Function0) CircuitBreaker.scala:345 akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker. scala:354 akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker .scala:113 akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0) CircuitBreaker.scala:135 xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, Function1)FileProcessor.scala:55 akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object) Actor.scala:/spa ... -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka
[akka-user] Actors behaving unexpectedly with Circuit breaker
Hi, I have a problem in my current project and I cannot pinpoint the exact cause. I implemented an ETL and what I have is a controller actor dispatching files to process to a router (SmallestMailboxPool with one file being one message), which dispatch them to file processors. In the end, the processors notifies back the controller of the outcome. My datasource (HikariCP) is held within an actor attached to the guardian so it restarts when necessary. My controller and processors actors are attached to a *pinned dispatcher* because they use JDBC to insert files data into a DB. What I'm trying to do is handle failures of the database for the file processors. I used a Circuit Breaker like so : File processors (simplified and without logging) : override def preStart(): Unit = { breaker = new CircuitBreaker(context.system.scheduler, maxFailures = 1, callTimeout = 30 seconds, resetTimeout = 10 seconds)(context.dispatcher) } [...] case Forwarded(file) = try{ breaker.withSyncCircuitBreaker { val file = doParsingAndValidation() file match { case Success(parsedFile) = using((connectionFactory ? GetConnection).mapTo[Connection ].waitForResult) { conn = conn.setAutoCommit(false) doJDBCStuff() conn.commit() controller ! Processed(parsedFile) } case Failure(err) = controller ! Malformed(file, err) } } } catch { case ex: Exception = context.system.scheduler.scheduleOnce(5 seconds, self, Forwarded(file))(context.dispatcher) } So basically when the database is down the breaker is gonna switch to Open at first exception (therefore resending message to self). After 10 seconds it's gonna be in HalfOpen state and allow only one file through (while all others will trigger the CircuitBreakerOpenException and get resent). If the allowed one works it's all good, otherwise the breaker is restored to Open state again. Anyway, the logic works in itself : When I stop the database it loops through messages every 5 seconds to fail each time and resends them. *My real problem is that some files aren't processed !* I looked at my logs and ran YourKit to find out that processors process files normally until a crash occurs, at which point only some of them (seems random) switch into Open state. Eventually these ones switch to Closed and resume their processing but the ones that didn't go into Open state just keep waiting ! What I get from this is that some actors aren't crashing (because they never go into Open state and I don't have any logs for them) and are keeping files in their mailbox without doing anything. Here's a sample logs http://pastebin.com/HGciueNq file (here only one processors switched state) So I guess my logic is flawed and I missed something ? Thanks for any help. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Actors behaving unexpectedly with Circuit breaker
Just noticed something interesting with YourKit... Here's an example of an actor behaving properly (when all files are processed) : Sys-akka.actor.pinned-dispatcher-7 [RUNNABLE, IN_NATIVE] java.net.SocketInputStream.read(byte[], int, int)SocketInputStream.java:122 org.postgresql.core.VisibleBufferedInputStream.readMore(int) VisibleBufferedInputStream.java:143 org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int) VisibleBufferedInputStream.java:112 org.postgresql.core.VisibleBufferedInputStream.read() VisibleBufferedInputStream.java:71 org.postgresql.core.PGStream.ReceiveChar()PGStream.java:269 org.postgresql.core.v3.QueryExecutorImpl.processCopyResults( CopyOperationImpl, boolean)QueryExecutorImpl.java:930 org.postgresql.core.v3.QueryExecutorImpl.endCopy(CopyInImpl) QueryExecutorImpl.java:828 org.postgresql.core.v3.CopyInImpl.endCopy()CopyInImpl.java:59 org.postgresql.copy.CopyManager.copyIn(String, Reader, int)CopyManager.java: 145 org.postgresql.copy.CopyManager.copyIn(String, Reader)CopyManager.java:124 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8 .apply(Seq)FileProcessor.scala:98 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8 .apply(Object)FileProcessor.scala:84 scala.collection.Iterator$class.foreach(Iterator, Function1)Iterator.scala: 743 scala.collection.AbstractIterator.foreach(Function1)Iterator.scala:1174 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7 .apply(PushbackReader)FileProcessor.scala:84 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7 .apply(Object)FileProcessor.scala:82 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:12 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1 .apply(Connection)FileProcessor.scala:82 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1 .apply(Object)FileProcessor.scala:75 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:12 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply$mcV$sp()FileProcessor.scala:75 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, Function0)CircuitBreaker.scala:296 akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker. scala:345 akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala: 354 akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker. scala:113 akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)CircuitBreaker. scala:135 xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, Function1)FileProcessor.scala:55 akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor. scala:465 xx..actors.FileProcessor.aroundReceive(PartialFunction, Object) FileProcessor.scala:27 akka.actor.ActorCell.receiveMessage(Object)ActorCell.scala:516 akka.actor.ActorCell.invoke(Envelope)ActorCell.scala:487 akka.dispatch.Mailbox.processMailbox(int, long)Mailbox.scala:238 akka.dispatch.Mailbox.run()Mailbox.scala:220 java.lang.Thread.run()Thread.java:744 Now one who does not : Sys-akka.actor.pinned-dispatcher-6 [WAITING] java.lang.Object.wait()Object.java:503 org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl.java: 91 org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)QueryExecutorImpl.java:228 org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query )AbstractJdbc2Connection.java:808 org.postgresql.jdbc2.AbstractJdbc2Connection.rollback() AbstractJdbc2Connection.java:861 com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState() ConnectionProxy.java:192 com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305 java.lang.reflect.Method.invoke(Object, Object[])Method.java:606 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply$mcV$sp()FileProcessor.scala:75 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135
[akka-user] Re: Actors behaving unexpectedly with Circuit breaker
Just noticed something interesting with YourKit... Here's an example of an actor behaving properly (when all files are processed) : Sys-akka.actor.pinned-dispatcher-7 [RUNNABLE, IN_NATIVE] java.net.SocketInputStream.read(byte[], int, int)SocketInputStream.java:122 org.postgresql.core.VisibleBufferedInputStream.readMore(int) VisibleBufferedInputStream.java:143 org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int) VisibleBufferedInputStream.java:112 org.postgresql.core.VisibleBufferedInputStream.read() VisibleBufferedInputStream.java:71 org.postgresql.core.PGStream.ReceiveChar()PGStream.java:269 org.postgresql.core.v3.QueryExecutorImpl.processCopyResults( CopyOperationImpl, boolean)QueryExecutorImpl.java:930 org.postgresql.core.v3.QueryExecutorImpl.endCopy(CopyInImpl) QueryExecutorImpl.java:828 org.postgresql.core.v3.CopyInImpl.endCopy()CopyInImpl.java:59 org.postgresql.copy.CopyManager.copyIn(String, Reader, int)CopyManager.java: 145 org.postgresql.copy.CopyManager.copyIn(String, Reader)CopyManager.java:124 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8 .apply(Seq)FileProcessor.scala:98 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8 .apply(Object)FileProcessor.scala:84 scala.collection.Iterator$class.foreach(Iterator, Function1)Iterator.scala: 743 scala.collection.AbstractIterator.foreach(Function1)Iterator.scala:1174 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7 .apply(PushbackReader)FileProcessor.scala:84 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7 .apply(Object)FileProcessor.scala:82 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:12 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1 .apply(Connection)FileProcessor.scala:82 xx..actors. FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1 .apply(Object)FileProcessor.scala:75 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:12 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply$mcV$sp()FileProcessor.scala:75 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135 akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, Function0)CircuitBreaker.scala:296 akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker. scala:345 akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala: 354 akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker. scala:113 akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)CircuitBreaker. scala:135 xx..actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, Function1)FileProcessor.scala:55 akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor. scala:465 xx..actors.FileProcessor.aroundReceive(PartialFunction, Object) FileProcessor.scala:27 akka.actor.ActorCell.receiveMessage(Object)ActorCell.scala:516 akka.actor.ActorCell.invoke(Envelope)ActorCell.scala:487 akka.dispatch.Mailbox.processMailbox(int, long)Mailbox.scala:238 akka.dispatch.Mailbox.run()Mailbox.scala:220 java.lang.Thread.run()Thread.java:744 Now one who does not : Sys-akka.actor.pinned-dispatcher-6 [WAITING] java.lang.Object.wait()Object.java:503 org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl.java: 91 org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)QueryExecutorImpl.java:228 org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query )AbstractJdbc2Connection.java:808 org.postgresql.jdbc2.AbstractJdbc2Connection.rollback() AbstractJdbc2Connection.java:861 com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState() ConnectionProxy.java:192 com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305 java.lang.reflect.Method.invoke(Object, Object[])Method.java:606 xx..util.Cleaning$.using(Object, Function1)Cleaning.scala:14 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1. apply$mcV$sp()FileProcessor.scala:75 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 xx..actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply ()FileProcessor.scala:56 akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135
[akka-user] Circuit Breaker and database failures issue
Hi, I'm currently trying to implement some kind of supervision in my app. Some context first : I have a controller actor dispatching files to parse when processor actor is done with his previous file. Of course I need to bootstrap the processing chain when application starts. My file processors are doing JDBC calls to insert the files content through a connection acquired from a datasource holder actor (using HikariCP with initializationFailFast enabled). Note my processors are currently *eight* and they are, as well as the controller, attached to a pinned dispatcher. What I need to do is ensuring that when the DB is down (or anything bad happens while we're at it), all messages are buffered/kept until it's up again to resume processing. I tried to implement something with Circuit breaker and PeekMailbox (Not stashing because I don't see the benefit, but already tried with it and similar errors) but for some reason my system is blocked at the HalfOpen state. This is probably because of my logic but I can't figure out why. Here are some simplified snippets : Controller actor case Processed(pfile) = logger.debug(sController received good processing of file [${pfile.fileName}]) fileHandler ! Processed(pfile) processFiles(1) case Unprocessed = processFiles(1) //sends a file to processors through a router File processor actor(s) val breaker = CircuitBreaker(context.system.scheduler, maxFailures = 1, callTimeout = 30 seconds, resetTimeout = 10 seconds) breaker.onOpen { logger.info(Switching to Open state) }.onHalfOpen { logger.info(Switching to HalfOpen state) }.onClose { logger.info(Switching to Closed state) } case Forwarded(file) = logger.debug(sFileParser${hashCode()} detected file [${file.getName}]) try{ breaker.withSyncCircuitBreaker { val parsedFile = doParsingAndValidation() parsedFile match { case Success(parsedFile) = using((connectionFactory ? GetConnection).mapTo[Connection ].waitForResult) { conn = conn.setAutoCommit(false) doJdbcStuff(conn) controller ! Processed(parsedFile) PeekMailboxExtension.ack() } case Failure(err) = controller ! Malformed(file, err) } } } catch { case ex: Exception = controller ! Unprocessed logger.error(sCouldn't process file [${file.getName}]. Will try again later.) } *Here is a link to the logs http://pastebin.com/dQavVcr3*. You can see It's stuck in Half Open state because for some reason it's not receiving messages anymore. I've also got some weird stuff going on in my logs like : 16:43:06.161 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR fr.catalina.actors.FileProcessor - Couldn't process file [MS7347RP.114]. Will try again later. 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG fr.catalina.actors.FileProcessor - FileParser1834490077 detected file [MS0134RP.114] 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR fr.catalina.actors.FileProcessor - Couldn't process file [MS0134RP.114]. Will try again later. 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG fr.catalina.actors.FileProcessor - FileParser1834490077 detected file [MS0134RP.114] 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR fr.catalina.actors.FileProcessor - Couldn't process file [MS0134RP.114]. Will try again later. Why twice ? Anyway, I'd appreciate any tips or advices pointing me in the right direction. Thanks ! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Circuit breaker and database failures issue
Hi, I'm currently trying to implement some kind of supervision in my app. Some context first : I have a controller actor dispatching files to parse when processor actor is done with his previous file. Of course I need to bootstrap the processing chain when application starts. My file processors are doing JDBC calls to insert the files content through a connection acquired from a datasource holder actor (using HikariCP with initializationFailFast enabled). Note my processors are currently eight and they are, as well as the controller, attached to a pinned dispatcher. What I need to do is ensuring that when the DB is down (or anything bad happens while we're at it), all messages are buffered/kept until it's up again to resume processing. I tried to implement something with Circuit breaker and PeekMailbox (Not stashing because I don't see the benefit, but already tried with it and similar errors) but for some reason my system is blocked at the HalfOpen state. This is probably because of my logic but I can't figure out why. Here are some simplified snippets : Controller actor case Processed(pfile) = logger.debug(sController received good processing of file [${pfile.fileName}]) fileHandler ! Processed(pfile) processFiles(1) case Unprocessed = processFiles(1) //sends a file to processors through a router File processor actor(s) val breaker = CircuitBreaker(context.system.scheduler, maxFailures = 1, callTimeout = 30 seconds, resetTimeout = 10 seconds) breaker.onOpen { logger.info(Switching to Open state) }.onHalfOpen { logger.info(Switching to HalfOpen state) }.onClose { logger.info(Switching to Closed state) } case Forwarded(file) = logger.debug(sFileParser${hashCode()} detected file [${file.getName}]) try{ breaker.withSyncCircuitBreaker { val parsedFile = doParsingAndValidation() parsedFile match { case Success(parsedFile) = using((connectionFactory ? GetConnection).mapTo[Connection ].waitForResult) { conn = conn.setAutoCommit(false) doJdbcStuff(conn) controller ! Processed(parsedFile) PeekMailboxExtension.ack() } case Failure(err) = controller ! Malformed(file, err) } } } catch { case ex: Exception = controller ! Unprocessed logger.error(sCouldn't process file [${file.getName}]. Will try again later.) } Here is a link to the logs http://pastebin.com/6kne5aMp (will die in 1 week). You can see It's stuck in Half Open state because for some reason it's not receiving messages anymore. I've also got some weird stuff going on in my logs like : 16:43:06.161 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR fr.catalina.actors.FileProcessor - Couldn't process file [file1]. Will try again later. 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG fr.catalina.actors.FileProcessor - FileParser1834490077 detected file [file1] 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR fr.catalina.actors.FileProcessor - Couldn't process file [file1]. Will try again later. 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG fr.catalina.actors.FileProcessor - FileParser1834490077 detected file [file1] 16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR fr.catalina.actors.FileProcessor - Couldn't process file [file1]. Will try again later. Why twice ? Anyway, I'd appreciate any tips or advices pointing me in the right direction. Thanks ! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Sending a message after Future(s) completion
Hello, Beginner here, I'm experiencing some issues with a particular portion of my code : val paths = ListBuffer.empty[List[File]] futureFiles.foreach{ _.onComplete{ case Success(list: List[File]) = paths += list case Failure(ex) = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage) } } sender() ! Detected(paths.flatten.toIndexedSeq) futureFiles is a List[Future[List[File]]]. So I attach a callback on each of them and construct a new list from it. My problem is because this is asynchronous code, sometimes it will make the tell before the paths list is filled up, thus sending nothing. I figured I'd try something like this : Future.sequence(futureFiles).onComplete{ case Success(lists) = sender() ! Detected(lists.flatten.toIndexedSeq) case Failure(ex) = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage) } But doing so I lose some failure messages because of the sequencing, and it doesn't work anyway because the message is going straight to dead letters. So how can I achieve this ? Thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Sending a message after Future(s) completion
Missed to wrap the Nil in a Future, but it works perfect, thanks ! Le vendredi 27 juin 2014 12:00:18 UTC+2, √ a écrit : WARNING: Did not try to compile this Future.sequence( futureFiles map { _ recoverWith { case ex = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage); Nil } } ).map(lists = Detected(lists.flatten.toIndexedSeq)) pipeTo sender On Fri, Jun 27, 2014 at 11:22 AM, Jasper lme...@excilys.com javascript: wrote: Hello, Beginner here, I'm experiencing some issues with a particular portion of my code : val paths = ListBuffer.empty[List[File]] futureFiles.foreach{ _.onComplete{ case Success(list: List[File]) = paths += list case Failure(ex) = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage) } } sender() ! Detected(paths.flatten.toIndexedSeq) futureFiles is a List[Future[List[File]]]. So I attach a callback on each of them and construct a new list from it. My problem is because this is asynchronous code, sometimes it will make the tell before the paths list is filled up, thus sending nothing. I figured I'd try something like this : Future.sequence(futureFiles).onComplete{ case Success(lists) = sender() ! Detected(lists.flatten.toIndexedSeq) case Failure(ex) = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage) } But doing so I lose some failure messages because of the sequencing, and it doesn't work anyway because the message is going straight to dead letters. So how can I achieve this ? Thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Sending a message after Future(s) completion
Oh I see, it looks better indeed. Thanks. Le vendredi 27 juin 2014 12:15:38 UTC+2, √ a écrit : Oh, it's even better, use recover instead of recoverWith. On Fri, Jun 27, 2014 at 12:14 PM, Jasper lme...@excilys.com javascript: wrote: Missed to wrap the Nil in a Future, but it works perfect, thanks ! Le vendredi 27 juin 2014 12:00:18 UTC+2, √ a écrit : WARNING: Did not try to compile this Future.sequence( futureFiles map { _ recoverWith { case ex = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage); Nil } } ).map(lists = Detected(lists.flatten.toIndexedSeq)) pipeTo sender On Fri, Jun 27, 2014 at 11:22 AM, Jasper lme...@excilys.com wrote: Hello, Beginner here, I'm experiencing some issues with a particular portion of my code : val paths = ListBuffer.empty[List[File]] futureFiles.foreach{ _.onComplete{ case Success(list: List[File]) = paths += list case Failure(ex) = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage) } } sender() ! Detected(paths.flatten.toIndexedSeq) futureFiles is a List[Future[List[File]]]. So I attach a callback on each of them and construct a new list from it. My problem is because this is asynchronous code, sometimes it will make the tell before the paths list is filled up, thus sending nothing. I figured I'd try something like this : Future.sequence(futureFiles).onComplete{ case Success(lists) = sender() ! Detected(lists.flatten.toIndexedSeq) case Failure(ex) = log.info(Retrieval of files failed : {}, ex.getLocalizedMessage) } But doing so I lose some failure messages because of the sequencing, and it doesn't work anyway because the message is going straight to dead letters. So how can I achieve this ? Thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.