Hi all,

While working on a complex Spark 3.0.1 application, I recently noticed some
unexpected Spark behavior that I am hoping someone can explain.  The
application is Java with many large classes, but I have tried to describe
the essential logic below.

During periodic refresh runs, the application extracts data from Cassandra
tables, then filters and combines it into Spark data frames that are cached
and registered as views in the global temporary database.  The data frames
that were previously there are uncached and replaced by the new ones.
Remote clients can then query the views through Spark SQL.

This refresh process runs in multiple threads, each building a new Spark
data frame progressively: reading one Cassandra table, filtering the stale
contents out of the existing view, and appending the new Cassandra contents
with a union operation.  Each thread then un-caches the old data frame,
caches the new one, and runs a count() action to materialize the preceding
transformations.
Below is some pseudo-code for this multi-threaded logic.


*************************************************************************************************

// Read the Cassandra table using the Spark Cassandra Connector
Dataset<Row> data = sparkSession.read()
        .format("org.apache.spark.sql.cassandra")
        .options(params)
        .load();

// Combine data into a single Spark view
Dataset<Row> combinedView = null;
String combinedViewName = "myView";

// Global temp views live in the "global_temp" database, so they must be
// qualified with that prefix when referenced through the catalog
String qualifiedViewName = "global_temp." + combinedViewName;

if (<first time through the data frame>) {

    // Start a new combined view from the contents of the source table
    combinedView = data;

} else {

    // Read the existing combined view to further extend it
    combinedView = sparkSession.table(qualifiedViewName);

    …

    // Remove stale data with a filter
    combinedView = combinedView.filter(<filter condition>);

    …

    // Add new data
    combinedView = combinedView.union(data);
}

// Re-cache the modified combined view under the same name
sparkSession.catalog().uncacheTable(qualifiedViewName);
combinedView.createOrReplaceGlobalTempView(combinedViewName);
sparkSession.catalog().cacheTable(qualifiedViewName);

// Run an action to materialize the transformations into the cache
combinedView.count();


*************************************************************************************************

The application works, but I recently fixed a bug where Spark SQL queries
ran incrementally slower after each refresh, producing steady performance
degradation.  After investigating, I found that the <first time through the
data frame> check above was incorrect, so each new combined data frame was
built on top of the lineage of the old data frame RDD it was replacing.
The physical plan of many queries grew larger and larger as 'filter' and
'union' transformations kept accumulating on the same data frame.  I am not
yet very familiar with Spark query plans, but below are fragments of a
physical plan before and after a refresh that highlight the differences.

Before refresh

===========

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- *(8) Project …
   …
         +- *(1) Project …
            +- *(1) Filter …
               +- Scan In-memory table …
                  +- InMemoryRelation …
                     +- Exchange RoundRobinPartitioning(2), …
                        +- Union
                           :- Exchange RoundRobinPartitioning(2), …
                           :  +- Union
                           :     :- *(1) Project …
                           :     :  +- *(1) Filter …
                           :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     ...
                           :     :- *(2) Project …
                           :     :  +- *(2) Filter …
                           :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     ...
                           :     :- *(3) Project …
                           :     :  +- *(3) Filter …
                           :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     ...
                           :     :- *(8) Project …
                           :     :  +- *(4) Filter …
                           :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     ...


After refresh

==========


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- *(8) Project …
   …
         +- *(1) Project …
            +- *(1) Filter …
               +- Scan In-memory table …
                  +- InMemoryRelation …
                     +- Exchange RoundRobinPartitioning(2), …
                        +- Union
                           :- Exchange RoundRobinPartitioning(2), …
                           :  +- Union
NEW LINE --->              :     :- Exchange RoundRobinPartitioning(2), …
NEW LINE --->              :     :  +- Union
                           :     :     :- *(1) Project …
                           :     :     :  +- *(1) Filter …
                           :     :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     :     ...
                           :     :     :- *(2) Project …
                           :     :     :  +- *(2) Filter …
                           :     :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     :     ...
                           :     :     :- *(3) Project …
                           :     :     :  +- *(3) Filter …
                           :     :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     :     ...
                           :     :     :- *(8) Project …
                           :     :     :  +- *(4) Filter …
                           :     :     :     +- BatchScan … Cassandra Scan: <table name>
                           :     :     ...


Once the error was fixed, query times no longer degraded.
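The fix itself was just getting the first-pass check right.  For reference,
a minimal sketch of the kind of guard that works is below (plain Java; the
class and method names are illustrative, not the actual code from the
application -- each refresh run resets the tracker so every view takes the
"first time through" branch exactly once per refresh):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative helper: remembers which combined views have already been
// started during the current refresh run.
class RefreshTracker {
    private final Set<String> initialized = new HashSet<>();

    // Returns true only the first time a view name is seen in this run,
    // i.e. the "first time through the data frame" condition.
    boolean isFirstPass(String viewName) {
        return initialized.add(viewName);
    }

    // Called at the start of each refresh run so every view is treated
    // as "first time through" again.
    void reset() {
        initialized.clear();
    }
}
```

With a guard like this, the new combined data frame always starts from the
fresh Cassandra read on the first pass instead of chaining onto the lineage
of the data frame it replaces.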


My question is this: given that the final contents of the data frame are
being cached correctly in memory (I have verified this), why would the
lineage of the data frame's RDD affect query performance at all?  I would
have thought the 'Scan In-memory table' step of the above query plan would
always retrieve the data from cache, making the earlier 'filter' and
'union' transformations in the lineage irrelevant to current performance.
Do Spark SQL queries depend directly on the RDD lineage even when the final
results have been cached?
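For context, this is roughly how I have been checking the cache and
comparing plans after each refresh (a sketch only; it assumes the session
setup and the "global_temp.myView" registration from the code above):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class ViewInspector {
    // Assumes the refresh code above has already registered and cached
    // the global temp view under this qualified name.
    static void inspect(SparkSession sparkSession) {
        String qualifiedViewName = "global_temp.myView";

        // Confirm the view's data is actually held in the cache
        boolean cached = sparkSession.catalog().isCached(qualifiedViewName);
        System.out.println("cached: " + cached);

        // Print the extended (logical + physical) plans; the lineage
        // growth shows up as the extra Union/Exchange nodes under the
        // InMemoryRelation in the fragments above
        Dataset<Row> view = sparkSession.table(qualifiedViewName);
        view.explain(true);
    }
}
```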



Thanks in advance for any reply you can give!
