Re: 3Ps for Datasets not available?! (=Parquet Predicate Pushdown)

2016-08-30 Thread Jacek Laskowski
Hi Reynold,

That's what I was told a few times already (most notably by Adam on
Twitter), but I couldn't understand what it meant exactly. It's only now
that I understand what you're saying, Reynold :)

Does this make DataFrame's Column-based or SQL-based queries usually
faster than Datasets with Encoders?

How wrong am I to claim that for Parquet files, Hive tables, and JDBC
tables, using DataFrames with Column- or SQL-based queries is usually
faster than using Datasets? Is it that Datasets only shine for strongly
typed queries against data sources with no support for optimizations like
filter pushdown? I'm tempted to say that for some data sources
DataFrames are always faster than Datasets. True? What am I missing?
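
To make the comparison concrete, here's a minimal sketch of what I have in
mind (assuming a SparkSession in scope as spark and the same cities.parquet
file as in the plans quoted below; exact plan output may differ):

  import spark.implicits._

  // Typed view over the same Parquet data as below (id: Long, name: String).
  val cities = spark.read.parquet("cities.parquet").as[(Long, String)]

  // Column-based filter: Catalyst sees a plain expression, so the FileScan
  // reports the predicate under PushedFilters (EqualTo(name,Warsaw)).
  cities.filter($"name" === "Warsaw").explain()

  // Typed lambda filter: an opaque Scala function to the optimizer, so the
  // FileScan reports PushedFilters: [] and every row is read and deserialized
  // before the predicate runs.
  cities.filter(_._2 == "Warsaw").explain()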

https://twitter.com/jaceklaskowski/status/770554918419755008

Thanks a lot, Reynold, for helping me get the gist of it all!

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Aug 30, 2016 at 10:23 AM, Reynold Xin  wrote:
> The UDF is a black box, so Spark can't know what it is dealing with. There
> are simple cases in which we can analyze the UDF's bytecode and infer what
> it is doing, but it is pretty difficult to do in general.
>
>
> On Tuesday, August 30, 2016, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> I've been playing with UDFs and why they're a black box for Spark's
>> optimizer, and I started with filters to showcase the optimizations in
>> play.
>>
>> My current understanding is that predicate pushdown is supported
>> by the following data sources:
>>
>> 1. Hive tables
>> 2. Parquet files
>> 3. ORC files
>> 4. JDBC
>>
>> While working on examples I came to the conclusion that predicate
>> pushdown does work for the data sources mentioned above, but solely for
>> DataFrames. That was quite interesting, since I was so much into Datasets
>> as the strongly type-safe data abstraction in Spark SQL.
>>
>> Can you help me find the truth? Any links to videos, articles,
>> commits and such that would further deepen my understanding of
>> optimizations in Spark SQL 2.0? I'd greatly appreciate it.
>>
>> The following query pushes the filter down to Parquet (see the
>> PushedFilters attribute at the bottom):
>>
>> scala> cities.filter('name === "Warsaw").queryExecution.executedPlan
>> res30: org.apache.spark.sql.execution.SparkPlan =
>> *Project [id#196L, name#197]
>> +- *Filter (isnotnull(name#197) && (name#197 = Warsaw))
>>+- *FileScan parquet [id#196L,name#197] Batched: true, Format:
>> ParquetFormat, InputPaths:
>> file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [],
>> PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema:
>> struct<id:bigint,name:string>
>>
>> Why does this not work for Datasets? Is the function/lambda too
>> complex? Are there any examples where it works for Datasets? Are we
>> perhaps trading strong type-safety for optimizations like predicate
>> pushdown (and the feature is yet to come in the next releases of
>> Spark 2)?
>>
>> scala> cities.as[(Long, String)].filter(_._2 ==
>> "Warsaw").queryExecution.executedPlan
>> res31: org.apache.spark.sql.execution.SparkPlan =
>> *Filter <function1>.apply
>> +- *FileScan parquet [id#196L,name#197] Batched: true, Format:
>> ParquetFormat, InputPaths:
>> file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [],
>> PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: 3Ps for Datasets not available?! (=Parquet Predicate Pushdown)

2016-08-30 Thread Reynold Xin
The UDF is a black box, so Spark can't know what it is dealing with. There
are simple cases in which we can analyze the UDF's bytecode and infer what
it is doing, but it is pretty difficult to do in general.
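
To illustrate with the cities.parquet file from the quoted message (a minimal
sketch; spark is a SparkSession): even a trivial equality, once wrapped in a
UDF, is invisible to the optimizer, while the equivalent Column expression is
pushed down.

  import org.apache.spark.sql.functions.udf
  import spark.implicits._

  val cities = spark.read.parquet("cities.parquet")

  // The UDF body is arbitrary JVM code, so Catalyst cannot translate it into
  // a data source filter such as EqualTo or IsNotNull.
  val isWarsaw = udf((name: String) => name == "Warsaw")

  // No pushdown: the FileScan shows PushedFilters: [] and the predicate runs
  // only after all rows have been read.
  cities.filter(isWarsaw($"name")).explain()

  // The equivalent Column expression is pushed down to Parquet.
  cities.filter($"name" === "Warsaw").explain()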

On Tuesday, August 30, 2016, Jacek Laskowski  wrote:

> Hi,
>
> I've been playing with UDFs and why they're a black box for Spark's
> optimizer, and I started with filters to showcase the optimizations in
> play.
>
> My current understanding is that predicate pushdown is supported
> by the following data sources:
>
> 1. Hive tables
> 2. Parquet files
> 3. ORC files
> 4. JDBC
>
> While working on examples I came to the conclusion that predicate
> pushdown does work for the data sources mentioned above, but solely for
> DataFrames. That was quite interesting, since I was so much into Datasets
> as the strongly type-safe data abstraction in Spark SQL.
>
> Can you help me find the truth? Any links to videos, articles,
> commits and such that would further deepen my understanding of
> optimizations in Spark SQL 2.0? I'd greatly appreciate it.
>
> The following query pushes the filter down to Parquet (see the
> PushedFilters attribute at the bottom):
>
> scala> cities.filter('name === "Warsaw").queryExecution.executedPlan
> res30: org.apache.spark.sql.execution.SparkPlan =
> *Project [id#196L, name#197]
> +- *Filter (isnotnull(name#197) && (name#197 = Warsaw))
>+- *FileScan parquet [id#196L,name#197] Batched: true, Format:
> ParquetFormat, InputPaths:
> file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [],
> PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema:
> struct<id:bigint,name:string>
>
> Why does this not work for Datasets? Is the function/lambda too
> complex? Are there any examples where it works for Datasets? Are we
> perhaps trading strong type-safety for optimizations like predicate
> pushdown (and the feature is yet to come in the next releases of
> Spark 2)?
>
> scala> cities.as[(Long, String)].filter(_._2 ==
> "Warsaw").queryExecution.executedPlan
> res31: org.apache.spark.sql.execution.SparkPlan =
> *Filter <function1>.apply
> +- *FileScan parquet [id#196L,name#197] Batched: true, Format:
> ParquetFormat, InputPaths:
> file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [],
> PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
>
>


3Ps for Datasets not available?! (=Parquet Predicate Pushdown)

2016-08-30 Thread Jacek Laskowski
Hi,

I've been playing with UDFs and why they're a black box for Spark's
optimizer, and I started with filters to showcase the optimizations in
play.

My current understanding is that predicate pushdown is supported by the
following data sources (a quick way to check the JDBC case is sketched
right after the list):

1. Hive tables
2. Parquet files
3. ORC files
4. JDBC
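
For the JDBC case, a minimal sketch of the check (the URL, table, and
credentials below are placeholders, not a real database; spark is a
SparkSession with import spark.implicits._ in scope):

  // Placeholder connection details; adjust to your own setup.
  val people = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/demo")
    .option("dbtable", "people")
    .option("user", "demo")
    .option("password", "demo")
    .load()

  // For a Column-based filter, look for PushedFilters in the scan node of the
  // physical plan: the pushed predicates end up in the WHERE clause of the
  // query Spark sends to the database.
  people.filter($"name" === "Warsaw").explain()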

While working on examples I came to the conclusion that predicate pushdown
does work for the data sources mentioned above, but solely for DataFrames.
That was quite interesting, since I was so much into Datasets as the
strongly type-safe data abstraction in Spark SQL.

Can you help me find the truth? Any links to videos, articles, commits and
such that would further deepen my understanding of optimizations in Spark
SQL 2.0? I'd greatly appreciate it.

The following query pushes the filter down to Parquet (see the
PushedFilters attribute at the bottom):

scala> cities.filter('name === "Warsaw").queryExecution.executedPlan
res30: org.apache.spark.sql.execution.SparkPlan =
*Project [id#196L, name#197]
+- *Filter (isnotnull(name#197) && (name#197 = Warsaw))
   +- *FileScan parquet [id#196L,name#197] Batched: true, Format:
ParquetFormat, InputPaths:
file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [],
PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema:
struct<id:bigint,name:string>

Why does this not work for Datasets? Is the function/lambda too complex?
Are there any examples where it works for Datasets? Are we perhaps trading
strong type-safety for optimizations like predicate pushdown (and the
feature is yet to come in the next releases of Spark 2)?

scala> cities.as[(Long, String)].filter(_._2 ==
"Warsaw").queryExecution.executedPlan
res31: org.apache.spark.sql.execution.SparkPlan =
*Filter <function1>.apply
+- *FileScan parquet [id#196L,name#197] Batched: true, Format:
ParquetFormat, InputPaths:
file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<id:bigint,name:string>

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org