Re: Performance of loading parquet files into case classes in Spark

2016-08-30 Thread Steve Loughran

On 29 Aug 2016, at 20:58, Julien Dumazert wrote:

Hi Maciek,

I followed your recommendation and benchmarked DataFrame aggregations on a 
Dataset. Here is what I got:


// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)

// Dataset with map and Dataframes sum
// 35.372s
df.as[A].map(_.fieldToSum).agg(sum("value")).collect().head.getAs[Long](0)

Not much of a difference. It seems that as soon as you access data as in RDDs, 
you force the full decoding of the object into a case class, which is super 
costly.

I find this behavior quite normal: as soon as you provide the user with the 
ability to pass a blackbox function, anything can happen, so you have to load 
the whole object. On the other hand, when using SQL-style functions only, 
everything is "white box", so Spark understands what you want to do and can 
optimize.



SQL and the DataFrame code where you ask for a specific field can be handled 
by the file format itself, which optimises the operation. If you ask for only 
one column of Parquet or ORC data, then only that column's data should be 
loaded. And because these formats store columns together, you save on all the 
IO needed to read the discarded columns. Add even more selectivity (such as 
ranges of values) and you can even get "predicate pushdown", where blocks of 
the file are skipped if the input format reader can determine that none of the 
values there can match the predicate's conditions.
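
To make that concrete, a rough sketch (the field name is borrowed from your 
benchmarks, the path is made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("parquet-pruning").getOrCreate()
import spark.implicits._

// a parquet dataset with many columns, of which only one is needed
val df = spark.read.parquet("/path/to/data.parquet")

// only the fieldToSum column is read from disk; the other columns are never touched
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)

// with a selective predicate the reader can also skip whole blocks whose
// min/max statistics rule out any match; explain() should show the pruned
// schema and the pushed filter on the scan node
df.filter($"fieldToSum" > 100L).agg(sum("fieldToSum")).explain()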

you should be able to get away with something like df.select("field") to 
select just the fields you want first, then stay in code rather than SQL.
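
Something like this, say (a sketch, reusing the df and field name from above):

import spark.implicits._  // for $ and the Long encoder

// narrow to the one column before going typed; only that column gets decoded
val total: Long = df.select($"fieldToSum".as[Long]).reduce(_ + _)

// or select by name and then take the Long encoder
val total2: Long = df.select("fieldToSum").as[Long].reduce(_ + _)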

Anyway, experiment: it's always more accurate than the opinions of others, 
especially when applied to your own datasets.


Re: Performance of loading parquet files into case classes in Spark

2016-08-29 Thread Julien Dumazert
Hi Maciek,

I followed your recommendation and benchmarked DataFrame aggregations on a 
Dataset. Here is what I got:

// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)
// Dataset with map and Dataframes sum
// 35.372s
df.as[A].map(_.fieldToSum).agg(sum("value")).collect().head.getAs[Long](0)

Not much of a difference. It seems that as soon as you access data as in RDDs, 
you force the full decoding of the object into a case class, which is super 
costly.

I find this behavior quite normal: as soon as you provide the user with the 
ability to pass a blackbox function, anything can happen, so you have to load 
the whole object. On the other hand, when using SQL-style functions only, 
everything is "white box", so Spark understands what you want to do and can 
optimize.
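
Comparing the physical plans should make the difference visible (just a 
sketch; the exact output depends on the Spark version):

// black box: Catalyst only sees an opaque closure, so every A has to be
// fully deserialized before the closure can run
df.as[A].map(_.fieldToSum).explain()

// white box: the projection is a column expression Catalyst understands,
// so it can prune columns at the Parquet reader and skip the object decode
df.select($"fieldToSum".as[Long]).explain()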

Still, it breaks the promise of Datasets to me, and I hope there is something 
that can be done here (I'm not confident on this point) and that it will be 
addressed in a later release.

Best regards,
Julien


> On 28 August 2016 at 22:12, Maciej Bryński wrote:
> 
> Hi Julien,
> I thought about something like this:
> import org.apache.spark.sql.functions.sum
> df.as[A].map(_.fieldToSum).agg(sum("value")).collect()
> To try using Dataframes aggregation on Dataset instead of reduce.
> 
> Regards,
> Maciek
> 
> 2016-08-28 21:27 GMT+02:00 Julien Dumazert:
> Hi Maciek,
> 
> I've tested several variants for summing "fieldToSum":
> 
> First, RDD-style code:
> df.as[A].map(_.fieldToSum).reduce(_ + _)
> df.as[A].rdd.map(_.fieldToSum).sum()
> df.as[A].map(_.fieldToSum).rdd.sum()
> All around 30 seconds. "reduce" and "sum" seem to have the same performance, 
> for this use case at least.
> 
> Then with sql.functions.sum:
> df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
> 0.24 seconds, super fast.
> 
> Finally, dataset with column selection:
> df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
> 0.18 seconds, super fast again.
> 
> (I've also tried replacing my sums and reduces by counts on your advice, but 
> the performance is unchanged. Apparently, summing does not take much time 
> compared to accessing data.)
> 
> It seems that we need to use the SQL interface to reach the highest level of 
> performance, which somehow breaks the promise of Dataset (preserving type 
> safety while having Catalyst and Tungsten performance like DataFrames).
> 
> As for direct access to Row, it seems that it got much slower from 1.6 to 
> 2.0. I guess it's because DataFrame is now Dataset[Row], and thus uses the 
> same encoding/decoding mechanism as any other case class.
> 
> Best regards,
> 
> Julien
> 
>> On 27 August 2016 at 22:32, Maciej Bryński wrote:
>> 
>> 
>> 2016-08-27 15:27 GMT+02:00 Julien Dumazert:
>> df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
>> 
>> I think reduce and sum have very different performance.
>> Did you try sql.functions.sum?
>> Or if you want to benchmark access to the Row object, then the count() function 
>> would be a better idea.
>> 
>> Regards,
>> -- 
>> Maciek Bryński
> 
> 
> 
> 
> -- 
> Maciek Bryński



Re: Performance of loading parquet files into case classes in Spark

2016-08-28 Thread Julien Dumazert
Hi Maciek,

I've tested several variants for summing "fieldToSum":

First, RDD-style code:
df.as[A].map(_.fieldToSum).reduce(_ + _)
df.as[A].rdd.map(_.fieldToSum).sum()
df.as[A].map(_.fieldToSum).rdd.sum()
All around 30 seconds. "reduce" and "sum" seem to have the same performance, 
for this use case at least.

Then with sql.functions.sum:
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
0.24 seconds, super fast.

Finally, dataset with column selection:
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
0.18 seconds, super fast again.

(I've also tried replacing my sums and reduces by counts on your advice, but 
the performance is unchanged. Apparently, summing does not take much time 
compared to accessing data.)
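
If anyone wants to reproduce the comparison, a quick wall-clock helper like 
this is enough (just a sketch, not what I would call rigorous benchmarking):

import org.apache.spark.sql.functions.sum
import spark.implicits._

def time[R](label: String)(block: => R): R = {
  val start = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - start) / 1e9} s")
  result
}

time("RDD-style map + reduce") { df.as[A].map(_.fieldToSum).reduce(_ + _) }
time("sql.functions.sum") { df.agg(sum("fieldToSum")).collect().head.getAs[Long](0) }
time("typed column selection") { df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _) }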

It seems that we need to use the SQL interface to reach the highest level of 
performance, which somehow breaks the promise of Dataset (preserving type 
safety while having Catalyst and Tungsten performance like DataFrames).

As for direct access to Row, it seems that it got much slower from 1.6 to 2.0. 
I guess it's because DataFrame is now Dataset[Row], and thus uses the same 
encoding/decoding mechanism as any other case class.
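
If that's right, the difference should show up between these two (a sketch of 
the two paths, for whoever wants to compare them):

// in 2.0 this is a Dataset transformation: each Row is decoded and the
// resulting Long is re-encoded by the encoder machinery
df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)

// dropping to the RDD first skips the re-encoding of the results, at the
// price of losing Catalyst and Tungsten optimizations entirely
df.rdd.map(_.getAs[Long]("fieldToSum")).reduce(_ + _)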

Best regards,

Julien

> On 27 August 2016 at 22:32, Maciej Bryński wrote:
> 
> 
> 2016-08-27 15:27 GMT+02:00 Julien Dumazert:
> df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
> 
> I think reduce and sum have very different performance.
> Did you try sql.functions.sum?
> Or if you want to benchmark access to the Row object, then the count() function 
> would be a better idea.
> 
> Regards,
> -- 
> Maciek Bryński



Re: Performance of loading parquet files into case classes in Spark

2016-08-27 Thread Maciej Bryński
2016-08-27 15:27 GMT+02:00 Julien Dumazert:

> df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)


I think reduce and sum have very different performance.
Did you try sql.functions.sum?
Or if you want to benchmark access to the Row object, then the count() function
would be a better idea.
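
Something like this (just a sketch, using your column name):

import org.apache.spark.sql.functions.sum
import spark.implicits._  // Long encoder for the map

// Catalyst aggregation instead of a Scala closure
df.agg(sum("fieldToSum")).show()

// to time Row access alone, replace the reduce with a count
df.map(row => row.getAs[Long]("fieldToSum")).count()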

Regards,
-- 
Maciek Bryński