Is there a way to load a large file from HDFS faster into Spark

2014-05-10 Thread Soumya Simanta
I've a Spark cluster with 3 worker nodes.


   - *Workers:* 3
   - *Cores:* 48 Total, 48 Used
   - *Memory:* 469.8 GB Total, 72.0 GB Used

I want to process a single compressed file (*.gz) on HDFS. The file is 1.5GB
compressed and 11GB uncompressed.
When I try to read the compressed file from HDFS, it takes a while (4-5
minutes) to load it into an RDD. If I use the .cache operation it takes even
longer. Is there a way to make loading of the RDD from HDFS faster?
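
Roughly, what I'm running (the path is a placeholder):

// NOTE: a single .gz file is not splittable, so sc.textFile reads it
// with one task; a repartition after the load only helps later stages.
val rdd = sc.textFile("hdfs:///path/to/file.gz")
rdd.cache()
rdd.count()  // the first action materializes (and caches) the RDD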

Thanks
-Soumya


Re: How to read a multipart s3 file?

2014-05-10 Thread kamatsuoka
For example, this app just reads a 4GB file and writes a copy of it.  It
takes 41 seconds to write the file, then 3 more minutes to move all the
temporary files.

I guess this is an issue with the hadoop / jets3t code layer, not Spark.
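
The app is essentially just this (a minimal sketch; the bucket name is a
placeholder):

// Read the file from S3 and write a copy back. saveAsTextFile writes
// per-task temporary files first; moving/uploading them afterwards is
// the slow phase visible in the log below.
val in = sc.textFile("s3n://my-bucket/dad-20140101-9M")
in.saveAsTextFile("s3n://my-bucket/dad-20140101-9M.copy")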

14/05/06 20:11:41 INFO TaskSetManager: Finished TID 63 in 8688 ms on
ip-10-143-138-33.ec2.internal (progress: 63/63)
14/05/06 20:11:41 INFO DAGScheduler: Stage 0 (saveAsTextFile at
FileCopy.scala:17) finished in 41.326 s
14/05/06 20:11:41 INFO SparkContext: Job finished: saveAsTextFile at
FileCopy.scala:17, took 41.605480454 s
14/05/06 20:14:48 INFO NativeS3FileSystem: OutputStream for key
'dad-20140101-9M.copy/_SUCCESS' writing to tempfile
'/tmp/hadoop-root/s3/output-1223846975509014265.tmp'
14/05/06 20:14:48 INFO NativeS3FileSystem: OutputStream for key
'dad-20140101-9M.copy/_SUCCESS' closed. Now beginning upload
14/05/06 20:14:48 INFO NativeS3FileSystem: OutputStream for key
'dad-20140101-9M.copy/_SUCCESS' upload complete
14/05/06 20:14:48 INFO SparkDeploySchedulerBackend: Shutting down all
executors





Re: os buffer cache does not cache shuffle output file

2014-05-10 Thread Aaron Davidson
Seems the mailing list was broken when you sent your original question, so
I appended it to the end of this message.

"Buffers" is relatively unimportant in today's Linux kernel; "cache" is
used for both writing and reading [1].
What you are seeing seems to be the expected behavior: the data is written
to the page cache (increasing its size),
and also written out asynchronously to the disk. As long as there's room in
the page cache, the write should not
block on IO.

[1] http://stackoverflow.com/questions/6345020/linux-memory-buffer-vs-cache
(contains better citations)

"""
Hi,
  patrick said "The intermediate shuffle output gets written to disk, but
it often hits the OS-buffer cache
  since it's not explicitly fsync'ed, so in many cases it stays entirely in
memory. The behavior of the
  shuffle is agnostic to whether the base RDD is in cache or in disk."

  I ran a test with one groupBy action and found that the intermediate shuffle
files are written to disk even with sufficient free memory: the shuffle size
is about 500MB, there is 1.5GB of free memory, and I noticed that disk usage
increased by about 500MB during the process.

  Here's the log from vmstat. You can see the cache column increases when
reading from disk, but the buff column is unchanged, so the data written to
disk is not buffered:

procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd    free   buff   cache  si  so    bi    bo   in   cs us sy id wa
 2  0  10256 1616852   6664  557344   0   0     0 51380  972 2852 88  7  0  5
 1  0  10256 1592636   6664  580676   0   0     0     0  949 3777 91  9  0  0
 1  0  10256 1568228   6672  604016   0   0     0   576  923 3640 94  6  0  0
 2  0  10256 1545836   6672  627348   0   0     0     0  893 3261 95  5  0  0
 1  0  10256 1521552   6672  650668   0   0     0     0  884 3401 89 11  0  0
 2  0  10256 1497144   6672  674012   0   0     0     0  911 3275 91  9  0  0
 1  0  10256 1469260   6676  700728   0   0     4 60668 1044 3366 85 15  0  0
 1  0  10256 1453076   6684  702464   0   0     0   924  853 2596 97  3  0  0

  Is the buffer cache in write-through mode? Is there something I need to
configure? My OS is Ubuntu 13.10, 64-bit.
  Thanks!
"""
- wxhsdp


On Sat, May 10, 2014 at 4:41 PM, Koert Kuipers  wrote:

> Yes, it seems broken. I got only a few emails in the last few days.
>
>
> On Fri, May 9, 2014 at 7:24 AM, wxhsdp  wrote:
>
>> Is there something wrong with the mailing list? Very few people see my
>> thread.
>>
>>
>>
>>
>
>


Re: problem about broadcast variable in iteration

2014-05-10 Thread randylu
I'm running Spark 1.0.0, the newest under-development version.





Re: os buffer cache does not cache shuffle output file

2014-05-10 Thread Koert Kuipers
Yes, it seems broken. I got only a few emails in the last few days.


On Fri, May 9, 2014 at 7:24 AM, wxhsdp  wrote:

> Is there something wrong with the mailing list? Very few people see my
> thread.
>
>
>
>


time exhausted in BlockFetcher

2014-05-10 Thread wxhsdp
Hi, all
  I'm tuning my app in local mode and found that a lot of time is spent in
local block fetch.

  In stage 1, I read in the input data and do a repartition.
  In stage 2, I do some operation on the repartitioned RDD, so it involves a
local block fetch. I find that the fetch time is much longer than my
operation time.
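
  Roughly, the shape of the job (the path and partition count are
illustrative; the log below shows 24 shuffle blocks):

val input = sc.textFile("hdfs:///path/to/input")  // stage 1: read input
val repartitioned = input.repartition(24)         // repartition => shuffle write
val result = repartitioned                        // stage 2: fetch shuffle blocks,
  .map(_.length.toLong)                           //   then run the operation
  .reduce(_ + _)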

Here are my debug logs:
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: Fetching outputs for
shuffle 2, reduce 3
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: BlockStoreShuffleFetcher
startTime 1399767672970
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: Fetching map output
location for shuffle 2, reduce 3 took 0 ms
14/05/11 08:21:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/05/11 08:21:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 13 non-empty blocks out of 24 blocks
14/05/11 08:21:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote fetches in 2 ms
14/05/11 08:21:12 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: Got
local block shuffle_2_0_3
14/05/11 08:21:12 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: Got
local blocks in  6 ms ms
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: BlockStoreShuffleFetcher
itr 1399767672982
14/05/11 08:21:12 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 1, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 2, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 3, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 4, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 5, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 6, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 7, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 8, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator:
resultGotten 9, next time 0
14/05/11 08:21:13 DEBUG BlockStoreShuffleFetcher: completionIter
shuffleFinishTime 1399767673293

  In the end, blockFetcher calls CompletionIterator to read fetched blocks
off the queue as they arrive.
  You can see that before the iteration the time is 1399767672982; iterating
through the elements consumes nearly no time, yet after the iteration
completes the time is 1399767673293. Where is this 300+ ms spent?!

  Thank you very much!


   





Re: Schema view of HadoopRDD

2014-05-10 Thread Debasish Das
Hi,

For each line that we read as a textLine from HDFS, we have a schema...if
there is an API that takes the schema as a List[Symbol] and maps each token
to its Symbol, it would be helpful...
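
Something like this is the kind of helper I mean (a sketch; the field names,
delimiter, and path are just examples):

val schema = List('id, 'name, 'score)

// Zip each tokenized line with the schema to get a Symbol -> token view.
def withSchema(line: String, schema: List[Symbol]): Map[Symbol, String] =
  schema.zip(line.split(",")).toMap

val records = sc.textFile("hdfs:///path/to/data")
  .map(line => withSchema(line, schema))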

One solution is to keep the data on HDFS as Avro/protobuf serialized objects,
but I'm not sure if that works with HBase input...we are testing HDFS right
now, but eventually we will read from a persistent store like HBase...so
basically the immutableBytes need to be converted to a schema view as well,
in case we don't want to write the whole row as a protobuf...

Do RDDs provide a schema view of the dataset on HDFS / HBase?

Thanks.
Deb


Re: os buffer cache does not cache shuffle output file

2014-05-10 Thread wxhsdp
Is there something wrong with the mailing list? Very few people see my thread.





Re: Creating time-sequential pairs

2014-05-10 Thread Sean Owen
How about ...

val data = sc.parallelize(Array((1,0.05),(2,0.10),(3,0.15)))
val pairs = data.join(data.map(t => (t._1 + 1, t._2)))

It's a self-join, but one copy has its ID incremented by 1. I don't know
how performant it is, but it works, although the output looks more like:

(2,(0.1,0.05))
(3,(0.15,0.1))
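
If you need the exact ((t1, v1), (t2, v2)) shape, a follow-on map recovers it
(a sketch, using the same data as above):

// Shift, join, then undo the shift on the key to rebuild both tuples.
val sequential = data.map(t => (t._1 + 1, t._2)).join(data)
  .map { case (t2, (v1, v2)) => ((t2 - 1, v1), (t2, v2)) }
// ((1,0.05),(2,0.1)) and ((2,0.1),(3,0.15))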

On Thu, May 8, 2014 at 11:04 PM, Nicholas Pritchard
 wrote:
> Hi Spark community,
>
> I have a design/algorithm question that I assume is common enough for
> someone else to have tackled before. I have an RDD of time-series data
> formatted as time-value tuples, RDD[(Double, Double)], and am trying to
> extract threshold crossings. In order to do so, I first want to transform
> the RDD into pairs of time-sequential values.
>
> For example:
> Input: The time-series data:
> (1, 0.05)
> (2, 0.10)
> (3, 0.15)
> Output: Transformed into time-sequential pairs:
> ((1, 0.05), (2, 0.10))
> ((2, 0.10), (3, 0.15))
>
> My initial thought was to try and utilize a custom partitioner. This
> partitioner could ensure sequential data was kept together. Then I could use
> "mapPartitions" to transform these lists of sequential data. Finally, I
> would need some logic for creating sequential pairs across the boundaries of
> each partition.
>
> However, I was hoping to get some feedback and ideas from the community.
> Anyone have thoughts on a simpler solution?
>
> Thanks,
> Nick


Creating time-sequential pairs

2014-05-10 Thread Nicholas Pritchard
Hi Spark community,

I have a design/algorithm question that I assume is common enough for
someone else to have tackled before. I have an RDD of time-series data
formatted as time-value tuples, RDD[(Double, Double)], and am trying to
extract threshold crossings. In order to do so, I first want to transform
the RDD into pairs of time-sequential values.

For example:
Input: The time-series data:
(1, 0.05)
(2, 0.10)
(3, 0.15)
Output: Transformed into time-sequential pairs:
((1, 0.05), (2, 0.10))
((2, 0.10), (3, 0.15))

My initial thought was to try and utilize a custom partitioner. This
partitioner could ensure sequential data was kept together. Then I could
use "mapPartitions" to transform these lists of sequential data. Finally, I
would need some logic for creating sequential pairs across the boundaries
of each partition.
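
As a sketch of the in-partition half of that idea (assuming a time-sorted RDD
called timeSorted; the boundary pairs across partitions would still need the
extra logic mentioned above):

// Pair up consecutive elements within each partition.
val inPartitionPairs = timeSorted.mapPartitions { iter =>
  iter.sliding(2).collect { case Seq(a, b) => (a, b) }
}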

However, I was hoping to get some feedback and ideas from the community.
Anyone have thoughts on a simpler solution?

Thanks,
Nick


executor processes are still there even I killed the app and the workers

2014-05-10 Thread Nan Zhu
Hi, all 

With Spark 1.0 RC3, I found that the executor processes are still there even
after I killed the app and the workers.

Has anyone seen the same problem (maybe it also exists in other versions)?

Best, 

-- 
Nan Zhu