RE: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Mendelson, Assaf
Hi,
I believe the UDF is only a small part of the problem. You can easily test this 
by using a UDF on the DataFrame side as well.
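For example, something along these lines (an untested sketch; it assumes the same 
data and eventNames values, and the col import, from the snippets further down in 
the thread):

import org.apache.spark.sql.functions.{col, udf}

// The same membership test, but as a DataFrame UDF (hypothetical name).
// Comparing this against the isin() builtin separates the cost of
// "UDF vs builtin" from the cost of "Dataset vs DataFrame".
val isWatchedEvent = udf((eventType: String) => eventNames.contains(eventType))
val udfCount = data.filter(isWatchedEvent(col("event_type"))).count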
In my testing I have seen that using Datasets can be considerably slower than 
DataFrames. I can make a guess as to why this happens.
Basically, what you are doing with the DataFrame is reading the data, applying a 
predicate to a single column and counting the results. In practice this means the 
data is read into Project Tungsten's unsafe binary row format, only the single 
column event_type is examined for the filtering, and then the result is counted.
The Dataset, on the other hand, reads each element and converts it into an 
instance of the case class before doing the calculation. This means two things: 
first, all the columns referenced by the case class have to be read, and second, 
the case class object itself has to be constructed.
So the Dataset option reads a lot more (all columns defined in the case class) 
and copies the data (when building the case class object used for the filtering).
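One way to see this is to compare the two physical plans (a rough sketch; it 
assumes the implicits needed for as[CustomEvent] are already imported, as they 
must have been for the original snippet, and I have not verified the exact plan 
output):

// Untyped: the Parquet scan should list only event_type in its read schema,
// and the filter remains a plain column expression that codegen can handle.
data.filter(col("event_type").isin(eventNames:_*)).explain(true)

// Typed: a CustomEvent has to be built per row (all case-class columns read)
// before the filter lambda can be evaluated.
data.as[CustomEvent]
  .filter(event => eventNames.contains(event.event_type.get))
  .explain(true)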
In the more general case (i.e. when more complicated behavior is needed), you 
would lose even more in terms of performance, because Catalyst and code 
generation cannot look inside the lambda. Try, for example, doing the filter on a 
numeric value and you should see an even bigger difference: predicate pushdown to 
Parquet would lower the DataFrame’s time while not changing much for the Dataset.
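Something along these lines (an untested sketch; the threshold is just a 
placeholder epoch-millis value, and the time column comes from the schema in the 
original mail):

// DataFrame: the comparison is a Catalyst expression, so it can be pushed down
// to the Parquet reader and whole row groups may be skipped.
val dfTimeCount = data.filter(col("time") > 1461000000000L).count

// Dataset: the predicate is an opaque Scala function, so every row still has to
// be converted into a CustomEvent before the lambda runs.
val dsTimeCount = data.as[CustomEvent]
  .filter(event => event.time.exists(_ > BigInt(1461000000000L)))
  .count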

If your goal is pure performance, then a DataFrame solution will probably be 
better than a Dataset one in most cases. Datasets provide the advantage of type 
safety, and if you have very complex logic where you would otherwise need 
multiple UDFs over many columns, going directly to Datasets can simplify the 
development.
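You can also mix the two. A sketch of what I mean (untested; import 
session.implicits._ is assumed, and the typed map is just a placeholder for more 
complex logic):

// Do the cheap, column-pruned filtering with the untyped API first, and only
// pay the case-class conversion cost for the rows that survive.
val survivors = data
  .filter(col("event_type").isin(eventNames:_*))
  .as[CustomEvent]

// Placeholder for more complex typed logic on the much smaller remainder.
val survivorIds = survivors.map(event => event.event_id.getOrElse("unknown"))
survivorIds.show(5)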



From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Monday, October 24, 2016 7:52 PM
To: Antoaneta Marinova
Cc: user
Subject: Re: Spark 2.0 - DataFrames vs Dataset performance

Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames are 
just an alias to Datasets now), but rather using a user defined function for 
filtering vs using Spark builtins. The builtin can use tricks from Project 
Tungsten, such as only deserializing the "event_type" column. The user-defined 
function on the other hand has to be called with a full case class, so the 
whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone with 
a more direct knowledge of the code will correct me if I'm wrong.

Re: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Daniel Darabos
Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames
are just an alias to Datasets now), but rather using a user defined
function for filtering vs using Spark builtins. The builtin can use tricks
from Project Tungsten, such as only deserializing the "event_type" column.
The user-defined function on the other hand has to be called with a full
case class, so the whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone
with a more direct knowledge of the code will correct me if I'm wrong.
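If only the one column is actually needed, one way to keep the typed API without 
paying for the whole object is to select that column first, roughly like this (a 
sketch based on the snippets in the original mail below; it assumes session is 
the SparkSession from that mail, and I have not benchmarked it):

import session.implicits._

// Only event_type has to be deserialized per row, instead of the full CustomEvent.
val typedCount = data
  .select($"event_type".as[String])
  .filter(eventType => eventNames.contains(eventType))
  .count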



Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Antoaneta Marinova
Hello,

I am using Spark 2.0 for filtering, grouping and counting operations on events
data saved in Parquet files. As the events schema has a very nested structure, I
wanted to read the events as Scala case classes to simplify the code, but I
noticed a severe performance degradation. Below you can find a simple comparison
of the same operation using a DataFrame and a Dataset.

val data = session.read.parquet("events_data/")

// Using Datasets with custom class

//Case class matching the events schema

case class CustomEvent(event_id: Option[String],
                       event_type: Option[String],
                       context: Option[Context],
                       ….
                       time: Option[BigInt]) extends Serializable

scala> val start = System.currentTimeMillis ;
       val count = data.as[CustomEvent].filter(event =>
         eventNames.contains(event.event_type.get)).count ;
       val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 11262

// Using DataFrames

scala> val start = System.currentTimeMillis ;
       val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
       val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 147

The schema of the events is something like this:

// events schema

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- environment_1: struct (nullable = true)
 |    |    |-- filed1: integer (nullable = true)
 |    |    |-- filed2: integer (nullable = true)
 |    |    |-- filed3: integer (nullable = true)
 |    |-- environment_2: struct (nullable = true)
 |    |    |-- filed_1: string (nullable = true)
 |    |    ...
 |    |    |-- filed_n: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- config: string (nullable = true)
 |    |    |    |-- tree: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- path: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |    |-- level: integer (nullable = true)
 |-- time: long (nullable = true)

Could you please advise me on the usage of the different abstractions, and help
me understand why using Datasets with a user-defined class is so much slower?

Thank you,
Antoaneta