> Do Spark SQL queries depend directly on the RDD lineage even when the
> final results have been cached?

Yes. If one of the nodes holding cached data later fails, Spark would need
to rebuild that state from the lineage.

You could try checkpointing occasionally and see if that helps.
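
To make the lineage point concrete, here is a toy model in plain Java. It is
only a sketch and does not use the Spark API: the Frame class and the then,
checkpoint, and costAfterRefreshes methods are hypothetical names made up for
illustration. It counts how many lineage steps would have to be replayed if a
cached partition were lost, with and without periodically truncating the
lineage. In Spark itself the truncation would be done with
Dataset.checkpoint() (after SparkContext.setCheckpointDir(...)) or
Dataset.localCheckpoint().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are Spark classes.
class LineageSketch {

    /** Stand-in for a data frame: just the ordered list of steps in its lineage. */
    static final class Frame {
        final List<String> lineage;

        Frame(List<String> lineage) {
            this.lineage = lineage;
        }

        /** A frame read straight from a source has a one-step lineage. */
        static Frame source(String name) {
            List<String> steps = new ArrayList<>();
            steps.add("scan:" + name);
            return new Frame(steps);
        }

        /** Applying a transformation extends the lineage of the parent frame. */
        Frame then(String step) {
            List<String> steps = new ArrayList<>(lineage);
            steps.add(step);
            return new Frame(steps);
        }

        /** Models a checkpoint: the materialized result becomes a fresh source. */
        Frame checkpoint() {
            return source("checkpoint");
        }

        /** Steps that would be replayed to rebuild a lost cached partition. */
        int rebuildCost() {
            return lineage.size();
        }
    }

    /**
     * Simulates refresh runs that each add a filter and a union on top of the
     * previous view. checkpointEvery <= 0 means the lineage is never truncated.
     */
    static int costAfterRefreshes(int refreshes, int checkpointEvery) {
        Frame view = Frame.source("cassandra");
        for (int i = 1; i <= refreshes; i++) {
            view = view.then("filter").then("union");
            if (checkpointEvery > 0 && i % checkpointEvery == 0) {
                view = view.checkpoint();
            }
        }
        return view.rebuildCost();
    }

    public static void main(String[] args) {
        System.out.println(costAfterRefreshes(10, 0)); // prints 21
        System.out.println(costAfterRefreshes(12, 5)); // prints 5
    }
}
```

Without checkpointing the count grows by two steps per refresh. Truncating
every few refreshes keeps it bounded, which is the effect I would hope to see
from checkpointing in the real application.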

On Sat, 22 May 2021, 11:44 pm Fred Yeadon, <f...@qad.com> wrote:

> Hi all,
>
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain.  The
> application is Java with many large classes, but I have tried to describe
> the essential logic below.
>
> During periodic refresh runs, the application extracts data from Cassandra
> database tables, filters and combines them into Spark data frames that are
> cached and registered as views in the global temporary DB.  The data frames
> that were previously there are uncached and replaced by the new ones.
> Remote clients can then query the views through Spark SQL.
>
> This refresh process runs in multiple threads that build each new Spark
> data frame progressively, reading one Cassandra table, filtering out the
> old contents of the view and adding the new Cassandra contents with a union
> operation.  Each thread un-caches the old data frame and caches the new
> one, then runs a count() action to realize the previous transformations.
> Below is some pseudo-code for this multi-threaded logic.
>
>
>
> *************************************************************************************************
>
> // Read Cassandra table using Spark Cassandra Connector
> Dataset<Row> data =
>     sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();
>
> // Combine data into single Spark view
> Dataset<Row> combinedView = null;
> String combinedViewName = "myView";
> // Global temp views are resolved through the global temp database
> // (named global_temp by default), so lookups need the qualified name
> String qualifiedViewName = "global_temp." + combinedViewName;
>
> if (<first time through the data frame>) {
>    // Start a new combined view from the contents of the source table
>    combinedView = data;
> } else {
>    // Read the existing combined view to further extend it
>    combinedView = sparkSession.table(qualifiedViewName);
>    …
>    // Remove stale data with filter
>    combinedView = combinedView.filter(<filter condition>);
>    …
>    // Add new data
>    combinedView = combinedView.union(data);
> }
>
> // Re-cache modified combined view
> sparkSession.catalog().uncacheTable(qualifiedViewName);
> combinedView.createOrReplaceGlobalTempView(combinedViewName);
> sparkSession.catalog().cacheTable(qualifiedViewName);
>
> // Run an action to realize the previous transformations
> combinedView.count();
>
>
>
> *************************************************************************************************
>
> The application works, but I recently fixed a bug where Spark SQL queries
> were running incrementally slower after each refresh, resulting in steady
> performance degradation.  After investigation, I found that the <first time
> through the data frame> check logic above was not correct, causing a new
> combined data frame to build on the lineage of the old data frame RDD that
> it is replacing.  The Spark physical plan of many queries was becoming
> larger and larger because of 'filter' and 'union' transformations being
> added to the same data frame.  I am not yet very familiar with Spark query
> plans, but below are fragments of a physical plan before and after a
> refresh that highlight the differences.
>
> Before refresh
> ===========
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- *(8) Project…
>    …
>          +- *(1) Project …
>             +- *(1) Filter …
>                 +- Scan In-memory table ….
>                     +- InMemoryRelation …
>                         +- Exchange RoundRobinPartitioning(2), …
>                            +- Union
>                                :- Exchange RoundRobinPartitioning(2), …
>                                :   +- Union
>                                :       :- *(1) Project …
>                                :       :  +- *(1) Filter …
>                                :       :      +- BatchScan … Cassandra Scan: <table name>
>                                        ...
>                                :       :- *(2) Project …
>                                :       :  +- *(2) Filter …
>                                :       :      +- BatchScan … Cassandra Scan: <table name>
>                                        ...
>                                :       :- *(3) Project …
>                                :       :  +- *(3) Filter …
>                                :       :      +- BatchScan … Cassandra Scan: <table name>
>                                        ...
>                                :       :- *(8) Project …
>                                :       :  +- *(4) Filter …
>                                :       :      +- BatchScan … Cassandra Scan: <table name>
>                                        ...
>
>
> After refresh
> ==========
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- *(8) Project…
>    …
>          +- *(1) Project …
>             +- *(1) Filter …
>                 +- Scan In-memory table ….
>                     +- InMemoryRelation …
>                         +- Exchange RoundRobinPartitioning(2), …
>                            +- Union
>                                :- Exchange RoundRobinPartitioning(2), …
>                                :   +- Union
> NEW LINE --->       :- Exchange RoundRobinPartitioning(2), …
> NEW LINE --->         :   +- Union
>                                   :       :- *(1) Project …
>                                   :       :  +- *(1) Filter …
>                                   :       :      +- BatchScan … Cassandra Scan: <table name>
>                                           ...
>                                   :       :- *(2) Project …
>                                   :       :  +- *(2) Filter …
>                                   :       :      +- BatchScan … Cassandra Scan: <table name>
>                                           ...
>                                   :       :- *(3) Project …
>                                   :       :  +- *(3) Filter …
>                                   :       :      +- BatchScan … Cassandra Scan: <table name>
>                                           ...
>                                   :       :- *(8) Project …
>                                   :       :  +- *(4) Filter …
>                                   :       :      +- BatchScan … Cassandra Scan: <table name>
>                                           ...
>
>
> Once the error was fixed, query times no longer degraded.
>
>
> My question is this: given that the final contents of the data frame are
> being cached correctly in memory (I have verified this), why would the
> lineage of the data frame's RDD affect query performance at all?  I would
> think the 'Scan in-memory table' step of the above query plan would always
> retrieve the data from cache, making the previous 'filter' and 'union'
> transformations within the lineage irrelevant to current performance.  Do
> Spark SQL queries depend directly on the RDD lineage even when the final
> results have been cached?
>
>
>
> Thanks in advance for any reply you can give!
>
>
