Vinitha Reddy Gankidi created SPARK-28188:
---------------------------------------------

             Summary: Materialize Dataframe API 
                 Key: SPARK-28188
                 URL: https://issues.apache.org/jira/browse/SPARK-28188
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 2.4.3
            Reporter: Vinitha Reddy Gankidi


We have added a new API to materialize dataframes and our internal users have 
found it very useful. For use cases where you need to do different computations 
on the same dataframe, Spark recomputes the dataframe each time. This is 
problematic if evaluation of the dataframe is expensive.

Materialize is a Spark action. It is a way to let Spark explicitly know that 
the dataframe has already been computed. Once a dataframe is materialized, 
Spark skips all stages prior to the materialize when the dataframe is reused 
later on.
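A rough sketch of what the proposed API surface could look like, inferred from this description (the method name and placement on {{Dataset}} are this ticket's proposal and are not in upstream Spark):

{code:java}
// Sketch only: proposed addition to org.apache.spark.sql.Dataset.
// Semantics as described above: an eager action, not a lazy transformation.
class Dataset[T] {
  /** Runs a job that computes the rows this dataframe represents and
    * returns a new dataframe. When the returned dataframe is reused,
    * Spark resumes execution from the output of the last shuffle
    * instead of recomputing the upstream stages. */
  def materialize(): Dataset[T] = ???
}{code}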

Spark may scan the same table twice if two queries load different columns. For 
example, the following two queries would scan the same data twice:
{code:java}
import org.apache.spark.sql.functions.{countDistinct, min}

val tab = spark.table("some_table").filter("c LIKE '%match%'")

val num_groups = tab.agg(countDistinct($"a"))

val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min"){code}
 

The same table is scanned twice because Spark doesn't know it should load b 
when the first query runs. You can use materialize to load and then reuse the 
data:

{code:java}
val materialized = spark.table("some_table").filter("c LIKE '%match%'")
  .select($"a", $"b").repartition($"a").materialize()

val num_groups = materialized.agg(countDistinct($"a"))

val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min"){code}
 

This uses select to prune the columns that don't need to be loaded; without it, Spark doesn't know that only a and b are going to be used later.

The example also uses repartition to add a shuffle, because Spark resumes execution from the last shuffle. In most cases you will need to repartition the dataframe before materializing it so that the expensive upstream stages can be skipped, since repartition introduces a new stage.
h3. Materialize vs Cache:
 * Caching/persisting a dataframe is lazy: the first time the dataset is computed in an action, it is kept in memory on the nodes. Materialize is an action that runs a job producing the rows the dataframe represents and returns a new dataframe with the result. When the result dataframe is used, Spark resumes execution from the data of the last shuffle.
 * Because it reuses shuffle data, materialized data is served by the cluster's persistent shuffle servers instead of Spark executors, which makes materialize more reliable. Caching, on the other hand, happens in the executor where the task runs, and data can be lost if executors time out from inactivity or run out of memory.
 * Since materialize is more reliable and uses fewer resources than cache, it is usually the better choice for batch workloads. For processing that iterates over a dataset many times, however, it is better to keep the data in memory using cache or persist.
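The contrast above can be illustrated side by side (a sketch: {{cache()}} is the existing Spark API, while {{materialize()}} is the API proposed in this ticket and does not exist in upstream Spark):

{code:java}
val df = spark.table("some_table").filter("c LIKE '%match%'")

// cache: lazy. Nothing runs here; rows are stored in executor memory
// only after the first action, and can be evicted or lost with the executor.
val cached = df.cache()

// materialize (proposed): eager. A job runs now, and the result is backed
// by the last shuffle's output on the shuffle servers, not executor memory.
val materialized = df.repartition($"a").materialize(){code}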



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
