Preliminary Parquet numbers and including .count() in Catalyst

2014-05-12 Thread Andrew Ash
Hi Spark devs,

First of all, huge congrats on the parquet integration with SparkSQL!  This
is an incredible direction forward and something I can see being very
broadly useful.

I was doing some preliminary tests to see how it works with one of my
workflows, and wanted to share some numbers that people might want to know
about.

I also wanted to point out that .count() doesn't seem integrated with the
rest of the optimization framework, and some big gains could be possible.


So, the numbers:

I took a table extracted from a SQL database and stored in HDFS:

   - 115 columns (several always-empty, mostly strings, some enums, some
   numbers)
   - 253,887,080 rows
   - 182,150,295,881 bytes (raw uncompressed)
   - 42,826,820,222 bytes (lzo compressed with .index file)

I then converted it to Parquet using SparkSQL's SchemaRDD.saveAsParquetFile()
call:

   - Converting from .lzo in HDFS to .parquet in HDFS took 635s using 42
   cores across 4 machines
   - 17,517,922,117 bytes (parquet per SparkSQL defaults)

So storing in Parquet vs. LZO compresses the data down to about 41% of the
.lzo size, and under 10% of the raw uncompressed size.  Nice!
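For the curious, here is roughly the shape of the conversion, sketched with a
hypothetical two-field schema standing in for the real 115 columns (lzoFile is
my shell helper over hadoop-lzo, and sc is the usual shell SparkContext):

    import org.apache.spark.sql.SQLContext

    // Hypothetical stand-in for the real 115-column schema.
    case class Record(idcol: Long, col: Long)

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD[Record] => SchemaRDD

    val records = lzoFile("/path/to/lzo")          // one pipe-delimited row per line
      .map(_.split("\\|"))
      .map(f => Record(f(0).toLong, f(1).toLong))

    records.saveAsParquetFile("/path/to/parquet")  // Parquet with SparkSQL defaults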


I then did some basic interactions on it:

*Row count*

   - LZO
      - lzoFile("/path/to/lzo").count
      - 31.632305953s
   - Parquet
      - sqlContext.parquetFile("/path/to/parquet").count
      - 289.129487003s

Reassembling rows from the separate column storage is clearly expensive.
Median task length is 33s for Parquet vs 4s for LZO, and of each 33s task
(319 tasks total) about 1.75s is spent in GC (inefficient object
allocation?).



*Count number of rows with a particular key:*

   - LZO
      - lzoFile("/path/to/lzo").filter(_.split("\\|")(0) == "1234567890").count
      - 73.988897511s
   - Parquet
      - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).count
      - 293.410470418s
   - Parquet (hand-tuned to count on just one column)
      - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).select('IDCOL).count
      - 1.160449187s

It looks like .count() on Parquet is currently handled quite inefficiently:
all the columns are materialized.  But if I select just the relevant column
and then count, the column-oriented storage of Parquet really shines.

There's a potential optimization here: a .count() on a SchemaRDD backed by
Parquet shouldn't require reassembling the rows, as that's expensive.  I
don't think .count() is handled specially in SchemaRDDs, but it seems ripe
for optimization.
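In the meantime, the hand-tuned workaround can live in a small helper; a
sketch, assuming the Symbol-to-column implicits (import sqlContext._) are in
scope as in the snippets above:

    import org.apache.spark.sql.SchemaRDD

    // Workaround sketch: project one narrow column before counting, so the
    // Parquet scan reads a single column instead of reassembling whole rows.
    def fastCount(rdd: SchemaRDD, narrowColumn: Symbol): Long =
      rdd.select(narrowColumn).count()

    // e.g. fastCount(sqlContext.parquetFile("/path/to/parquet")
    //                  .where('COL === 1234567890), 'IDCOL)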


*Count number of distinct values in a column*

   - LZO
      - lzoFile("/path/to/lzo").map(sel(0)).distinct.count
      - 115.582916866s
   - Parquet
      - sqlContext.parquetFile("/path/to/parquet").select('COL).distinct.count
      - 16.839004826s

Column selectivity turns out to be very useful!  I'm guessing that if I
could get the byte counts read from HDFS, they would just about match the
difference in read times.
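If anyone wants to reproduce that comparison, here is a rough driver-side
probe of bytes read via Hadoop's FileSystem statistics (a sketch only;
executors keep their own counters, so this covers just the local JVM's reads):

    import org.apache.hadoop.fs.FileSystem
    import scala.collection.JavaConverters._

    // Print per-scheme bytes read as recorded by Hadoop's FileSystem
    // statistics in this JVM.
    FileSystem.getAllStatistics.asScala.foreach { s =>
      println(s"${s.getScheme}: ${s.getBytesRead} bytes read")
    }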


Any thoughts on how to embed the knowledge of my hand-tuned additional
.select('IDCOL) into Catalyst?


Thanks again for all the hard work and prep for the 1.0 release!

Andrew


Re: Preliminary Parquet numbers and including .count() in Catalyst

2014-05-12 Thread Reynold Xin
Thanks for the experiments and analysis!

I think Michael already submitted a patch that avoids scanning all columns
for count(*) or count(1).




Re: Preliminary Parquet numbers and including .count() in Catalyst

2014-05-13 Thread Andrew Ash
These numbers were run on git commit 756c96 (a few days after the 1.0.0-rc3
tag).  Do you have a link to the patch that avoids scanning all columns for
count(*) or count(1)?  I'd like to give it a shot.

Andrew




Re: Preliminary Parquet numbers and including .count() in Catalyst

2014-05-13 Thread Michael Armbrust
>
> It looks like .count() on Parquet is currently handled quite inefficiently:
> all the columns are materialized.  But if I select just the relevant column
> and then count, the column-oriented storage of Parquet really shines.
>
> There's a potential optimization here: a .count() on a SchemaRDD backed by
> Parquet shouldn't require reassembling the rows, as that's expensive.  I
> don't think .count() is handled specially in SchemaRDDs, but it seems ripe
> for optimization.
>

Yeah, you are right.  Thanks for pointing this out!

If you call .count(), that is just the native Spark count, which is not
aware of the potential optimizations.  We could override count() in
SchemaRDD to be something like
"groupBy()(Count(Literal(1))).collect().head.getInt(0)".

Here is a JIRA: SPARK-1822 - SchemaRDD.count() should use the optimizer.

Michael


Re: Preliminary Parquet numbers and including .count() in Catalyst

2014-05-13 Thread Andrew Ash
Thanks for filing -- I'm keeping my eye out for updates on that ticket.

Cheers!
Andrew


>