Daniel,

Currently, having Tachyon in front of the input will at least help in this case.

Haoyuan

On Fri, Oct 24, 2014 at 2:01 PM, Daniel Mahler <dmah...@gmail.com> wrote:

> I am trying to convert some JSON logs to Parquet and save them on S3.
> In principle this is just:
>
> import org.apache.spark._
> val sqlContext = new sql.SQLContext(sc)
> val data = sqlContext.jsonFile("s3n://source/path/*/*", 10e-8)
> data.registerAsTable("data")
> data.saveAsParquetFile("s3n://target/path")
>
> This works fine for up to about 10^9 records, but above that I start
> having problems.
> The first problem I encountered is that after the data files get written
> out, writing the Parquet summary file fails.
> While I seem to have all the data saved out,
> programs have a huge start-up time
> when processing Parquet files without a summary file.
>
> Writing the summary file appears to depend primarily
> on the number of partitions being written,
> and to be relatively independent of the amount of data being written.
> Problems start after about 1000 partitions;
> writing 10000 partitions fails even with a single repartitioned day's
> worth of data.
>
> My data is very finely partitioned, about 16 log files per hour, or 13K
> files per month.
> The file sizes are very uneven, ranging over several orders of magnitude.
> There are several years of data.
> By my calculations this will produce 10s of terabytes of Parquet files.
>
> The first thing I tried to get around this problem
> was passing the data through `coalesce(1000, shuffle=false)` before
> writing.
> This works for up to about a month's worth of data;
> after that, coalescing to 1000 partitions produces Parquet files larger
> than 5 GB
> and writing to S3 fails as a result.
> Also coalescing slows processing down by at least a factor of 2.
> I do not understand why this should happen since I use shuffle=false.
> AFAIK coalesce should just be a bookkeeping trick and the original
> partitions should be processed pretty much the same as before, just with
> their outputs concatenated.
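>
> Concretely, the write with coalescing looks roughly like this (same data
> and paths as in the snippet above):
>
> val coalesced = data.coalesce(1000, shuffle = false)
> coalesced.saveAsParquetFile("s3n://target/path")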
>
> The only other option I can think of is to write each month coalesced
> as a separate data set with its own summary file
> and union the RDDs when processing the data,
> but I do not know how much overhead that will introduce.
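>
> Something like the following is what I have in mind (a rough sketch; the
> month list and the per-month path layout are made up for illustration):
>
> // hypothetical list of month prefixes; the real layout will differ
> val months = Seq("2014-01", "2014-02" /* , ... */)
>
> // write each month as its own data set, so each gets its own summary file
> for (month <- months) {
>   sqlContext.jsonFile(s"s3n://source/path/$month/*")
>     .coalesce(1000, shuffle = false)
>     .saveAsParquetFile(s"s3n://target/path/$month")
> }
>
> // later, union the per-month SchemaRDDs before processing
> val all = months
>   .map(m => sqlContext.parquetFile(s"s3n://target/path/$m"))
>   .reduce(_ unionAll _)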
>
> I am looking for advice on the best way to save data of this size in
> Parquet on S3.
> Apart from solving the summary file issue, I am also looking for ways
> to improve performance.
> Would it make sense to write the data to a local HDFS first and push it to
> S3 with `hadoop distcp`?
> Is putting Tachyon in front of either the input or the output S3 likely to
> help?
> If yes, which is likely to help more?
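>
> For the local-HDFS route I am picturing something like this (the HDFS path
> is a placeholder):
>
> // write the Parquet files to the cluster's HDFS first
> data.saveAsParquetFile("hdfs:///user/hadoop/parquet-output")
> // then push the finished files to S3 in one pass from the shell, e.g.
> //   hadoop distcp hdfs:///user/hadoop/parquet-output s3n://target/path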
>
> I set options on the master as follows
>
> +++++++++++++++++++++++++++++++++++++++++
> cat <<EOF >>~/spark/conf/spark-defaults.conf
> spark.serializer        org.apache.spark.serializer.KryoSerializer
> spark.rdd.compress      true
> spark.shuffle.consolidateFiles  true
> spark.akka.frameSize  20
> EOF
>
> copy-dir /root/spark/conf
> spark/sbin/stop-all.sh
> sleep 5
> spark/sbin/start-all.sh
> ++++++++++++++++++++++++++++++++++++++++++
>
> Does this make sense? Should I set some other options?
> I have also asked these questions on StackOverflow where I reproduced the
> full error messages:
>
> +
> http://stackoverflow.com/questions/26332542/how-to-save-a-multi-terabyte-schemardd-in-parquet-format-on-s3
> +
> http://stackoverflow.com/questions/26321947/multipart-uploads-to-amazon-s3-from-apache-spark
> +
> http://stackoverflow.com/questions/26291165/spark-sql-unable-to-complete-writing-parquet-data-with-a-large-number-of-shards
>
> thanks
> Daniel
>
>
>


-- 
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/
