Allen,

Here is one definition of back pressure:
http://www.reactivemanifesto.org/glossary#Back-Pressure
For example, for the memory errors in your original question, the
back-pressure mechanism of akka-streams lets you process your data even with
a limited memory budget.
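
To make the idea concrete, here is a minimal sketch of back pressure that
uses only the JDK's ArrayBlockingQueue. It is not how akka-streams implements
it (akka-streams signals demand asynchronously instead of blocking a thread),
and the names BackPressureSketch and countLines are made up for illustration.
The point is only that a bounded buffer forces a fast reader to wait for a
slow consumer, so memory use stays capped.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BackPressureSketch {

  /** Pushes `lines` through a buffer holding at most `capacity` elements and
    * returns how many lines the consumer handled. (Hypothetical helper.) */
  def countLines(lines: Iterator[String], capacity: Int): Int = {
    // None marks the end of the input.
    val buffer = new ArrayBlockingQueue[Option[String]](capacity)
    var processed = 0

    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var next = buffer.take()
        while (next.isDefined) {
          processed += 1 // stand-in for the real per-line work
          next = buffer.take()
        }
      }
    })
    consumer.start()

    // put() blocks while the buffer is full, so the reader can never get
    // more than `capacity` lines ahead of the consumer.
    lines.foreach(line => buffer.put(Some(line)))
    buffer.put(None)
    consumer.join()
    processed
  }
}
```

With an unbounded buffer (which is what an actor mailbox is by default) the
reader would never wait, and every unread line would sit in memory.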

I have personally found this presentation by Roland Kuhn
(https://www.youtube.com/watch?v=khmVMvlP_QA) very helpful in understanding
the motivations and core concepts behind akka-streams, which is an
implementation of Reactive Streams (http://www.reactive-streams.org/).


-Soumya


On Sat, Jan 10, 2015 at 7:42 PM, Allen Nie <aiming...@gmail.com> wrote:

> Hi Endre,
>
>     That's a very valid suggestion. I'm quite new to Akka (finished about
> 35% of its docs). I'm still trying to understand how to properly
> parallelize tasks. You and Viktor mentioned back-pressure. Can you go a bit
> deeper into that? For example, what is back-pressure, and how do I build it
> into my actor solutions? (Info links would be all I need.) I asked a similar
> question on StackOverflow, but no one could point me in the right
> direction.
>
>     Thank you for linking Akka-stream's docs.
>
> Allen
>
>
> On Saturday, January 10, 2015 at 5:38:42 AM UTC-5, drewhk wrote:
>>
>> Hi,
>>
>> On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie <aimi...@gmail.com> wrote:
>>
>>> Hey Viktor,
>>>
>>>     I'm trying to use Akka to parallelize this process. There shouldn't
>>> be any bottleneck, and I don't understand why I got a memory overflow with
>>> my first version (the actor version). The main task is to read in a line,
>>> break it up, turn each segment (a string) into an integer, and then print
>>> the result to a CSV file (a vectorization process).
>>>
>>> def processLine(line: String): Unit = {
>>>   val vector: ListBuffer[String] = ListBuffer()
>>>   val segs = line.split(",")
>>>
>>>   println(segs(0))
>>>
>>>   // segs(0) is skipped; every remaining field is looked up in its dictionary
>>>   (1 until segs.length).foreach { i =>
>>>     val factorArray = dictionaries(i - 1)
>>>     // get the factor level of the string
>>>     vector += factorArray._2.indexOf(segs(i)).toString
>>>   }
>>>
>>>   timer ! OneDone
>>>   printer ! Print(vector.toList)
>>> }
>>>
>>>
>>>     When I do this in pure Akka (with actors), I get a memory overflow,
>>> since I create 40 million Row(line: String) objects.
>>>
>>
>> No surprise there, you just slurp up all rows faster than the actors can
>> keep up processing them, so most of them are in a mailbox. In fact if your
>> actors do something trivially simple, the whole overhead of asynchronously
>> passing elements to the actors might be larger than what you gain. In these
>> cases it is recommended to pass batches of Rows instead of one-by-one.
>> Remember, parallelisation only gains when the overhead of it is smaller
>> than the task it parallelizes.
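
The batching advice above can be sketched with the standard library's
Iterator.grouped. This is only an illustration: RowBatch is a hypothetical
message type that is not in the original code, and the batch size has to be
tuned by measurement.

```scala
object BatchingSketch {
  // Hypothetical message type: one message carries many lines.
  final case class RowBatch(lines: Vector[String])

  /** Lazily groups lines into batches of up to `batchSize` lines each. */
  def batches(lines: Iterator[String], batchSize: Int): Iterator[RowBatch] =
    lines.grouped(batchSize).map(group => RowBatch(group.toVector))
}
```

In the reading loop, each RowBatch would then be sent to a row actor in place
of one Row per line. Batching cuts the per-message overhead, but on its own
it does not stop the reader from running ahead of the actors.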
>>
>>
>>
>>> If I use Akka-stream, there is no memory overflow issue, but the
>>> performance is too similar to the non-parallelized version (even slower).
>>>
>>
>> No surprise there either, you did nothing to parallelize or pipeline any
>> computation in the stream, so you get the overhead of asynchronous
>> processing and none of the benefits of it (but at least you get
>> backpressure).
>>
>> You have a few approaches to get the benefits of multi-core processing
>> with streams:
>>  - if you have multiple processing steps for a row, you can pipeline them;
>> see the intro part of this doc page:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
>>  - you can use mapAsync to get a similar effect with a single computation
>> step; see here:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-integrations.html#Illustrating_ordering_and_parallelism
>>  - you can explicitly add fan-out elements to parallelise among multiple
>> explicit workers; see here:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>>
>> Overall, for this kind of task I recommend using Streams, but you need
>> to read the documentation first to understand how it works.
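
As a rough illustration of the second approach (one computation step run with
bounded parallelism), here is a sketch that uses plain scala.concurrent
Futures and not the akka-streams API. It is a simplification of what mapAsync
does: it waits for a whole window of batches before starting the next one.
The names ParallelVectorizeSketch, vectorize and vectorizeAll are made up,
and the dictionaries are assumed to be plain lookup lists, one per column
after the first.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelVectorizeSketch {

  /** Replaces every field after the first with its index in that column's
    * lookup list (-1 if absent), like indexOf in processLine. */
  def vectorize(line: String, dictionaries: Vector[Vector[String]]): Vector[Int] = {
    val segs = line.split(",")
    (1 until segs.length).map(i => dictionaries(i - 1).indexOf(segs(i))).toVector
  }

  /** Vectorizes lines in order, with at most `parallelism` batches running
    * at once and at most batchSize * parallelism lines held in memory. */
  def vectorizeAll(lines: Iterator[String],
                   dictionaries: Vector[Vector[String]],
                   batchSize: Int,
                   parallelism: Int)
                  (implicit ec: ExecutionContext): Iterator[Vector[Int]] =
    lines
      .grouped(batchSize)   // batches amortise the cost of going asynchronous
      .grouped(parallelism) // a window of batches that run concurrently
      .flatMap { window =>
        val running = window.map(batch => Future(batch.map(vectorize(_, dictionaries))))
        // Blocking here is what keeps the reader from racing ahead.
        Await.result(Future.sequence(running), Duration.Inf).flatten
      }
}
```

Because the result is a lazy Iterator, the next window of lines is only read
once the previous one has been consumed.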
>>
>> -Endre
>>
>>
>>>
>>>     It's my first time using Akka-stream. So I'm unfamiliar with the
>>> optimization you were talking about.
>>>
>>> Sincerely,
>>> Allen
>>>
>>> On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:
>>>>
>>>> Hi Allen,
>>>>
>>>> What's the bottleneck?
>>>> Have you tried enabling the experimental optimizations?
>>>>
>>>> On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie <aimi...@gmail.com> wrote:
>>>>
>>>>> Thank you Soumya,
>>>>>
>>>>>        I think Akka-streams is the way to go. However, I would also
>>>>> appreciate a performance boost - I still have 40 million lines to
>>>>> go through! But thanks anyway!
>>>>>
>>>>>
>>>>> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:
>>>>>>
>>>>>> I would recommend using the Akka-streams API for this.
>>>>>> Here is a sample. I was able to process a 1 GB file with around 1.5
>>>>>> million records in *20MB* of memory. The file-read rate and the
>>>>>> console-write rate are different, but the streams API handles that.
>>>>>> This is not the fastest approach, but at least you won't run out of
>>>>>> memory.
>>>>>>
>>>>>>
>>>>>>
>>>>>> <https://lh6.googleusercontent.com/-zdX0n1pvueE/VLATDja3K4I/AAAAAAAAv18/BH7V1RAuxT8/s1600/1gb_file_processing.png>
>>>>>>
>>>>>> import java.io.FileInputStream
>>>>>> import java.util.Scanner
>>>>>>
>>>>>> import akka.actor.ActorSystem
>>>>>> import akka.stream.{FlowMaterializer, MaterializerSettings}
>>>>>> import akka.stream.scaladsl.Source
>>>>>>
>>>>>> import scala.util.Try
>>>>>>
>>>>>>
>>>>>> object StreamingFileReader extends App {
>>>>>>
>>>>>>
>>>>>>   val inputStream = new FileInputStream("/path/to/file")
>>>>>>   val sc = new Scanner(inputStream, "UTF-8")
>>>>>>
>>>>>>   implicit val system = ActorSystem("Sys")
>>>>>>   val settings = MaterializerSettings(system)
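>>>>>>   implicit val materializer = FlowMaterializer(
>>>>>>     settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))
>>>>>>
>>>>>>   // Stop at end of file; a bare Iterator.continually(sc.nextLine())
>>>>>>   // would throw NoSuchElementException after the last line.
>>>>>>   val fileSource = Source(() => new Iterator[String] {
>>>>>>     def hasNext: Boolean = sc.hasNextLine
>>>>>>     def next(): String = sc.nextLine()
>>>>>>   })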
>>>>>>
>>>>>>   import system.dispatcher
>>>>>>
>>>>>>   fileSource.map { line =>
>>>>>>     line //do nothing
>>>>>>   //in the for each print the line.
>>>>>>   }.foreach(println).onComplete { _ =>
>>>>>>     Try {
>>>>>>       sc.close()
>>>>>>       inputStream.close()
>>>>>>     }
>>>>>>     system.shutdown()
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>     I am trying to process a CSV file with 40 million lines of data
>>>>>>> in it. It's a 5GB file. I'm trying to use Akka to parallelize the
>>>>>>> task. However, it seems like I can't stop the quick memory growth. It
>>>>>>> expanded from 1GB to almost 15GB (the limit I set) in under 5 minutes.
>>>>>>> This is the code in my main() method:
>>>>>>>
>>>>>>> val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
>>>>>>> val sc = new Scanner(inputStream, "UTF-8")
>>>>>>> var counter = 0
>>>>>>> while (sc.hasNextLine) {
>>>>>>>   rowActors(counter % 20) ! Row(sc.nextLine())
>>>>>>>   counter += 1
>>>>>>> }
>>>>>>>
>>>>>>> sc.close()
>>>>>>> inputStream.close()
>>>>>>>
>>>>>>>     Someone pointed out that I was essentially creating 40 million
>>>>>>> Row objects, which naturally will take up a lot of space. My row actor
>>>>>>> is not doing much: it simply transforms each line into an array of
>>>>>>> integers (if you are familiar with the concept of vectorizing, that's
>>>>>>> what I'm doing). Then the transformed array gets printed out. Done. I
>>>>>>> originally thought there was a memory leak, but maybe I'm not managing
>>>>>>> memory right. Can I get any wise suggestions from the Akka experts here?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> <http://i.stack.imgur.com/yQ4xx.png>
>>>>>>>
>>>>>>>  --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to akka-user+...@googlegroups.com.
>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> √
>>>>
>>>
>>
>
