I am trying to convert some JSON logs to Parquet and save them on S3. In principle this is just:

+++++++++++++++++++++++++++++++++++++++++
import org.apache.spark._
val sqlContext = new sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://source/path/*/*", 10e-8)
data.registerAsTable("data")
data.saveAsParquetFile("s3n://target/path")
+++++++++++++++++++++++++++++++++++++++++

This works fine for up to about 10^9 records, but above that I start having problems. The first problem I encountered is that after the data files get written out, writing the Parquet summary file fails. While I seem to have all the data saved, programs have a huge start-up time when processing Parquet files without a summary file. Writing the summary file appears to depend primarily on the number of partitions being written, and to be relatively independent of the amount of data being written. Problems start after about 1,000 partitions; writing 10,000 partitions fails even with a repartitioned single day's worth of data.

My data is very finely partitioned, about 16 log files per hour, or 13K files per month. The file sizes are very uneven, ranging over several orders of magnitude. There are several years of data. By my calculations this will produce tens of terabytes of Parquet files.

The first thing I tried to get around this problem was passing the data through `coalesce(1000, shuffle = false)` before writing. This works for up to about a month's worth of data; beyond that, coalescing to 1,000 partitions produces Parquet files larger than 5 GB, and writing to S3 fails as a result. Coalescing also slows processing down by at least a factor of 2. I do not understand why this should happen, since I use `shuffle = false`. AFAIK coalesce should just be a bookkeeping trick, and the original partitions should be processed pretty much the same as before, just with their outputs concatenated.

The only other option I can think of is to write each month, coalesced, as a separate dataset with its own summary file, and union the RDDs when processing the data, but I do not know how much overhead that will introduce.

I am looking for advice on the best way to save data of this size in Parquet on S3.
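The per-month approach I have in mind would look roughly like this (a sketch against the Spark 1.x SchemaRDD API used above; the month list and the per-month path layout are placeholders, and it assumes an existing SparkContext `sc` on a cluster):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Placeholder month identifiers; the real list would span several years.
val months = Seq("2014-08", "2014-09", "2014-10")

// Write: one coalesced Parquet dataset (with its own summary file) per month.
for (m <- months) {
  val monthData = sqlContext.jsonFile(s"s3n://source/path/$m/*")
  monthData.coalesce(1000, shuffle = false)
           .saveAsParquetFile(s"s3n://target/path/$m")
}

// Read: load each monthly dataset and union the SchemaRDDs before querying.
val all = months
  .map(m => sqlContext.parquetFile(s"s3n://target/path/$m"))
  .reduce(_ unionAll _)
all.registerAsTable("data")
```

The question is what overhead the `unionAll` over dozens of monthly SchemaRDDs introduces at query time.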
Apart from solving the summary file issue, I am also looking for ways to improve performance. Would it make sense to write the data to a local HDFS first and push it to S3 with `hadoop distcp`? Is putting Tachyon in front of either the input or the output S3 likely to help? If yes, which is likely to help more?

I set options on the master as follows:

+++++++++++++++++++++++++++++++++++++++++
cat <<EOF >>~/spark/conf/spark-defaults.conf
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.rdd.compress true
spark.shuffle.consolidateFiles true
spark.akka.frameSize 20
EOF
copy-dir /root/spark/conf
spark/sbin/stop-all.sh
sleep 5
spark/sbin/start-all.sh
++++++++++++++++++++++++++++++++++++++++++

Does this make sense? Should I set some other options?

I have also asked these questions on StackOverflow, where I reproduced the full error messages:

+ http://stackoverflow.com/questions/26332542/how-to-save-a-multi-terabyte-schemardd-in-parquet-format-on-s3
+ http://stackoverflow.com/questions/26321947/multipart-uploads-to-amazon-s3-from-apache-spark
+ http://stackoverflow.com/questions/26291165/spark-sql-unable-to-complete-writing-parquet-data-with-a-large-number-of-shards

Thanks,
Daniel