RE: Spark 2.0 - DataFrames vs Dataset performance
Hi,

I believe the UDF is only a small part of the problem. You can easily test this by using a UDF with a DataFrame too. In my testing I saw that using Datasets can be considerably slower than DataFrames.

I can make a guess as to why this happens. With the DataFrame you are reading the data, applying a function to a single column and counting the results. In practice this means the data is read into Project Tungsten's unsafe binary format, only the event_type column is evaluated for the filtering, and the result is counted. The Dataset, on the other hand, reads each element and converts it to a case class before doing the calculation. That means two things: first, every column in the case class has to be read, and second, the case class instance itself has to be generated. So the Dataset option reads a lot more (all columns defined in the case class) and copies the data (when building the case class object used for the filtering).

In a more general case (i.e. when more complicated behavior is needed) you would lose even more performance, as Catalyst and codegen would not take effect. Try, for example, filtering on a numeric value: you should see an even bigger difference, because predicate pushdown to Parquet would lower the DataFrame's time while changing little for the Dataset. (Rough sketches of both experiments follow after the quoted message below.)

If your goal is pure performance, then DataFrame solutions will probably be better than Datasets in most cases. Datasets provide type safety, and if you have very complex logic that would otherwise need multiple UDFs over many columns, going directly to Datasets can simplify development.

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Monday, October 24, 2016 7:52 PM
To: Antoaneta Marinova
Cc: user
Subject: Re: Spark 2.0 - DataFrames vs Dataset performance

Hi Antoaneta,

I believe the difference is not due to Datasets being slower (DataFrames are just an alias to Datasets now), but rather using a user defined function for filtering vs using Spark builtins. The builtin can use tricks from Project Tungsten, such as only deserializing the "event_type" column. The user-defined function on the other hand has to be called with a full case class, so the whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone with a more direct knowledge of the code will correct me if I'm wrong.
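For reference, the two experiments suggested in the reply above might look roughly like the sketch below. It is only an illustration, not code from the thread: it assumes the names from the original post (the DataFrame "data" read from events_data/, the Seq[String] "eventNames", the CustomEvent case class and the SparkSession "session"), and the numeric cutoff is an arbitrary value.

import org.apache.spark.sql.functions.{col, udf}
import session.implicits._   // encoder for CustomEvent; "session" is the SparkSession from the original post

// 1) The same filter, once as a builtin and once as a UDF on the DataFrame.
//    Both only touch the event_type column; the UDF is simply opaque to Catalyst.
val isWantedEvent = udf((eventType: String) => eventType != null && eventNames.contains(eventType))
val builtinCount = data.filter(col("event_type").isin(eventNames: _*)).count()
val udfCount     = data.filter(isWantedEvent(col("event_type"))).count()

// 2) A numeric filter on the long "time" column (the cutoff is made up).
//    The DataFrame comparison can be pushed down to the Parquet reader, while the
//    Dataset version deserializes every row into a CustomEvent before comparing.
val cutoff = 1477000000L
val dfTimeCount = data.filter(col("time") > cutoff).count()
val dsTimeCount = data.as[CustomEvent].filter(_.time.exists(_ > BigInt(cutoff))).count()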
Re: Spark 2.0 - DataFrames vs Dataset performance
Hi Antoaneta,

I believe the difference is not due to Datasets being slower (DataFrames are just an alias to Datasets now), but rather using a user defined function for filtering vs using Spark builtins. The builtin can use tricks from Project Tungsten, such as only deserializing the "event_type" column. The user-defined function on the other hand has to be called with a full case class, so the whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone with a more direct knowledge of the code will correct me if I'm wrong.

On Mon, Oct 24, 2016 at 2:50 PM, Antoaneta Marinova <antoaneta.vmarin...@gmail.com> wrote:

> Hello,
>
> I am using Spark 2.0 for performing filtering, grouping and counting
> operations on events data saved in Parquet files. As the events schema has
> a very nested structure I wanted to read the events as Scala case classes
> to simplify the code, but I noticed a severe performance degradation.
> Below you can find a simple comparison of the same operation using a
> DataFrame and a Dataset.
>
> val data = session.read.parquet("events_data/")
>
> // Using Datasets with a custom class
>
> // Case class matching the events schema
> case class CustomEvent(event_id: Option[String],
>                        event_type: Option[String],
>                        context: Option[Context],
>                        ….
>                        time: Option[BigInt]) extends Serializable {}
>
> scala> val start = System.currentTimeMillis ;
> val count = data.as[CustomEvent].filter(event => eventNames.contains(event.event_type.get)).count ;
> val time = System.currentTimeMillis - start
>
> count: Long = 5545
> time: Long = 11262
>
> // Using DataFrames
>
> scala> val start = System.currentTimeMillis ;
> val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
> val time = System.currentTimeMillis - start
>
> count: Long = 5545
> time: Long = 147
>
> The schema of the events is something like this:
>
> // events schema
> root
>  |-- event_id: string (nullable = true)
>  |-- event_type: string (nullable = true)
>  |-- context: struct (nullable = true)
>  |    |-- environment_1: struct (nullable = true)
>  |    |    |-- filed1: integer (nullable = true)
>  |    |    |-- filed2: integer (nullable = true)
>  |    |    |-- filed3: integer (nullable = true)
>  |    |-- environment_2: struct (nullable = true)
>  |    |    |-- filed_1: string (nullable = true)
>  |    |    |-- filed_n: string (nullable = true)
>  |-- metadata: struct (nullable = true)
>  |    |-- elements: array (nullable = true)
>  |    |    |-- element: struct (containsNull = true)
>  |    |    |    |-- config: string (nullable = true)
>  |    |    |    |-- tree: array (nullable = true)
>  |    |    |    |    |-- element: struct (containsNull = true)
>  |    |    |    |    |    |-- path: array (nullable = true)
>  |    |    |    |    |    |    |-- element: struct (containsNull = true)
>  |    |    |    |    |    |    |    |-- key: string (nullable = true)
>  |    |    |    |    |    |    |    |-- name: string (nullable = true)
>  |    |    |    |    |    |    |    |-- level: integer (nullable = true)
>  |-- time: long (nullable = true)
>
> Could you please advise me on the usage of the different abstractions and
> help me understand why using Datasets with a user-defined class is so much
> slower.
>
> Thank you,
> Antoaneta
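One way to see the column pruning Daniel describes is to compare the physical plans of the two variants. This is only a sketch using the names from the original post (data, eventNames, CustomEvent, session); the exact plan output depends on the Spark version.

import org.apache.spark.sql.functions.col
import session.implicits._

// Builtin filter: the file scan should list a ReadSchema containing only event_type,
// and (depending on the version) the predicate under PushedFilters.
data.filter(col("event_type").isin(eventNames: _*)).explain(true)

// Typed filter: the plan deserializes rows into CustomEvent objects before filtering,
// so the scan has to materialize every column the case class refers to.
data.as[CustomEvent].filter(event => eventNames.contains(event.event_type.get)).explain(true)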
Spark 2.0 - DataFrames vs Dataset performance
Hello,

I am using Spark 2.0 for performing filtering, grouping and counting operations on events data saved in Parquet files. As the events schema has a very nested structure I wanted to read the events as Scala case classes to simplify the code, but I noticed a severe performance degradation. Below you can find a simple comparison of the same operation using a DataFrame and a Dataset.

val data = session.read.parquet("events_data/")

// Using Datasets with a custom class

// Case class matching the events schema
case class CustomEvent(event_id: Option[String],
                       event_type: Option[String],
                       context: Option[Context],
                       ….
                       time: Option[BigInt]) extends Serializable {}

scala> val start = System.currentTimeMillis ;
val count = data.as[CustomEvent].filter(event => eventNames.contains(event.event_type.get)).count ;
val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 11262

// Using DataFrames

scala> val start = System.currentTimeMillis ;
val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
val time = System.currentTimeMillis - start

count: Long = 5545
time: Long = 147

The schema of the events is something like this:

// events schema
root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- environment_1: struct (nullable = true)
 |    |    |-- filed1: integer (nullable = true)
 |    |    |-- filed2: integer (nullable = true)
 |    |    |-- filed3: integer (nullable = true)
 |    |-- environment_2: struct (nullable = true)
 |    |    |-- filed_1: string (nullable = true)
 |    |    |-- filed_n: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- config: string (nullable = true)
 |    |    |    |-- tree: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- path: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |    |-- level: integer (nullable = true)
 |-- time: long (nullable = true)

Could you please advise me on the usage of the different abstractions and help me understand why using Datasets with a user-defined class is so much slower.

Thank you,
Antoaneta
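Not part of the original mail, but one possible middle ground (a sketch, assuming the CustomEvent case class, eventNames and session defined above) is to filter on the column first and only then switch to the typed view, so the optimizer can still prune columns and push the predicate down while the more complex logic works with the case class:

import org.apache.spark.sql.functions.col
import session.implicits._

// Untyped filter first: Catalyst sees a plain column predicate and can optimize it.
// Only the surviving rows are then deserialized into CustomEvent for typed processing.
val selectedEvents = data
  .filter(col("event_type").isin(eventNames: _*))
  .as[CustomEvent]

val selectedCount = selectedEvents.count()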