Daniel,

Currently, having Tachyon will at least help on the input part in this case.
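Concretely, one way to route the input through Tachyon is to mount or mirror the S3 bucket under the Tachyon file system and point `jsonFile` at a `tachyon://` URI instead of `s3n://`. A minimal sketch; the helper name, the mount layout, and the master address (Tachyon's default port 19998) are assumptions for illustration, not something established in this thread:

```scala
// Hypothetical helper: rewrite an s3n:// input path so it reads through a
// Tachyon mount instead. Assumes the S3 data is reachable under the same
// relative path in Tachyon, and that the Tachyon master listens on the
// default port 19998 -- both are assumptions.
def viaTachyon(s3Path: String, tachyonMaster: String = "master:19998"): String = {
  val stripped = s3Path.stripPrefix("s3n://")
  s"tachyon://$tachyonMaster/$stripped"
}

// The jsonFile call from the original mail would then become something like:
// val data = sqlContext.jsonFile(viaTachyon("s3n://source/path/*/*"), 10e-8)
```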
Haoyuan

On Fri, Oct 24, 2014 at 2:01 PM, Daniel Mahler <dmah...@gmail.com> wrote:
> I am trying to convert some JSON logs to Parquet and save them on S3.
> In principle this is just:
>
> import org.apache.spark._
> val sqlContext = new sql.SQLContext(sc)
> val data = sqlContext.jsonFile("s3n://source/path/*/*", 10e-8)
> data.registerAsTable("data")
> data.saveAsParquetFile("s3n://target/path")
>
> This works fine for up to about 10^9 records, but above that I start
> having problems.
> The first problem I encountered is that after the data files get written
> out, writing the Parquet summary file fails.
> While I seem to have all the data saved out, programs have a huge
> start-up time when processing Parquet files without a summary file.
>
> Writing the summary file appears to depend primarily on the number of
> partitions being written, and to be relatively independent of the amount
> of data being written.
> Problems start after about 1000 partitions; writing 10000 partitions
> fails even with a repartitioned single day's worth of data.
>
> My data is very finely partitioned: about 16 log files per hour, or 13K
> files per month.
> The file sizes are very uneven, ranging over several orders of magnitude,
> and there are several years of data.
> By my calculations this will produce tens of terabytes of Parquet files.
>
> The first thing I tried to get around this problem was passing the data
> through `coalesce(1000, shuffle = false)` before writing.
> This works for up to about a month's worth of data; beyond that,
> coalescing to 1000 partitions produces Parquet files larger than 5 GB,
> and writing to S3 fails as a result.
> Coalescing also slows processing down by at least a factor of 2.
> I do not understand why this should happen, since I use shuffle = false.
> AFAIK coalesce should just be a bookkeeping trick: the original
> partitions should be processed pretty much the same as before, just with
> their outputs concatenated.
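[Daniel's reading of `coalesce(n, shuffle = false)` is essentially right: it narrows the lineage by assigning each parent partition to one of n output buckets, without shuffling rows. The slowdown is still expected, because fewer output partitions also means fewer parallel tasks for the whole upstream stage. A toy pure-Scala sketch of the grouping idea, for illustration only — this is not Spark's actual implementation:]

```scala
// Illustration only: coalesce(n, shuffle = false) groups the existing
// parent partitions into n buckets; each output partition is just a
// concatenation of a few parents, with no data movement between them.
def coalesceGroups(numParent: Int, numTarget: Int): Seq[Seq[Int]] = {
  val parentIds = 0 until numParent
  val groupSize = math.ceil(numParent.toDouble / numTarget).toInt
  parentIds.grouped(groupSize).toSeq
}

// e.g. 10 parent partitions coalesced to 3 output partitions become the
// buckets (0..3), (4..7), (8..9): every parent appears in exactly one
// bucket, but only 3 tasks now process all 10 parents' data.
```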
>
> The only other option I can think of is to write each month, coalesced,
> as a separate data set with its own summary file, and union the RDDs
> when processing the data, but I do not know how much overhead that will
> introduce.
>
> I am looking for advice on the best way to save data of this size in
> Parquet on S3.
> Apart from solving the summary-file issue, I am also looking for ways to
> improve performance.
> Would it make sense to write the data to a local HDFS first and push it
> to S3 with `hadoop distcp`?
> Is putting Tachyon in front of either the input or the output S3 likely
> to help?
> If yes, which is likely to help more?
>
> I set options on the master as follows:
>
> +++++++++++++++++++++++++++++++++++++++++
> cat <<EOF >>~/spark/conf/spark-defaults.conf
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.rdd.compress true
> spark.shuffle.consolidateFiles true
> spark.akka.frameSize 20
> EOF
>
> copy-dir /root/spark/conf
> spark/sbin/stop-all.sh
> sleep 5
> spark/sbin/start-all.sh
> ++++++++++++++++++++++++++++++++++++++++++
>
> Does this make sense? Should I set some other options?
> I have also asked these questions on StackOverflow, where I reproduced
> the full error messages:
>
> + http://stackoverflow.com/questions/26332542/how-to-save-a-multi-terabyte-schemardd-in-parquet-format-on-s3
> + http://stackoverflow.com/questions/26321947/multipart-uploads-to-amazon-s3-from-apache-spark
> + http://stackoverflow.com/questions/26291165/spark-sql-unable-to-complete-writing-parquet-data-with-a-large-number-of-shards
>
> thanks
> Daniel


-- 
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/