Hi Spark devs,

First of all, huge congrats on the Parquet integration with SparkSQL! This is an incredible direction forward and something I can see being broadly useful.
I was doing some preliminary tests to see how it works with one of my workflows, and wanted to share some numbers that people might want to know about. I also wanted to point out that .count() doesn't seem to be integrated with the rest of the optimization framework, and some big gains could be possible there.

So, the numbers. I took a table extracted from a SQL database and stored in HDFS:
- 115 columns (several always empty; mostly strings, some enums, some numbers)
- 253,887,080 rows
- 182,150,295,881 bytes (raw, uncompressed)
- 42,826,820,222 bytes (LZO-compressed, with .index file)

And I converted it to Parquet using SparkSQL's SchemaRDD.saveAsParquet() call:
- Converting from .lzo in HDFS to .parquet in HDFS took 635s using 42 cores across 4 machines
- 17,517,922,117 bytes (Parquet, per SparkSQL defaults)

So storing in Parquet vs. LZO compresses the data down to less than half of the .lzo size, and under 10% of the raw uncompressed size. Nice!

I then did some basic interactions on it:

*Row count*
- LZO: lzoFile("/path/to/lzo").count - 31.632305953s
- Parquet: sqlContext.parquetFile("/path/to/parquet").count - 289.129487003s

Reassembling rows from the separate column storage is clearly really expensive. Median task length is 33s vs. 4s, and of each 33s task (319 tasks total) about 1.75s is spent in GC (inefficient object allocation?).

*Count number of rows with a particular key*
- LZO: lzoFile("/path/to/lzo").filter(_.split("\\|")(0) == "1234567890").count - 73.988897511s
- Parquet: sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).count - 293.410470418s
- Parquet (hand-tuned to count on just one column): sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).select('IDCOL).count - 1.160449187s

It looks like .count() on Parquet is currently handled very inefficiently, with all the columns being materialized.
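To make my guess about the cost concrete, here's a toy sketch in plain Scala (not Spark or Parquet internals; the column names and data are made up) of the difference between counting matches by reassembling full rows versus scanning only the one column the predicate touches:

```scala
// Toy model of columnar storage: each column of a 3-column "table"
// is stored as its own array, the way Parquet stores column chunks.
object ColumnarCountSketch {
  val idCol: Array[Long]     = Array(1234567890L, 42L, 1234567890L)
  val nameCol: Array[String] = Array("a", "b", "c")
  val sizeCol: Array[Int]    = Array(1, 2, 3)

  // What a naive count appears to do today: materialize every row
  // (one tuple allocation per row, touching all columns) just to count.
  def countMatchingViaRows(key: Long): Int = {
    val rows = idCol.indices.map(i => (idCol(i), nameCol(i), sizeCol(i)))
    rows.count(_._1 == key)
  }

  // What the hand-tuned .select('IDCOL).count effectively buys:
  // scan a single column and never touch the other 114.
  def countMatchingViaColumn(key: Long): Int =
    idCol.count(_ == key)

  def main(args: Array[String]): Unit = {
    assert(countMatchingViaRows(1234567890L) == countMatchingViaColumn(1234567890L))
    println(countMatchingViaColumn(1234567890L)) // prints 2
  }
}
```

Both give the same answer, of course; the point is that the second never allocates row objects, which also matches the GC time I'm seeing in the row-materializing tasks.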
But if I select just the relevant column and then count, the column-oriented storage of Parquet really shines. There's a potential optimization here: a .count() on a SchemaRDD backed by Parquet shouldn't require re-assembling the rows, since that's the expensive part. I don't think .count() is handled specially in SchemaRDDs, but it seems ripe for optimization.

*Count number of distinct values in a column*
- LZO: lzoFile("/path/to/lzo").map(sel(0)).distinct.count - 115.582916866s
- Parquet: sqlContext.parquetFile("/path/to/parquet").select('COL).distinct.count - 16.839004826s

It turns out column selectivity is very useful! I'm guessing that if I could get byte counts read out of HDFS, they would just about match up with the difference in read times.

Any thoughts on how to embed the knowledge of my hand-tuned additional .select('IDCOL) into Catalyst?

Thanks again for all the hard work and prep for the 1.0 release!

Andrew