[ https://issues.apache.org/jira/browse/SPARK-28188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28188:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> Materialize Dataframe API 
> --------------------------
>
>                 Key: SPARK-28188
>                 URL: https://issues.apache.org/jira/browse/SPARK-28188
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Vinitha Reddy Gankidi
>            Priority: Major
>
> We have added a new API to materialize dataframes and our internal users have 
> found it very useful. When you need to run different computations on the same 
> dataframe, Spark recomputes the dataframe for each action, which is 
> problematic if evaluating the dataframe is expensive.
> Materialize is a Spark action. It is a way to let Spark explicitly know that 
> the dataframe has already been computed. Once a dataframe is materialized, 
> Spark skips all stages prior to the materialize when the dataframe is reused 
> later on.
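> For comparison, Spark's existing eager checkpoint API also computes a plan 
> once and truncates its lineage, though the mechanism differs: it writes to 
> the checkpoint directory instead of reusing shuffle output. A minimal sketch 
> (the directory path is illustrative):
> {code:java}
> // Existing alternative: checkpoint() is eager by default, running a job now
> // and returning a new DataFrame with truncated lineage. Unlike the proposed
> // materialize(), reuse is served from checkpoint files, not shuffle data.
> spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
> val checkpointed = spark.table("some_table")
>   .filter("c LIKE '%match%'")
>   .checkpoint(){code}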
> Spark may scan the same table twice if two queries load different columns. 
> For example, the following two queries would scan the same data twice:
> {code:java}
> import org.apache.spark.sql.functions.{countDistinct, min}
> import spark.implicits._ // for the $ column syntax
>
> val tab = spark.table("some_table").filter("c LIKE '%match%'")
> val num_groups = tab.agg(countDistinct($"a"))
> val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min"){code}
>  
> The same table is scanned twice because Spark doesn't know it should load b 
> when the first query runs. You can use materialize to load and then reuse the 
> data:
> {code:java}
> val materialized = spark.table("some_table").filter("c LIKE '%match%'")
>                         .select($"a", $"b").repartition($"a").materialize()
> val num_groups = materialized.agg(countDistinct($"a"))
> val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min"){code}
>  
> This uses select to prune the columns that don't need to be loaded; without 
> it, Spark doesn't know that only a and b are going to be used later. The 
> example also uses repartition to add a shuffle, because Spark resumes 
> execution from the last shuffle. In most cases you will need to repartition 
> the dataframe before materializing it: repartition introduces a new stage 
> boundary, and only the stages before a shuffle can be skipped, as the sketch 
> below shows.
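> For illustration, here is the same pipeline without the repartition. This is 
> a sketch against the proposed API, not existing Spark:
> {code:java}
> // filter and select are narrow transformations, so without an explicit
> // repartition there is no shuffle at the materialize point. Since reuse
> // resumes from the last shuffle, and here there is none, the scan and
> // filter would still be recomputed by each later action.
> val notShuffled = spark.table("some_table")
>   .filter("c LIKE '%match%'")
>   .select($"a", $"b")
>   .materialize() // proposed API from this ticket{code}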
> h3. Materialize vs Cache:
>  * Caching/persisting a dataframe is lazy: the data is kept in memory on the 
> nodes only after the first action computes it. Materialize, by contrast, is 
> an action that runs a job producing the rows the dataframe represents and 
> returns a new dataframe with the result. When the result dataframe is used, 
> Spark resumes execution using the data from the last shuffle.
>  * Because it reuses shuffle data, materialized data is served by the 
> cluster's persistent shuffle servers instead of Spark executors, which makes 
> materialize more reliable. Caching, on the other hand, happens in the 
> executor where the task runs, and the data can be lost if executors time out 
> from inactivity or run out of memory.
>  * Since materialize is more reliable and uses fewer resources than cache, it 
> is usually the better choice for batch workloads. But for processing that 
> iterates over a dataset many times, it is better to keep the data in memory 
> using cache or persist; a cache-based sketch follows below.
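> For contrast, the same reuse pattern written with the existing cache() API. 
> Everything here is existing Spark; the table and column names are carried 
> over from the example above:
> {code:java}
> // cache() is lazy: it only marks the dataframe for in-memory storage.
> // The first action computes and populates the cache; later uses read from
> // executor memory, subject to loss if executors are lost or evict data.
> val cached = spark.table("some_table").filter("c LIKE '%match%'")
>   .select($"a", $"b").cache()
> val num_groups = cached.agg(countDistinct($"a"))
> val groups_with_b = cached.groupBy($"a").agg(min($"b") as "min")
> // ... run actions on num_groups / groups_with_b ...
> cached.unpersist() // release executor memory when done{code}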


