[ https://issues.apache.org/jira/browse/SPARK-28188 ]
Dongjoon Hyun updated SPARK-28188:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                           3.0.0

> Materialize Dataframe API
> --------------------------
>
>                 Key: SPARK-28188
>                 URL: https://issues.apache.org/jira/browse/SPARK-28188
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Vinitha Reddy Gankidi
>            Priority: Major
>
> We have added a new API to materialize dataframes, and our internal users have found it very useful. When you need to run different computations on the same dataframe, Spark recomputes the dataframe each time. This is problematic if evaluating the dataframe is expensive.
>
> Materialize is a Spark action. It is a way to tell Spark explicitly that the dataframe has already been computed. Once a dataframe is materialized, Spark skips all stages prior to the materialize when the dataframe is reused later on.
>
> Spark may scan the same table twice if two queries load different columns. For example, the following two queries would scan the same data twice:
> {code:java}
> val tab = spark.table("some_table").filter("c LIKE '%match%'")
> val num_groups = tab.agg(countDistinct($"a"))
> val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min"){code}
> The same table is scanned twice because Spark doesn't know it should load b when the first query runs. You can use materialize to load the data once and then reuse it:
> {code:java}
> val materialized = spark.table("some_table").filter("c LIKE '%match%'")
>   .select($"a", $"b").repartition($"a").materialize()
> val num_groups = materialized.agg(countDistinct($"a"))
> val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min"){code}
> This uses select to drop the columns that don't need to be loaded; without it, Spark doesn't know that only a and b are going to be used later. The example also uses repartition to add a shuffle, because Spark resumes from the last shuffle.
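> For comparison, the same pair of queries can be served today with the existing cache() API. The following is a sketch only: materialize() is the API proposed in this ticket and is not part of released Spark (so its calls are left commented out), and "some_table" is assumed to exist in the session catalog:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{countDistinct, min}
>
> val spark = SparkSession.builder().appName("materialize-sketch").getOrCreate()
> import spark.implicits._
>
> val tab = spark.table("some_table").filter("c LIKE '%match%'").select($"a", $"b")
>
> // Existing API: cache() is lazy. The first action computes the rows and
> // keeps them in executor memory, where they can be evicted or lost.
> val cached = tab.cache()
> cached.agg(countDistinct($"a")).show()
> cached.groupBy($"a").agg(min($"b") as "min").show()
>
> // Proposed API (not in released Spark): materialize() would run a job
> // eagerly, so later uses resume from the persisted shuffle output.
> // val materialized = tab.repartition($"a").materialize()
> // materialized.agg(countDistinct($"a")).show(){code}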
> In most cases you will want to repartition the dataframe before materializing it: repartition introduces a new stage, and it is that stage boundary which lets the expensive stages before it be skipped on reuse.
>
> h3. Materialize vs Cache:
> * Caching/persisting a dataframe is lazy: the first time the dataset is computed in an action, it is kept in memory on the nodes. Materialize, by contrast, is an action that runs a job producing the rows the dataframe represents and returns a new dataframe with the result. When the result dataframe is used, Spark resumes execution using the data from the last shuffle.
> * Because it reuses shuffle data, materialized data is served by the cluster's persistent shuffle servers instead of Spark executors, which makes materialize more reliable. Caching, on the other hand, happens in the executor where the task runs, and that data can be lost if executors time out from inactivity or run out of memory.
> * Since materialize is more reliable and uses fewer resources than cache, it is usually the better choice for batch workloads. For processing that iterates over a dataset many times, however, it is better to keep the data in memory using cache or persist.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org