Yes, I understand, but keeping the actor's own state consistent is still 
what we need from actors.
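
As a rough illustration of that consistency guarantee outside Akka, here is a 
minimal sketch using only the Scala standard library. It assumes a 
single-threaded executor as a stand-in for the actor's mailbox; 
ConsistencySketch, State and run are hypothetical names, while s1, s2 and 
updateState mirror the code quoted below.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConsistencySketch {
  // Hypothetical stand-in for the actor's state: consistent means s1 == s2.
  final class State {
    var s1: String = ""
    var s2: String = ""
    def updateState(s: String): Unit = { s1 = s; s2 = s }
    def isConsistent: Boolean = s1 == s2
  }

  // Runs `operations` Future chains whose callbacks all execute on one thread,
  // and reports whether every callback saw (and left) a consistent state.
  def run(operations: Int): Boolean = {
    // Assumption: one thread plays the role of the actor's mailbox.
    val mailboxThread = Executors.newSingleThreadExecutor()
    implicit val mailbox: ExecutionContext =
      ExecutionContext.fromExecutor(mailboxThread)
    val state = new State

    val checks: Seq[Future[Boolean]] = (1 to operations).map { i =>
      Future(s"op$i")
        .map(_ + "a")
        .map { result =>
          // State is only ever touched from the mailbox thread.
          val consistentBefore = state.isConsistent
          state.updateState(result)
          consistentBefore && state.isConsistent
        }
    }

    try Await.result(Future.sequence(checks), 10.seconds).forall(identity)
    finally mailboxThread.shutdown()
  }
}
```

Because every callback runs on the same thread, updateState is never entered 
concurrently, which is the property the custom ExecutionContext in the quoted 
code is after.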

On Wednesday, August 6, 2014 2:01:30 PM UTC+7, √ wrote:
>
> It's thread-safe, but I wouldn't call it safe: it could process other 
> messages in between the different executable ones.
> On Aug 6, 2014 5:42 AM, "Oleg Galako" <ojo...@gmail.com> wrote:
>
>> Here is a code example for what I meant by 
>>
>> "splitting async processing into several message steps (which requires 
>> naming messages, passing context in them, etc)"
>>
>> ------------------------------------
>> import akka.actor.{Actor, ActorRef}
>> import akka.pattern.pipe
>> import scala.concurrent.{ExecutionContext, Future}
>>
>> // Some external services, actors, something asynchronous
>> object ExternalStuff {
>>   def step1(param: String)(implicit ec: ExecutionContext): Future[String] =
>>     Future { param + "1" }
>>   def step2(param: String)(implicit ec: ExecutionContext): Future[String] =
>>     Future { param + "2" }
>>   def step3(param: String)(implicit ec: ExecutionContext): Future[String] =
>>     Future { param + "3" }
>> }
>>
>> // Actor messages
>> case class MultistepOperation(s: String)
>> case class Step1(ctx: Context)
>> case class Step2(ctx: Context)
>> case class Step3(ctx: Context)
>>
>> // A context of a multi-step operation
>> case class Context(data: String, replyTo: ActorRef)
>>
>> class SampleActor extends Actor {
>>   import scala.concurrent.ExecutionContext.Implicits.global
>>   import ExternalStuff._
>>
>>   // let's say a consistent state of the actor means that these 2 variables
>>   // are equal
>>   var s1: String = ""
>>   var s2: String = ""
>>
>>   // which means that this method must not be called concurrently
>>   def updateState(s: String): Unit = {
>>     s1 = s
>>     s2 = s
>>   }
>>
>>   override def receive = {
>>     case MultistepOperation(s) =>
>>       val contextData = s + "a"
>>       val sdr = sender()
>>       step1(contextData).map(x => Step1(Context(x, sdr))) pipeTo self
>>
>>     case Step1(ctx) =>
>>       val contextData = ctx.data + "b"
>>       step2(contextData).map(x => Step2(Context(x, ctx.replyTo))) pipeTo self
>>
>>     case Step2(ctx) =>
>>       val contextData = ctx.data + "c"
>>       step3(contextData).map(x => Step3(Context(x, ctx.replyTo))) pipeTo self
>>
>>     case Step3(ctx) =>
>>       updateState(ctx.data)
>>       val endResult = s"${ctx.data}, $s1 $s2"
>>       ctx.replyTo ! endResult
>>   }
>> }
>> ------------------------------------
>>
>>
>> Here is what I meant by 
>>
>> "Looks like Futures inside actors are better than splitting async 
>> processing into several message steps"
>>
>> but the following code is not threadsafe
>>
>> "because the callback will be scheduled concurrently to the enclosing 
>> actor"
>>
>> ------------------------------------
>> class SampleActor extends Actor {
>>   import scala.concurrent.ExecutionContext.Implicits.global
>>   import ExternalStuff._
>>
>>   // let's say a consistent state of the actor means that these 2 variables
>>   // are equal
>>   var s1: String = ""
>>   var s2: String = ""
>>
>>   // which means that this method must not be called concurrently
>>   def updateState(s: String): Unit = {
>>     s1 = s
>>     s2 = s
>>   }
>>
>>   override def receive = {
>>     case MultistepOperation(s) =>
>>       val sdr = sender()
>>       val contextData = s + "a"
>>       for (step1Result <- step1(contextData);
>>            contextData2 = step1Result + "b";
>>            step2Result <- step2(contextData2);
>>            contextData3 = step2Result + "c";
>>            step3Result <- step3(contextData3))
>>       {
>>         updateState(step3Result)
>>         val endResult = s"$step3Result, $s1, $s2"
>>         sdr ! endResult
>>       }
>>   }
>> }
>> ------------------------------------
>>
>>
>> Here is the same as above but with custom ExecutionContext and thus 
>> threadsafe again like the first version:
>>
>> ------------------------------------
>> class SampleActor extends Actor {
>>   import ExternalStuff._
>>
>>   // let's say a consistent state of the actor means that these 2 variables
>>   // are equal
>>   var s1: String = ""
>>   var s2: String = ""
>>
>>   // which means that this method must not be called concurrently
>>   def updateState(s: String): Unit = {
>>     s1 = s
>>     s2 = s
>>   }
>>
>>   def withCustomEC(f: ExecutionContext => Receive): Receive = {
>>     def handleExecuteRunnable: Receive = {
>>       case ExecuteRunnable(r) =>
>>         println("Handling ExecuteRunnable")
>>         r.run()
>>     }
>>
>>     handleExecuteRunnable orElse  f(new ExecutionContext {
>>       override def execute(runnable: Runnable) = {
>>         println("Sending ExecuteRunnable")
>>         self forward ExecuteRunnable(runnable)
>>       }
>>       override def reportFailure(t: Throwable) = ???
>>     })
>>   }
>>
>>   override def receive = withCustomEC(implicit ec => {
>>     case MultistepOperation(s) => /* same as previous */
>>       val sdr = sender()
>>       val contextData = s + "a"
>>       for (step1Result <- step1(contextData);
>>            contextData2 = step1Result + "b";
>>            step2Result <- step2(contextData2);
>>            contextData3 = step2Result + "c";
>>            step3Result <- step3(contextData3))
>>       {
>>         updateState(step3Result)
>>         val endResult = s"$step3Result, $s1, $s2"
>>         sdr ! endResult
>>       }
>>   })
>> }
>> ------------------------------------
>>
>>
>> On Wednesday, August 6, 2014 12:46:10 AM UTC+7, Konrad Malawski wrote:
>>>
>>> If you want to send the result of a Future to an actor (can be self), 
>>> simply use the pipe pattern:
>>> http://doc.akka.io/docs/akka/snapshot/scala/futures.html#use-with-actors
>>>
>>> import akka.pattern.pipe
>>> Future { 42 } pipeTo self
>>>
>>> // short email as my laptop is dying right now :-)
>>>
>>>
>>> On Tue, Aug 5, 2014 at 7:40 PM, Oleg Galako <ojo...@gmail.com> wrote:
>>>
>>>> Hi Konrad,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> Here's a quote from Akka docs on Actors:
>>>>
>>>> -------------
>>>> Warning
>>>> When using future callbacks, such as onComplete, onSuccess, and 
>>>> onFailure, inside actors you need to carefully avoid closing over the 
>>>> containing actor’s reference, i.e. do not call methods or access mutable 
>>>> state on the enclosing actor from within the callback. This would break 
>>>> the actor encapsulation and may introduce synchronization bugs and race 
>>>> conditions because the callback will be scheduled concurrently to the 
>>>> enclosing actor. Unfortunately there is not yet a way to detect these 
>>>> illegal accesses at compile time. See also: Actors and shared mutable state
>>>> -------------
>>>>
>>>> I want to fix this: 
>>>>
>>>> "because the callback will be scheduled concurrently to the enclosing 
>>>> actor"
>>>>
>>>> by executing callbacks via actor messages - so that code inside Future 
>>>> callbacks could be thread safe in the same way as usual actor message 
>>>> handling code without Futures.
>>>>
>>>>
>>>> On Tuesday, August 5, 2014 11:32:45 PM UTC+7, Konrad Malawski wrote:
>>>>
>>>>> Hi Oleg,
>>>>> This is a bit confusing – why would you need this? 
>>>>> What is this supposed to be helping with?
>>>>>
>>>>> The simplest way to get an execution context for a Future to run on is 
>>>>> `import context.dispatcher` which makes it run on the same dispatcher as 
>>>>> the Actor.
>>>>> Of course, that's not always what you want, so in Akka you can define 
>>>>> dispatchers in configuration and then look them up by name, then use them 
>>>>> for your Futures.
>>>>> Please refer to the Dispatchers 
>>>>> <http://doc.akka.io/docs/akka/2.3.4/scala/dispatchers.html> section 
>>>>> of the docs.
>>>>>
>>>>> Hope this helps, and if not, please explain in more detail what you 
>>>>> are trying to achieve :-)
>>>>>
>>>>>
>>>>> On Tue, Aug 5, 2014 at 3:35 PM, Oleg Galako <ojo...@gmail.com> wrote:
>>>>>
>>>>>> Greetings!
>>>>>>
>>>>>> Looks like Futures inside actors are better than splitting async 
>>>>>> processing into several message steps (which requires naming messages, 
>>>>>> passing context in them, etc).
>>>>>>
>>>>>> Is there something wrong with this code, or is something like that 
>>>>>> already implemented in a better way?
>>>>>>
>>>>>> case class ExecuteRunnable(r: Runnable)
>>>>>> class AnotherActor extends Actor {
>>>>>>     def withCustomEC(f: ExecutionContext => Receive): Receive = {
>>>>>>
>>>>>>       def handleExecuteRunnable: Receive = {
>>>>>>         case ExecuteRunnable(r) =>
>>>>>>           println("Handling ExecuteRunnable")
>>>>>>           r.run()
>>>>>>       }
>>>>>>
>>>>>>       handleExecuteRunnable orElse  f(new ExecutionContext {
>>>>>>         override def execute(runnable: Runnable) = {
>>>>>>           println("Sending ExecuteRunnable")
>>>>>>           self forward ExecuteRunnable(runnable)
>>>>>>         }
>>>>>>         override def reportFailure(t: Throwable) = ???
>>>>>>       })
>>>>>>     }
>>>>>>
>>>>>>     override def receive = withCustomEC(implicit ec => {
>>>>>>         case _ =>
>>>>>>           println("got msg")
>>>>>>           Future { 42 }.foreach(_ => println("Future completed"))
>>>>>>     })
>>>>>> }
>>>>>>
>>>>>>
>>>>>>  -- 
>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>>> --- 
>>>>>> You received this message because you are subscribed to the Google 
>>>>>> Groups "Akka User List" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it, 
>>>>>> send an email to akka-user+...@googlegroups.com.
>>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>>>
>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -- 
>>>>> Cheers,
>>>>> Konrad 'ktoso' Malawski
>>>>> hAkker @ Typesafe
>>>>>
>>>>> <http://typesafe.com>
>>>>>  
>>>
>>>
>>>
>
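
The caveat raised in the quoted reply above (other messages can be processed 
in between the individual runnables) can be reproduced without Akka. The 
following is only a sketch under stated assumptions: a single-threaded 
executor stands in for the actor's mailbox, and InterleavingSketch, run and 
the "other message" task are hypothetical names, not part of the code in this 
thread.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InterleavingSketch {
  // Returns the order in which the steps and the unrelated task actually ran.
  def run(): Vector[String] = {
    // Assumption: one thread plays the role of the actor's mailbox.
    val mailboxThread = Executors.newSingleThreadExecutor()
    implicit val mailbox: ExecutionContext =
      ExecutionContext.fromExecutor(mailboxThread)

    val log = new AtomicReference(Vector.empty[String])
    def record(entry: String): Unit = { log.updateAndGet(_ :+ entry); () }

    val operation: Future[Unit] =
      Future {
        record("step1")
        // Simulate an unrelated message arriving while step1 is running.
        mailbox.execute(() => record("other message"))
      }.map(_ => record("step2"))
        .map(_ => record("step3"))

    try Await.result(operation, 5.seconds)
    finally mailboxThread.shutdown()
    log.get()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

Each step still runs alone on the mailbox thread, so nothing executes 
concurrently, but the unrelated task is queued ahead of step2 and therefore 
runs between the steps. The multi-step operation is thread-safe, yet not 
atomic with respect to other messages.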
