I am trying to convert some json logs to Parquet and save them on S3.
In principle this is just

import org.apache.spark._
val sqlContext = new sql.SQLContext(sc)
// second argument is the sampling ratio used for JSON schema inference
val data = sqlContext.jsonFile("s3n://source/path/*/*", 10e-8)
data.registerAsTable("data")
data.saveAsParquetFile("s3n://target/path")

This works fine for up to about 10^9 records, but above that I start
having problems.
The first problem I encountered is that after the data files get written
out, writing the Parquet summary file fails.
While I seem to have all the data saved out, programs have a huge
start-up time when processing Parquet files without a summary file.

Writing the summary file appears to depend primarily on the number of
partitions being written, and to be relatively independent of the amount
of data being written.
Problems start after about 1000 partitions; writing 10000 partitions
fails even for a single day's worth of data that has been repartitioned.
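
For reference, this is how I have been checking how many partitions are
about to be written (just the standard `partitions` array on the RDD):

// number of output partitions; Parquet writes one part-file per partition
println(s"writing ${data.partitions.length} partitions")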

My data is very finely partitioned, about 16 log files per hour, or 13K
files per month.
The file sizes are very uneven, ranging over several orders of magnitude.
There are several years of data.
By my calculations this will produce 10s of terabytes of Parquet files.

The first thing I tried to get around this problem was passing the data
through `coalesce(1000, shuffle=false)` before writing.
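
Roughly like this (just a sketch, reusing the `data` SchemaRDD and the
target path from the snippet above):

// same SchemaRDD as before, collapsed to 1000 partitions before the write
val coalesced = data.coalesce(1000, shuffle = false)
coalesced.saveAsParquetFile("s3n://target/path")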
This works up to about a month's worth of data;
after that, coalescing to 1000 partitions produces Parquet files larger
than 5 GB, and writing those to S3 fails as a result.
Also coalescing slows processing down by at least a factor of 2.
I do not understand why this should happen since I use shuffle=false.
AFAIK coalesce should just be a bookkeeping trick and the original
partitions should be processed pretty much the same as before, just with
their outputs concatenated.

The only other option I can think of is to write each month coalesced
as a separate data set with its own summary file
and union the RDDs when processing the data,
but I do not know how much overhead that will introduce.
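
Something like this, where the per-month paths and the month list are
made up for illustration and `unionAll` is how I would expect to combine
the resulting SchemaRDDs:

// write each month as its own Parquet data set with its own summary file
val jan = sqlContext.jsonFile("s3n://source/path/2014-01/*", 10e-8).coalesce(1000)
jan.saveAsParquetFile("s3n://target/path/2014-01")
// ... and so on for the other months ...

// at read time, load the monthly data sets and union them before querying
val months = Seq("2014-01", "2014-02")
  .map(m => sqlContext.parquetFile("s3n://target/path/" + m))
val all = months.reduce((a, b) => a.unionAll(b))
all.registerAsTable("data")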

I am looking for advice on the best way to save data of this size in
Parquet on S3.
Apart from solving the summary file issue I am also looking for ways to
improve performance.
Would it make sense to write the data to a local HDFS first and push it
to S3 with `hadoop distcp` (sketched below)?
Is putting Tachyon in front of either the input or the output S3 likely
to help?
If yes, which is likely to help more?
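
For the HDFS-then-distcp idea I am picturing something like this (the
hdfs:/// staging path is made up):

// write to the cluster's HDFS first, avoiding the S3 upload during the job
data.saveAsParquetFile("hdfs:///tmp/data-staging.parquet")
// then copy the finished data set to S3 from the shell:
//   hadoop distcp hdfs:///tmp/data-staging.parquet s3n://target/path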

I set options on the master as follows

+++++++++++++++++++++++++++++++++++++++++
cat <<EOF >>~/spark/conf/spark-defaults.conf
# use Kryo instead of the default Java serialization
spark.serializer        org.apache.spark.serializer.KryoSerializer
# compress serialized RDD partitions
spark.rdd.compress      true
# consolidate intermediate shuffle files
spark.shuffle.consolidateFiles  true
# max size in MB of Akka control messages
spark.akka.frameSize  20
EOF

copy-dir /root/spark/conf
spark/sbin/stop-all.sh
sleep 5
spark/sbin/start-all.sh
++++++++++++++++++++++++++++++++++++++++++

Does this make sense? Should I set some other options?
I have also asked these questions on Stack Overflow, where I reproduced
the full error messages:

+ http://stackoverflow.com/questions/26332542/how-to-save-a-multi-terabyte-schemardd-in-parquet-format-on-s3
+ http://stackoverflow.com/questions/26321947/multipart-uploads-to-amazon-s3-from-apache-spark
+ http://stackoverflow.com/questions/26291165/spark-sql-unable-to-complete-writing-parquet-data-with-a-large-number-of-shards

thanks
Daniel
