Is there a way to load a large file from HDFS into Spark faster?
I have a Spark cluster with 3 worker nodes.

- *Workers:* 3
- *Cores:* 48 Total, 48 Used
- *Memory:* 469.8 GB Total, 72.0 GB Used

I want to process a single compressed file (*.gz) on HDFS. The file is 1.5 GB compressed and 11 GB uncompressed. When I try to read the compressed file from HDFS, it takes a while (4-5 minutes) to load it into an RDD. If I use the .cache operation it takes even longer. Is there a way to make loading the RDD from HDFS faster?

Thanks
-Soumya
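(Not part of the original message; a minimal spark-shell sketch of one common workaround, with the path and partition count as assumptions.) A .gz file is not splittable, so sc.textFile decompresses the whole file in a single task; repartitioning right after the read spreads the data across the cluster before any cache or downstream work:

// "hdfs:///data/big-file.gz" and the partition count 48 are placeholders.
// Gzip is not splittable, so sc.textFile yields one partition for the whole file;
// repartition() spreads the decompressed records across the cluster before cache().
val raw = sc.textFile("hdfs:///data/big-file.gz")
val spread = raw.repartition(48)   // e.g. one partition per core
spread.cache()
spread.count()                     // forces the read and populates the cache

Splitting the input into several smaller .gz files, or using a splittable format (bzip2, indexed LZO, or uncompressed text), lets the initial read itself run in parallel.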
Re: How to read a multipart s3 file?
For example, this app just reads a 4GB file and writes a copy of it. It takes 41 seconds to write the file, then 3 more minutes to move all the temporary files. I guess this is an issue with the hadoop / jets3t code layer, not Spark.

14/05/06 20:11:41 INFO TaskSetManager: Finished TID 63 in 8688 ms on ip-10-143-138-33.ec2.internal (progress: 63/63)
14/05/06 20:11:41 INFO DAGScheduler: Stage 0 (saveAsTextFile at FileCopy.scala:17) finished in 41.326 s
14/05/06 20:11:41 INFO SparkContext: Job finished: saveAsTextFile at FileCopy.scala:17, took 41.605480454 s
14/05/06 20:14:48 INFO NativeS3FileSystem: OutputStream for key 'dad-20140101-9M.copy/_SUCCESS' writing to tempfile '/tmp/hadoop-root/s3/output-1223846975509014265.tmp'
14/05/06 20:14:48 INFO NativeS3FileSystem: OutputStream for key 'dad-20140101-9M.copy/_SUCCESS' closed. Now beginning upload
14/05/06 20:14:48 INFO NativeS3FileSystem: OutputStream for key 'dad-20140101-9M.copy/_SUCCESS' upload complete
14/05/06 20:14:48 INFO SparkDeploySchedulerBackend: Shutting down all executors
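(Not the original FileCopy.scala; a hypothetical reconstruction of such a copy job, with bucket and path names as placeholders.) The shape of the app referenced in the log above would be roughly:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reconstruction: read a multipart S3 file and write a copy back to S3.
// The slow part observed above is after saveAsTextFile returns, when the temporary
// output files are "renamed" (copied) into place by the Hadoop S3 layer.
object FileCopy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FileCopy"))
    val lines = sc.textFile("s3n://my-bucket/dad-20140101-9M")          // placeholder source
    lines.saveAsTextFile("s3n://my-bucket/dad-20140101-9M.copy")        // placeholder destination
    sc.stop()
  }
}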
Re: os buffer cache does not cache shuffle output file
Seems the mailing list was broken when you sent your original question, so I appended it to the end of this message.

"Buffers" is relatively unimportant in today's Linux kernel; "cache" is used for both writing and reading [1]. What you are seeing seems to be the expected behavior: the data is written to the page cache (increasing its size) and also written out asynchronously to the disk. As long as there's room in the page cache, the write should not block on IO.

[1] http://stackoverflow.com/questions/6345020/linux-memory-buffer-vs-cache (contains better citations)

"""
Hi,

Patrick said: "The intermediate shuffle output gets written to disk, but it often hits the OS buffer cache since it's not explicitly fsync'ed, so in many cases it stays entirely in memory. The behavior of the shuffle is agnostic to whether the base RDD is in cache or on disk."

I did a test with one groupBy action and found that the intermediate shuffle files are written to disk even with sufficient free memory: the shuffle size is about 500MB, there's 1.5GB of free memory, and I notice that disk usage increases by about 500MB during the process.

Here's the log from vmstat. You can see the cache column increases when reading from disk, but the buff column is unchanged, so the data written to disk is not buffered:

procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd    free   buff   cache   si   so    bi     bo    in    cs us sy id wa
 2  0  10256 1616852   6664  557344    0    0     0  51380   972  2852 88  7  0  5
 1  0  10256 1592636   6664  580676    0    0     0      0   949  3777 91  9  0  0
 1  0  10256 1568228   6672  604016    0    0     0    576   923  3640 94  6  0  0
 2  0  10256 1545836   6672  627348    0    0     0      0   893  3261 95  5  0  0
 1  0  10256 1521552   6672  650668    0    0     0      0   884  3401 89 11  0  0
 2  0  10256 1497144   6672  674012    0    0     0      0   911  3275 91  9  0  0
 1  0  10256 1469260   6676  700728    0    0     4  60668  1044  3366 85 15  0  0
 1  0  10256 1453076   6684  702464    0    0     0    924   853  2596 97  3  0  0

Is the buffer cache in write-through mode? Is there something I need to configure? My OS is Ubuntu 13.10, 64-bit.

Thanks!
"""

- wxhsdp

On Sat, May 10, 2014 at 4:41 PM, Koert Kuipers wrote:
> yes it seems broken. i got only a few emails in last few days
>
> On Fri, May 9, 2014 at 7:24 AM, wxhsdp wrote:
>> is there something wrong with the mailing list? very few people see my
>> thread
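(Not the poster's actual code; a rough spark-shell sketch of the kind of groupBy test described in the quoted message, with data sizes, key count, and record shape as assumptions.) Running `vmstat 1` alongside something like this shows the page-cache behavior discussed above:

// Generate a few hundred MB of (key, value) records across 8 partitions,
// then force a shuffle with groupByKey and materialize the result.
import scala.util.Random

val pairs = sc.parallelize(1 to 8, 8).flatMap { _ =>
  val rnd = new Random()
  Iterator.fill(1 << 20)((rnd.nextInt(1000), rnd.nextString(32)))
}
pairs.groupByKey().count()   // shuffle map output lands on local disk via the page cache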
Re: problem about broadcast variable in iteration
I run it in Spark 1.0.0, the newest under-development version.
Re: os buffer cache does not cache shuffle output file
Yes, it seems broken. I got only a few emails in the last few days.

On Fri, May 9, 2014 at 7:24 AM, wxhsdp wrote:
> is there something wrong with the mailing list? very few people see my
> thread
time exhausted in BlockFetcher
Hi all,

I'm tuning my app in local mode and found that a lot of time is spent in the local block fetch. In stage 1 I read the input data and do a repartition; in stage 2 I do some operation on the repartitioned RDD, so it involves a local block fetch. I find that the fetch time is much longer than my operation time. Here are my debug logs:

14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 2, reduce 3
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: BlockStoreShuffleFetcher startTime 1399767672970
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 2, reduce 3 took 0 ms
14/05/11 08:21:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/05/11 08:21:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 13 non-empty blocks out of 24 blocks
14/05/11 08:21:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 2 ms
14/05/11 08:21:12 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: Got local block shuffle_2_0_3
14/05/11 08:21:12 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: Got local blocks in 6 ms ms
14/05/11 08:21:12 DEBUG BlockStoreShuffleFetcher: BlockStoreShuffleFetcher itr 1399767672982
14/05/11 08:21:12 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 1, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 2, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 3, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 4, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 5, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 6, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 7, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 8, next time 0
14/05/11 08:21:13 DEBUG BlockFetcherIterator$BasicBlockFetcherIterator: resultGotten 9, next time 0
14/05/11 08:21:13 DEBUG BlockStoreShuffleFetcher: completionIter shuffleFinishTime 1399767673293

In the end, the block fetcher calls CompletionIterator to read fetched blocks off the queue as they arrive. You can see that before iteration the time is 1399767672982, iterating through the elements consumes nearly no time, yet after the iteration completes the time is 1399767673293. Where are these 300+ ms spent?!

Thank you very much!
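(Not the original app; a sketch of the two-stage shape described above, with the input path, partition count, and per-element operation as placeholders.)

// Hypothetical two-stage shape: stage 1 reads and repartitions (writes shuffle map output),
// stage 2 maps over the repartitioned RDD (fetches those shuffle blocks locally).
val input = sc.textFile("hdfs:///path/to/input")       // placeholder path
val repartitioned = input.repartition(4)               // placeholder partition count
val lengths = repartitioned.map(_.length)              // stand-in for the real operation
lengths.count()                                        // runs both stages; the local fetch happens at the start of stage 2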
Re: Schema view of HadoopRDD
Hi,

For each line that we read as a textLine from HDFS, we have a schema. If there is an API that takes the schema as a List[Symbol] and maps each token to its Symbol, that would be helpful.

One solution is to keep data on HDFS as Avro/protobuf serialized objects, but I'm not sure that works on HBase input. We are testing HDFS right now, but eventually we will read from a persistent store like HBase, so basically the immutableBytes need to be converted to a schema view as well, in case we don't want to write the whole row as a protobuf.

Do RDDs provide a schema view of the dataset on HDFS / HBase?

Thanks.
Deb
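(Not part of the original message; a sketch assuming a spark-shell session, a tab-delimited text file, and a made-up schema and path.) There is no built-in RDD API that takes a List[Symbol] schema, but a small helper along these lines gives a per-record schema view:

// Hypothetical helper: zip each delimited line against a caller-supplied schema,
// yielding a Map[Symbol, String] per record. Schema, delimiter, and path are assumptions.
val schema = List('userId, 'timestamp, 'value)

def withSchema(line: String, sep: String = "\t"): Map[Symbol, String] =
  schema.zip(line.split(sep)).toMap

val rows = sc.textFile("hdfs:///path/to/data").map(line => withSchema(line))
rows.take(5).foreach(r => println(r('userId) + " -> " + r('value)))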
Re: os buffer cache does not cache shuffle output file
Is there something wrong with the mailing list? Very few people see my thread.
Re: Creating time-sequential pairs
How about ...

val data = sc.parallelize(Array((1,0.05),(2,0.10),(3,0.15)))
val pairs = data.join(data.map(t => (t._1 + 1, t._2)))

It's a self-join, but one copy has its ID incremented by 1. I don't know whether it's performant, but it works, although the output is more like:

(2,(0.1,0.05))
(3,(0.15,0.1))

On Thu, May 8, 2014 at 11:04 PM, Nicholas Pritchard wrote:
> Hi Spark community,
>
> I have a design/algorithm question that I assume is common enough for
> someone else to have tackled before. I have an RDD of time-series data
> formatted as time-value tuples, RDD[(Double, Double)], and am trying to
> extract threshold crossings. In order to do so, I first want to transform
> the RDD into pairs of time-sequential values.
>
> For example:
> Input: The time-series data:
> (1, 0.05)
> (2, 0.10)
> (3, 0.15)
> Output: Transformed into time-sequential pairs:
> ((1, 0.05), (2, 0.10))
> ((2, 0.10), (3, 0.15))
>
> My initial thought was to try and utilize a custom partitioner. This
> partitioner could ensure sequential data was kept together. Then I could use
> "mapPartitions" to transform these lists of sequential data. Finally, I
> would need some logic for creating sequential pairs across the boundaries of
> each partition.
>
> However I was hoping to get some feedback and ideas from the community.
> Anyone have thoughts on a simpler solution?
>
> Thanks,
> Nick
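(Not from the original reply; a small follow-on sketch, assuming a spark-shell session.) If the exact ((t1, v1), (t2, v2)) shape matters, the join output can be reshaped:

// Self-join as above, then reshape (t2, (v2, v1)) into ((t1, v1), (t2, v2)).
val data = sc.parallelize(Array((1, 0.05), (2, 0.10), (3, 0.15)))
val pairs = data.join(data.map(t => (t._1 + 1, t._2)))             // (t2, (v2, v1))
val sequential = pairs.map { case (t2, (v2, v1)) => ((t2 - 1, v1), (t2, v2)) }
sequential.collect().foreach(println)   // order of collect() output is not guaranteed
// ((1,0.05),(2,0.1))
// ((2,0.1),(3,0.15))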
Creating time-sequential pairs
Hi Spark community,

I have a design/algorithm question that I assume is common enough for someone else to have tackled before. I have an RDD of time-series data formatted as time-value tuples, RDD[(Double, Double)], and am trying to extract threshold crossings. In order to do so, I first want to transform the RDD into pairs of time-sequential values.

For example:
Input: The time-series data:
(1, 0.05)
(2, 0.10)
(3, 0.15)
Output: Transformed into time-sequential pairs:
((1, 0.05), (2, 0.10))
((2, 0.10), (3, 0.15))

My initial thought was to try and utilize a custom partitioner. This partitioner could ensure sequential data was kept together. Then I could use "mapPartitions" to transform these lists of sequential data. Finally, I would need some logic for creating sequential pairs across the boundaries of each partition.

However I was hoping to get some feedback and ideas from the community. Anyone have thoughts on a simpler solution?

Thanks,
Nick
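(Not from the thread; one possible sketch, assuming a Spark version with RDD.zipWithIndex, added around 1.0, and an RDD already sorted by time.) Keying each element by its position and joining against a copy shifted by one avoids a custom partitioner and handles partition boundaries automatically:

// Pair each element with the next one by joining on a shifted index.
val series = sc.parallelize(Seq((1.0, 0.05), (2.0, 0.10), (3.0, 0.15)))
val indexed = series.zipWithIndex().map { case (tv, i) => (i, tv) }
val shifted = indexed.map { case (i, tv) => (i - 1, tv) }          // element i+1 keyed as i
val sequentialPairs = indexed.join(shifted).values                 // ((t_i, v_i), (t_i+1, v_i+1))
sequentialPairs.collect().foreach(println)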
executor processes are still there even after I killed the app and the workers
Hi all,

With Spark 1.0 RC3, I found that the executor processes are still there even after I killed the app and the workers. Has anyone else seen the same problem (maybe it also exists in other versions)?

Best,
-- Nan Zhu