Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-16 Thread Akka Team
Hello there,

> As you can see, one of the heavier duties actually happens within the
> elements of the input array. Is there a way for me to parallelize that
> process as well, since it happens inside a map?

Yes, you can use `mapAsync` and if you do not care about ordering you can
use `mapAsyncUnordered` to parallelize the computation a bit more.

Docs here:
http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/index.html#akka.stream.javadsl.Flow@mapAsync[T](f:akka.stream.javadsl.japi.Function[Out,scala.concurrent.Future[T]]):akka.stream.javadsl.Flow[In,T]
and here:
http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/index.html#akka.stream.javadsl.Flow@mapAsyncUnordered[T](f:akka.stream.javadsl.japi.Function[Out,scala.concurrent.Future[T]]):akka.stream.javadsl.Flow[In,T]
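A minimal sketch of what that could look like, assuming the scaladsl flavour and reusing the `fileSource`, `stableDictionaries` and `printer` definitions from the snippet below (the `mapAsync*` signatures are still experimental in 1.0-M2, so treat this as illustrative, not definitive):

import scala.concurrent.Future
import system.dispatcher // ExecutionContext for the Futures

fileSource
  .map(line => line.split(","))
  .mapAsyncUnordered { segs =>
    // The heavy per-element dictionary lookups now run inside a Future,
    // so several lines can be processed concurrently.
    Future {
      segs(0) +: (1 to segs.length - 1).map { i =>
        stableDictionaries(i - 1).indexOf(segs(i)).toString
      }
    }
  }
  .foreach(segs => printer ! Print(segs))

If the order of the output rows matters, use `mapAsync` instead; it buffers results so they are emitted in input order, at some cost in throughput.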



-- 
Konrad
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-10 Thread Allen Nie
Thanks Soumya. That's very helpful!

Drewhk, 

Sorry to bother you one more time. After reading the documents, I 
changed my code to this in order to fully utilize parallelism in Akka 
Stream:

fileSource
  .map { line => line.split(",") }
  .map { segs =>
    segs(0) +: (1 to segs.length - 1).map { i =>
      stableDictionaries(i - 1).indexOf(segs(i)).toString
    }
  }
  .foreach { segs =>
    println(segs(0))
    timer ! OneDone // not sure if this works
    printer ! Print(segs)
  }
  .onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }


As you can see, one of the heavier duties actually happens within the
elements of the input array. Is there a way for me to parallelize that
process as well, since it happens inside a map?

Allen

On Saturday, January 10, 2015 at 8:03:18 PM UTC-5, Soumya Simanta wrote:
>
> Allen, 
>
> Here is one definition of back pressure:
> http://www.reactivemanifesto.org/glossary#Back-Pressure
> For example, in your initial question about memory errors, the back-pressure
> mechanism of akka-streams allows processing your data even with a
> limited memory budget.
>
> I have personally found this presentation (
> https://www.youtube.com/watch?v=khmVMvlP_QA
> ) by Roland Kuhn very helpful in understanding the motivations and core 
> concepts behind akka-streams which is an implementation of reactive streams 
> ( http://www.reactive-streams.org/). 
>
>
> -Soumya
>
>
> On Sat, Jan 10, 2015 at 7:42 PM, Allen Nie wrote:
>
>> Hi Endre,
>>
>> That's a very valid suggestion. I'm quite new to Akka (I've finished about
>> 35% of its docs) and I'm still trying to understand how to properly
>> parallelize tasks. You and Viktor mentioned back-pressure. Can you go a bit
>> deeper into that? For example, what is back-pressure, and how do I build it
>> into my actor solutions? (Info links would be all I need.) I asked a similar
>> question on StackOverflow but no one could point me in the right direction.
>>
>> Thank you for linking Akka-stream's docs.
>>
>> Allen
>>
>>
>> On Saturday, January 10, 2015 at 5:38:42 AM UTC-5, drewhk wrote:
>>>
>>> Hi,
>>>
>>> On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie  wrote:
>>>
 Hey Viktor,

 I'm trying to use Akka to parallelize this process. There shouldn't
 be any bottleneck, and I don't understand why I got a memory overflow with
 my first version (the actor version). The main task is to read in a line,
 break it up, and turn each segment (a string) into an integer, then print it
 out to a CSV file (a vectorization process).

def processLine(line: String): Unit = {

  val vector: ListBuffer[String] = ListBuffer()
  val segs = line.split(",")

  println(segs(0))

  (1 to segs.length - 1).map { i =>
    val factorArray = dictionaries(i - 1)
    vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
  }

  timer ! OneDone

  printer ! Print(vector.toList)
}


 When I'm doing this in pure Akka (with actors), since I create 40
 million objects (Row(line: String)), I get a memory overflow issue.

>>>
>>> No surprise there: you just slurp up all the rows faster than the actors can
>>> keep up processing them, so most of them sit in a mailbox. In fact, if your
>>> actors do something trivially simple, the overhead of asynchronously
>>> passing elements to the actors might be larger than what you gain. In these
>>> cases it is recommended to pass batches of Rows instead of one by one.
>>> Remember, parallelisation only pays off when its overhead is smaller
>>> than the task it parallelizes.
>>>
>>>  
>>>
 If I use Akka-stream, there is no memory overflow issue, but the 
 performance is too similar to the non-parallelized version (even slower).

>>>
>>> No surprise there either, you did nothing to parallelize or pipeline any 
>>> computation in the stream, so you get the overhead of asynchronous 
>>> processing and none of the benefits of it (but at least you get 
>>> backpressure).
>>>
>>> You have a few approaches to get the benefits of multi-core processing
>>> with streams:
>>>  - if you have multiple processing steps for a row you can pipeline 
>>> them, see the intro part of this doc page: http://doc.akka.io/docs/
>>> akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
>>>  - you can use mapAsync to have similar effects but with one computation 
>>> step, see here: http://doc.akka.io/docs/akka-stream-and-http-
>>> experimental/1.0-M2/scala/stream-integrations.html#
>>> Illustrating_ordering_and_parallelism
>>>  - you can explicitly add fan-out elements to parallelise among multiple 
>>> explicit workers, see here: http://doc.akka.io/docs/
>>> akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#
>>> Balancing_jobs_to_a_fixed_pool_of_workers
>>>
>>> Overall, for this kind of task I recommend using Streams, but you need to
>>> read the documentation first to understand how it works.

Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-10 Thread Soumya Simanta
Allen,

Here is one definition of back pressure:
http://www.reactivemanifesto.org/glossary#Back-Pressure
For example, in your initial question about memory errors, the back-pressure
mechanism of akka-streams allows processing your data even with a
limited memory budget.

I have personally found this presentation (
https://www.youtube.com/watch?v=khmVMvlP_QA
) by Roland Kuhn very helpful in understanding the motivations and core
concepts behind akka-streams, which is an implementation of reactive streams
( http://www.reactive-streams.org/).
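To make that concrete, here is a hedged toy illustration (mine, not from the thread), assuming the same ActorSystem and FlowMaterializer setup as the StreamingFileReader snippet quoted below:

import akka.stream.scaladsl.Source

// An effectively infinite producer...
val numbers = Source(() => Iterator.from(0))

// ...paced by a deliberately slow consumer. Demand flows upstream, so the
// iterator is pulled only as fast as the foreach can drain elements;
// nothing piles up in an unbounded buffer or mailbox.
numbers.foreach { n =>
  Thread.sleep(1) // simulate slow downstream work
  println(n)
}

Without back pressure (for example, a while loop firing actor messages), the producer would run far ahead, and the millions of pending messages would be exactly the memory growth from the original question.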


-Soumya


On Sat, Jan 10, 2015 at 7:42 PM, Allen Nie  wrote:

> Hi Endre,
>
> That's a very valid suggestion. I'm quite new to Akka (I've finished about
> 35% of its docs) and I'm still trying to understand how to properly
> parallelize tasks. You and Viktor mentioned back-pressure. Can you go a bit
> deeper into that? For example, what is back-pressure, and how do I build it
> into my actor solutions? (Info links would be all I need.) I asked a similar
> question on StackOverflow but no one could point me in the right direction.
>
> Thank you for linking Akka-stream's docs.
>
> Allen
>
>
> On Saturday, January 10, 2015 at 5:38:42 AM UTC-5, drewhk wrote:
>>
>> Hi,
>>
>> On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie  wrote:
>>
>>> Hey Viktor,
>>>
>>> I'm trying to use Akka to parallelize this process. There shouldn't
>>> be any bottleneck, and I don't understand why I got a memory overflow with my
>>> first version (the actor version). The main task is to read in a line, break it
>>> up, and turn each segment (a string) into an integer, then print it out to
>>> a CSV file (a vectorization process).
>>>
>>> def processLine(line: String): Unit = {
>>>
>>>   val vector: ListBuffer[String] = ListBuffer()
>>>   val segs = line.split(",")
>>>
>>>   println(segs(0))
>>>
>>>   (1 to segs.length - 1).map { i =>
>>>     val factorArray = dictionaries(i - 1)
>>>     vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
>>>   }
>>>
>>>   timer ! OneDone
>>>
>>>   printer ! Print(vector.toList)
>>> }
>>>
>>>
>>> When I'm doing this in pure Akka (with actors), since I create 40
>>> million objects (Row(line: String)), I get a memory overflow issue.
>>>
>>
>> No surprise there: you just slurp up all the rows faster than the actors can
>> keep up processing them, so most of them sit in a mailbox. In fact, if your
>> actors do something trivially simple, the overhead of asynchronously
>> passing elements to the actors might be larger than what you gain. In these
>> cases it is recommended to pass batches of Rows instead of one by one.
>> Remember, parallelisation only pays off when its overhead is smaller
>> than the task it parallelizes.
>>
>>
>>
>>> If I use Akka-stream, there is no memory overflow issue, but the
>>> performance is too similar to the non-parallelized version (even slower).
>>>
>>
>> No surprise there either, you did nothing to parallelize or pipeline any
>> computation in the stream, so you get the overhead of asynchronous
>> processing and none of the benefits of it (but at least you get
>> backpressure).
>>
>> You have a few approaches to get the benefits of multi-core processing
>> with streams:
>>  - if you have multiple processing steps for a row you can pipeline them,
>> see the intro part of this doc page: http://doc.akka.io/docs/
>> akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
>>  - you can use mapAsync to have similar effects but with one computation
>> step, see here: http://doc.akka.io/docs/akka-stream-and-http-
>> experimental/1.0-M2/scala/stream-integrations.html#
>> Illustrating_ordering_and_parallelism
>>  - you can explicitly add fan-out elements to parallelise among multiple
>> explicit workers, see here: http://doc.akka.io/docs/akka-stream-and-http-
>> experimental/1.0-M2/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_
>> pool_of_workers
>>
>> Overall, for this kind of task I recommend using Streams, but you need
>> to read the documentation first to understand how it works.
>>
>> -Endre
>>
>>
>>>
>>> It's my first time using Akka-stream. So I'm unfamiliar with the
>>> optimization you were talking about.
>>>
>>> Sincerely,
>>> Allen
>>>
>>> On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:

 Hi Allen,

 What's the bottleneck?
 Have you tried enabling the experimental optimizations?

 On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie  wrote:

> Thank you Soumya,
>
>I think Akka-streams is the way to go. However, I would also
> appreciate some performance boost as well - still have 40 million lines to
> go through! But thanks anyway!
>
>
> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:
>>
>> I would recommend using the Akka-streams API for this.
>> Here is a sample. I was able to process a 1G file with around 1.5
>> million records in *20MB* of memory. The file read and console write rates
>> are different, but the streams API handles that. This is not the fastest,
>> but you at least won't run out of memory.

Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-10 Thread Allen Nie
Hi Endre,

That's a very valid suggestion. I'm quite new to Akka (I've finished about
35% of its docs) and I'm still trying to understand how to properly
parallelize tasks. You and Viktor mentioned back-pressure. Can you go a bit
deeper into that? For example, what is back-pressure, and how do I build it
into my actor solutions? (Info links would be all I need.) I asked a similar
question on StackOverflow but no one could point me in the right direction.

Thank you for linking Akka-stream's docs.

Allen


On Saturday, January 10, 2015 at 5:38:42 AM UTC-5, drewhk wrote:
>
> Hi,
>
> On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie wrote:
>
>> Hey Viktor,
>>
>> I'm trying to use Akka to parallelize this process. There shouldn't
>> be any bottleneck, and I don't understand why I got a memory overflow with my
>> first version (the actor version). The main task is to read in a line, break it
>> up, and turn each segment (a string) into an integer, then print it out to
>> a CSV file (a vectorization process).
>>
>> def processLine(line: String): Unit = {
>>
>>   val vector: ListBuffer[String] = ListBuffer()
>>   val segs = line.split(",")
>>
>>   println(segs(0))
>>
>>   (1 to segs.length - 1).map { i =>
>>     val factorArray = dictionaries(i - 1)
>>     vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
>>   }
>>
>>   timer ! OneDone
>>
>>   printer ! Print(vector.toList)
>> }
>>
>>
>> When I'm doing this in pure Akka (with actors), since I create 40
>> million objects (Row(line: String)), I get a memory overflow issue.
>>
>
> No surprise there: you just slurp up all the rows faster than the actors can
> keep up processing them, so most of them sit in a mailbox. In fact, if your
> actors do something trivially simple, the overhead of asynchronously
> passing elements to the actors might be larger than what you gain. In these
> cases it is recommended to pass batches of Rows instead of one by one.
> Remember, parallelisation only pays off when its overhead is smaller
> than the task it parallelizes.
>
>  
>
>> If I use Akka-stream, there is no memory overflow issue, but the 
>> performance is too similar to the non-parallelized version (even slower).
>>
>
> No surprise there either, you did nothing to parallelize or pipeline any 
> computation in the stream, so you get the overhead of asynchronous 
> processing and none of the benefits of it (but at least you get 
> backpressure).
>
> You have a few approaches to get the benefits of multi-core processing
> with streams:
>  - if you have multiple processing steps for a row you can pipeline them, 
> see the intro part of this doc page: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
>  - you can use mapAsync to have similar effects but with one computation 
> step, see here: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-integrations.html#Illustrating_ordering_and_parallelism
>  - you can explicitly add fan-out elements to parallelise among multiple 
> explicit workers, see here: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>
> Overall, for this kind of task I recommend using Streams, but you need to
> read the documentation first to understand how it works.
>
> -Endre
>  
>
>>
>> It's my first time using Akka-stream. So I'm unfamiliar with the 
>> optimization you were talking about.
>>
>> Sincerely,
>> Allen
>>
>> On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:
>>>
>>> Hi Allen,
>>>
>>> What's the bottleneck?
>>> Have you tried enabling the experimental optimizations?
>>>
>>> On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie  wrote:
>>>
 Thank you Soumya, 

I think Akka-streams is the way to go. However, I would also 
 appreciate some performance boost as well - still have 40 million lines to 
 go through! But thanks anyway!


 On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:
>
> I would recommend using the Akka-streams API for this. 
> Here is a sample. I was able to process a 1G file with around 1.5
> million records in *20MB* of memory. The file read and console write
> rates are different, but the streams API handles that. This is
> not the fastest, but you at least won't run out of memory.
>
>
>
> 
>
> import java.io.FileInputStream
> import java.util.Scanner
>
> import akka.actor.ActorSystem
> import akka.stream.{FlowMaterializer, MaterializerSettings}
> import akka.stream.scaladsl.Source
>
> import scala.util.Try
>
>
> object StreamingFileReader extends App {
>
>   val inputStream = new FileInputStream("/path/to/file")
>   val sc = new Scanner(inputStream, "UTF-8")
>
>   implicit val system = ActorSystem("Sys")
>   val settings = MaterializerSettings(system)
>   implicit val materializer =
>     FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))
>
>   val fileSource = Source(() => Iterator.continually(sc.nextLine()))
>
>   import system.dispatcher
>
>   fileSource.map { line =>
>     line // do nothing; the foreach below prints the line
>   }.foreach(println).onComplete { _ =>
>     Try {
>       sc.close()
>       inputStream.close()
>     }
>     system.shutdown()
>   }
> }

Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-10 Thread Endre Varga
Hi,

On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie  wrote:

> Hey Viktor,
>
> I'm trying to use Akka to parallelize this process. There shouldn't be
> any bottleneck, and I don't understand why I got a memory overflow with my
> first version (the actor version). The main task is to read in a line, break it
> up, and turn each segment (a string) into an integer, then print it out to
> a CSV file (a vectorization process).
>
> def processLine(line: String): Unit = {
>
>   val vector: ListBuffer[String] = ListBuffer()
>   val segs = line.split(",")
>
>   println(segs(0))
>
>   (1 to segs.length - 1).map { i =>
>     val factorArray = dictionaries(i - 1)
>     vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
>   }
>
>   timer ! OneDone
>
>   printer ! Print(vector.toList)
> }
>
>
> When I'm doing this in pure Akka (with actors), since I create 40
> million objects (Row(line: String)), I get a memory overflow issue.
>

No surprise there: you just slurp up all the rows faster than the actors can
keep up processing them, so most of them sit in a mailbox. In fact, if your
actors do something trivially simple, the overhead of asynchronously
passing elements to the actors might be larger than what you gain. In these
cases it is recommended to pass batches of Rows instead of one by one.
Remember, parallelisation only pays off when its overhead is smaller
than the task it parallelizes. A batched variant is sketched below.
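A hedged sketch of that batching idea against the main() loop quoted further down in this thread; the Rows message and the batch size are made up for the example, and note that this only amortizes messaging overhead, it does not add back-pressure:

case class Rows(lines: Seq[String]) // one message now carries a whole batch

val batchSize = 1000
var counter = 0
val buffer = scala.collection.mutable.ArrayBuffer.empty[String]

while (sc.hasNextLine) {
  buffer += sc.nextLine()
  if (buffer.size == batchSize) {
    rowActors(counter % 20) ! Rows(buffer.toVector) // 1000 rows per message
    buffer.clear()
    counter += 1
  }
}
if (buffer.nonEmpty) rowActors(counter % 20) ! Rows(buffer.toVector) // leftover partial batch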



> If I use Akka-stream, there is no memory overflow issue, but the
> performance is too similar to the non-parallelized version (even slower).
>

No surprise there either, you did nothing to parallelize or pipeline any
computation in the stream, so you get the overhead of asynchronous
processing and none of the benefits of it (but at least you get
backpressure).

You have a few approaches to get the benefits of multi-core processing
with streams (a sketch of the first follows the list):
 - if you have multiple processing steps for a row you can pipeline them,
see the intro part of this doc page:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
 - you can use mapAsync to have similar effects but with one computation
step, see here:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-integrations.html#Illustrating_ordering_and_parallelism
 - you can explicitly add fan-out elements to parallelise among multiple
explicit workers, see here:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
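To illustrate the first bullet, a hedged sketch reusing Allen's names (in the 1.0-M2 materializer each map stage is its own asynchronous processing unit, so the two stages below can work on different rows at the same time):

// Stage 1: CSV split.
val parsed = fileSource.map(_.split(","))

// Stage 2: dictionary lookup, running concurrently with stage 1.
val vectorized = parsed.map { segs =>
  segs(0) +: (1 to segs.length - 1).map { i =>
    stableDictionaries(i - 1).indexOf(segs(i)).toString
  }
}

vectorized.foreach(segs => printer ! Print(segs))

Pipelining only helps as much as the stages are similar in cost; for one dominant step, mapAsync (second bullet) or an explicit Balance fan-out (third bullet) is the better fit.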

Overall, for this kind of task I recommend using Streams, but you need to
read the documentation first to understand how it works.

-Endre


>
> It's my first time using Akka-stream. So I'm unfamiliar with the
> optimization you were talking about.
>
> Sincerely,
> Allen
>
> On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:
>>
>> Hi Allen,
>>
>> What's the bottleneck?
>> Have you tried enabling the experimental optimizations?
>>
>> On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie  wrote:
>>
>>> Thank you Soumya,
>>>
>>>I think Akka-streams is the way to go. However, I would also
>>> appreciate some performance boost as well - still have 40 million lines to
>>> go through! But thanks anyway!
>>>
>>>
>>> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:

 I would recommend using the Akka-streams API for this.
 Here is a sample. I was able to process a 1G file with around 1.5 million
 records in *20MB* of memory. The file read and console write
 rates are different, but the streams API handles that. This is not
 the fastest, but you at least won't run out of memory.



 

 import java.io.FileInputStream
 import java.util.Scanner

 import akka.actor.ActorSystem
 import akka.stream.{FlowMaterializer, MaterializerSettings}
 import akka.stream.scaladsl.Source

 import scala.util.Try


object StreamingFileReader extends App {

  val inputStream = new FileInputStream("/path/to/file")
  val sc = new Scanner(inputStream, "UTF-8")

  implicit val system = ActorSystem("Sys")
  val settings = MaterializerSettings(system)
  implicit val materializer =
    FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))

  val fileSource = Source(() => Iterator.continually(sc.nextLine()))

  import system.dispatcher

  fileSource.map { line =>
    line // do nothing; the foreach below prints the line
  }.foreach(println).onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }
}




 On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote:
>
> Hi,
>
>

Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-10 Thread Viktor Klang
Hi Allen,

I'd suspect the reason that it works well with Akka Streams is that they
have back-pressure while your actor solution does not (you'll send 40
million messages as fast as you can, but the actors processing them might
not be able to keep up). A pull-based sketch of what back-pressure could
look like with plain actors follows below.
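Not from this thread, but as a hedged sketch of one common way to retrofit back-pressure onto plain actors: a pull-based (work-pulling) protocol, where the reader hands out the next row only when a worker asks for it. All names here (GimmeWork, Reader, Worker) are illustrative:

import akka.actor.{Actor, ActorRef}

case object GimmeWork
case class Row(line: String)

class Reader(sc: java.util.Scanner, workers: Seq[ActorRef]) extends Actor {
  // Prime each worker with one row; from then on, rows flow only on demand.
  override def preStart(): Unit =
    workers.foreach(w => if (sc.hasNextLine) w ! Row(sc.nextLine()))

  def receive = {
    case GimmeWork =>
      if (sc.hasNextLine) sender() ! Row(sc.nextLine()) // at most one row in flight per worker
      else context.stop(self)
  }
}

class Worker extends Actor {
  def receive = {
    case Row(line) =>
      // ... the heavy per-line work (split, dictionary lookup, print) goes here ...
      sender() ! GimmeWork // pull the next row only when this one is done
  }
}

With at most one row in flight per worker, memory use is bounded by the number of workers rather than by the size of the file, which is essentially what akka-streams' demand signalling gives you for free.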

On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie  wrote:

> Hey Viktor,
>
> I'm trying to use Akka to parallelize this process. There shouldn't be
> any bottleneck, and I don't understand why I got a memory overflow with my
> first version (the actor version). The main task is to read in a line, break it
> up, and turn each segment (a string) into an integer, then print it out to
> a CSV file (a vectorization process).
>
> def processLine(line: String): Unit = {
>
>   val vector: ListBuffer[String] = ListBuffer()
>   val segs = line.split(",")
>
>   println(segs(0))
>
>   (1 to segs.length - 1).map { i =>
>     val factorArray = dictionaries(i - 1)
>     vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
>   }
>
>   timer ! OneDone
>
>   printer ! Print(vector.toList)
> }
>
>
> When I'm doing this in pure Akka (with actors), since I create 40
> million objects (Row(line: String)), I get a memory overflow issue. If I use
> Akka-stream, there is no memory overflow issue, but the performance is too
> similar to the non-parallelized version (even slower).
>
> It's my first time using Akka-stream. So I'm unfamiliar with the
> optimization you were talking about.
>
> Sincerely,
> Allen
>
> On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:
>>
>> Hi Allen,
>>
>> What's the bottleneck?
>> Have you tried enabling the experimental optimizations?
>>
>> On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie  wrote:
>>
>>> Thank you Soumya,
>>>
>>>I think Akka-streams is the way to go. However, I would also
>>> appreciate some performance boost as well - still have 40 million lines to
>>> go through! But thanks anyway!
>>>
>>>
>>> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:

 I would recommend using the Akka-streams API for this.
 Here is a sample. I was able to process a 1G file with around 1.5 million
 records in *20MB* of memory. The file read and console write
 rates are different, but the streams API handles that. This is not
 the fastest, but you at least won't run out of memory.



 

 import java.io.FileInputStream
 import java.util.Scanner

 import akka.actor.ActorSystem
 import akka.stream.{FlowMaterializer, MaterializerSettings}
 import akka.stream.scaladsl.Source

 import scala.util.Try


object StreamingFileReader extends App {

  val inputStream = new FileInputStream("/path/to/file")
  val sc = new Scanner(inputStream, "UTF-8")

  implicit val system = ActorSystem("Sys")
  val settings = MaterializerSettings(system)
  implicit val materializer =
    FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))

  val fileSource = Source(() => Iterator.continually(sc.nextLine()))

  import system.dispatcher

  fileSource.map { line =>
    line // do nothing; the foreach below prints the line
  }.foreach(println).onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }
}




 On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote:
>
> Hi,
>
> I am trying to process a CSV file with 40 million lines of data.
> It's a 5GB file. I'm trying to use Akka to parallelize the
> task. However, it seems like I can't stop the rapid memory growth: it
> expanded from 1GB to almost 15GB (the limit I set) in under 5 minutes.
> This is the code in my main() method:
>
> val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
> val sc = new Scanner(inputStream, "UTF-8")
> var counter = 0
> while (sc.hasNextLine) {
>   rowActors(counter % 20) ! Row(sc.nextLine())
>   counter += 1
> }
>
> sc.close()
> inputStream.close()
>
> Someone pointed out that I was essentially creating 40 million Row
> objects, which naturally will take up a lot of space. My row actor is not
> doing much: it simply transforms each line into an array of integers
> (if you are familiar with the concept of vectorizing, that's what I'm
> doing), and then the transformed array gets printed out. Done. I originally
> thought there was a memory leak, but maybe I'm not managing memory right.
> Can I get any wise suggestions from the Akka experts here?
>
>
>
> 
>
>  --

Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-09 Thread Soumya Simanta
Allen,

What are your constraints? Does the output CSV have to maintain the order
of the input file? Do you have an upper bound?

I don't think you are CPU bound, so you need to look at ways of
reading/writing faster. Maybe async IO using nio can help.
You can split the input and process in parallel if you don't mind multiple
output files; a sketch of that idea follows below.
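For concreteness, a hedged sketch of that split-and-parallelize idea using plain Scala parallel collections rather than anything Akka-specific (the chunk size, output file names, and per-line work are all made up for the example):

import java.io.PrintWriter
import scala.io.Source

val chunkSize = 500000
Source.fromFile("E:\\Allen\\DataScience\\train\\train.csv").getLines()
  .grouped(chunkSize)
  .zipWithIndex
  .foreach { case (chunk, i) =>
    val out = new PrintWriter(s"train-part-$i.csv")
    chunk.par                                     // CPU work within a chunk runs in parallel
      .map(line => line.split(",").mkString(",")) // stand-in for the real vectorization
      .seq                                        // restore order before writing
      .foreach(out.println)
    out.close()
  }

Memory stays bounded by the chunk size, and the part files can be concatenated afterwards if a single output is required.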

I'm not aware of any way of doing file IO faster using Akka. Maybe the Akka 
folks can provide better guidance there. 

BTW how much memory are you giving your akka-streams program? 

-Soumya


On Friday, January 9, 2015 at 4:53:29 PM UTC-5, Allen Nie wrote:
>
> Hey Viktor,
>
> I'm trying to use Akka to parallelize this process. There shouldn't be
> any bottleneck, and I don't understand why I got a memory overflow with my
> first version (the actor version). The main task is to read in a line, break it
> up, and turn each segment (a string) into an integer, then print it out to
> a CSV file (a vectorization process).
>
> def processLine(line: String): Unit = {
>
>   val vector: ListBuffer[String] = ListBuffer()
>   val segs = line.split(",")
>
>   println(segs(0))
>
>   (1 to segs.length - 1).map { i =>
>     val factorArray = dictionaries(i - 1)
>     vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
>   }
>
>   timer ! OneDone
>
>   printer ! Print(vector.toList)
> }
>
>
> When I'm doing this in pure Akka (with actors), since I create 40
> million objects (Row(line: String)), I get a memory overflow issue. If I use
> Akka-stream, there is no memory overflow issue, but the performance is too
> similar to the non-parallelized version (even slower).
>
> It's my first time using Akka-stream. So I'm unfamiliar with the 
> optimization you were talking about.
>
> Sincerely,
> Allen
>
> On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:
>>
>> Hi Allen,
>>
>> What's the bottleneck?
>> Have you tried enabling the experimental optimizations?
>>
>> On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie  wrote:
>>
>>> Thank you Soumya, 
>>>
>>>I think Akka-streams is the way to go. However, I would also 
>>> appreciate some performance boost as well - still have 40 million lines to 
>>> go through! But thanks anyway!
>>>
>>>
>>> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:

 I would recommend using the Akka-streams API for this. 
 Here is a sample. I was able to process a 1G file with around 1.5 million
 records in *20MB* of memory. The file read and console write
 rates are different, but the streams API handles that. This is not
 the fastest, but you at least won't run out of memory.



 

 import java.io.FileInputStream
 import java.util.Scanner

 import akka.actor.ActorSystem
 import akka.stream.{FlowMaterializer, MaterializerSettings}
 import akka.stream.scaladsl.Source

 import scala.util.Try


object StreamingFileReader extends App {

  val inputStream = new FileInputStream("/path/to/file")
  val sc = new Scanner(inputStream, "UTF-8")

  implicit val system = ActorSystem("Sys")
  val settings = MaterializerSettings(system)
  implicit val materializer =
    FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))

  val fileSource = Source(() => Iterator.continually(sc.nextLine()))

  import system.dispatcher

  fileSource.map { line =>
    line // do nothing; the foreach below prints the line
  }.foreach(println).onComplete { _ =>
    Try {
      sc.close()
      inputStream.close()
    }
    system.shutdown()
  }
}




 On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote:
>
> Hi,
>
> I am trying to process a CSV file with 40 million lines of data.
> It's a 5GB file. I'm trying to use Akka to parallelize the
> task. However, it seems like I can't stop the rapid memory growth: it
> expanded from 1GB to almost 15GB (the limit I set) in under 5 minutes.
> This is the code in my main() method:
>
> val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
> val sc = new Scanner(inputStream, "UTF-8")
> var counter = 0
> while (sc.hasNextLine) {
>   rowActors(counter % 20) ! Row(sc.nextLine())
>   counter += 1
> }
>
> sc.close()
> inputStream.close()
>
> Someone pointed out that I was essentially creating 40 million Row
> objects, which naturally will take up a lot of space. My row actor is not
> doing much: it simply transforms each line into an array of integers
> (if you are familiar with the concept of vectorizing, that's what I'm
> doing), and then the transformed array gets printed out. Done. I originally
> thought there was a memory leak, but maybe I'm not managing memory right.
> Can I get any wise suggestions from the Akka experts here?

Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-09 Thread Allen Nie
Hey Viktor,

I'm trying to use Akka to parallelize this process. There shouldn't be
any bottleneck, and I don't understand why I got a memory overflow with my
first version (the actor version). The main task is to read in a line, break it
up, and turn each segment (a string) into an integer, then print it out to
a CSV file (a vectorization process).

def processLine(line: String): Unit = {

  val vector: ListBuffer[String] = ListBuffer()
  val segs = line.split(",")

  println(segs(0))

  (1 to segs.length - 1).map { i =>
    val factorArray = dictionaries(i - 1)
    vector += factorArray._2.indexOf(segs(i)).toString // get the factor level of the string
  }

  timer ! OneDone

  printer ! Print(vector.toList)
}


When I'm doing this in pure Akka (with actors), since I create 40
million objects (Row(line: String)), I get a memory overflow issue. If I use
Akka-stream, there is no memory overflow issue, but the performance is too
similar to the non-parallelized version (even slower).

It's my first time using Akka-stream. So I'm unfamiliar with the 
optimization you were talking about.

Sincerely,
Allen

On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote:
>
> Hi Allen,
>
> What's the bottleneck?
> Have you tried enabling the experimental optimizations?
>
> On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie wrote:
>
>> Thank you Soumya, 
>>
>>I think Akka-streams is the way to go. However, I would also 
>> appreciate some performance boost as well - still have 40 million lines to 
>> go through! But thanks anyway!
>>
>>
>> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:
>>>
>>> I would recommend using the Akka-streams API for this. 
>>> Here is a sample. I was able to process a 1G file with around 1.5 million
>>> records in *20MB* of memory. The file read and console write
>>> rates are different, but the streams API handles that. This is not
>>> the fastest, but you at least won't run out of memory.
>>>
>>>
>>>
>>> 
>>>
>>> import java.io.FileInputStream
>>> import java.util.Scanner
>>>
>>> import akka.actor.ActorSystem
>>> import akka.stream.{FlowMaterializer, MaterializerSettings}
>>> import akka.stream.scaladsl.Source
>>>
>>> import scala.util.Try
>>>
>>>
>>> object StreamingFileReader extends App {
>>>
>>>   val inputStream = new FileInputStream("/path/to/file")
>>>   val sc = new Scanner(inputStream, "UTF-8")
>>>
>>>   implicit val system = ActorSystem("Sys")
>>>   val settings = MaterializerSettings(system)
>>>   implicit val materializer =
>>>     FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))
>>>
>>>   val fileSource = Source(() => Iterator.continually(sc.nextLine()))
>>>
>>>   import system.dispatcher
>>>
>>>   fileSource.map { line =>
>>>     line // do nothing; the foreach below prints the line
>>>   }.foreach(println).onComplete { _ =>
>>>     Try {
>>>       sc.close()
>>>       inputStream.close()
>>>     }
>>>     system.shutdown()
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>> On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote:

 Hi,

 I am trying to process a CSV file with 40 million lines of data.
 It's a 5GB file. I'm trying to use Akka to parallelize the
 task. However, it seems like I can't stop the rapid memory growth: it
 expanded from 1GB to almost 15GB (the limit I set) in under 5 minutes.
 This is the code in my main() method:

val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
val sc = new Scanner(inputStream, "UTF-8")
var counter = 0
while (sc.hasNextLine) {
  rowActors(counter % 20) ! Row(sc.nextLine())
  counter += 1
}

sc.close()
inputStream.close()

 Someone pointed out that I was essentially creating 40 million Row
 objects, which naturally will take up a lot of space. My row actor is not
 doing much: it simply transforms each line into an array of integers
 (if you are familiar with the concept of vectorizing, that's what I'm
 doing), and then the transformed array gets printed out. Done. I originally
 thought there was a memory leak, but maybe I'm not managing memory right.
 Can I get any wise suggestions from the Akka experts here?

 

 


Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?

2015-01-09 Thread Viktor Klang
Hi Allen,

What's the bottleneck?
Have you tried enabling the experimental optimizations?

On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie  wrote:

> Thank you Soumya,
>
>I think Akka-streams is the way to go. However, I would also
> appreciate some performance boost as well - still have 40 million lines to
> go through! But thanks anyway!
>
>
> On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote:
>>
>> I would recommend using the Akka-streams API for this.
>> Here is a sample. I was able to process a 1G file with around 1.5 million
>> records in *20MB* of memory. The file read and console write
>> rates are different, but the streams API handles that. This is not
>> the fastest, but you at least won't run out of memory.
>>
>>
>>
>> 
>>
>> import java.io.FileInputStream
>> import java.util.Scanner
>>
>> import akka.actor.ActorSystem
>> import akka.stream.{FlowMaterializer, MaterializerSettings}
>> import akka.stream.scaladsl.Source
>>
>> import scala.util.Try
>>
>>
>> object StreamingFileReader extends App {
>>
>>   val inputStream = new FileInputStream("/path/to/file")
>>   val sc = new Scanner(inputStream, "UTF-8")
>>
>>   implicit val system = ActorSystem("Sys")
>>   val settings = MaterializerSettings(system)
>>   implicit val materializer =
>>     FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256))
>>
>>   val fileSource = Source(() => Iterator.continually(sc.nextLine()))
>>
>>   import system.dispatcher
>>
>>   fileSource.map { line =>
>>     line // do nothing; the foreach below prints the line
>>   }.foreach(println).onComplete { _ =>
>>     Try {
>>       sc.close()
>>       inputStream.close()
>>     }
>>     system.shutdown()
>>   }
>> }
>>
>>
>>
>>
>> On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote:
>>>
>>> Hi,
>>>
>>> I am trying to process a CSV file with 40 million lines of data.
>>> It's a 5GB file. I'm trying to use Akka to parallelize the
>>> task. However, it seems like I can't stop the rapid memory growth: it
>>> expanded from 1GB to almost 15GB (the limit I set) in under 5 minutes. This is
>>> the code in my main() method:
>>>
>>> val inputStream = new FileInputStream("E:\\Allen\\DataScience\\train\\train.csv")
>>> val sc = new Scanner(inputStream, "UTF-8")
>>> var counter = 0
>>> while (sc.hasNextLine) {
>>>   rowActors(counter % 20) ! Row(sc.nextLine())
>>>   counter += 1
>>> }
>>>
>>> sc.close()
>>> inputStream.close()
>>>
>>> Someone pointed out that I was essentially creating 40 million Row
>>> objects, which naturally will take up a lot of space. My row actor is not
>>> doing much: it simply transforms each line into an array of integers
>>> (if you are familiar with the concept of vectorizing, that's what I'm
>>> doing), and then the transformed array gets printed out. Done. I originally
>>> thought there was a memory leak, but maybe I'm not managing memory right.
>>> Can I get any wise suggestions from the Akka experts here?
>>>
>>>
>>>
>>> 
>>>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.