Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread fwy
Thanks to all for the quick replies, they helped a lot.  To answer a few of
the follow-up questions ...

> 1. How did you fix this performance issue, which I gather was done programmatically?

The main problem in my original code was that the 'if' clause (the first-pass
check) was not being executed when it should have been.  This clause has the
effect of creating a new data frame directly from the
contents of the Cassandra data source, throwing away the old RDD lineage of
the existing data frame.  By incorrectly executing the 'else' clause
instead, the new data frame was being created from the existing data frame
again and again, extending the entire lineage with each iteration and
degrading query times.  Fixing the code stopped the lineage from constantly
growing with each refresh.
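
For reference, a minimal sketch of the corrected structure (the real condition
in our code is more involved; 'firstPassForView' and 'staleDataPredicate' are
just stand-ins, and sparkSession, data and combinedViewName are the variables
from the pseudo-code quoted further down):

    // Sketch only: taking the 'if' branch roots the new lineage directly at the
    // Cassandra scan, discarding the lineage of the data frame being replaced.
    Dataset<Row> combinedView;
    if (firstPassForView) {
        combinedView = data;                                 // fresh lineage from the source table
    } else {
        combinedView = sparkSession.table(combinedViewName)  // extends the existing lineage
                .filter(staleDataPredicate)                  // hypothetical predicate
                .union(data);
    }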

> 2. In your code have you set spark.conf.set("spark.sql.adaptive.enabled",
> "true")

Yes, this is already set.
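
(For completeness, the programmatic equivalent in the Java API is:

    sparkSession.conf().set("spark.sql.adaptive.enabled", "true");

which is the same as setting it through spark-defaults.conf or --conf.)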

> 3. Depending on the size of your data, both source and new, do you
> have any indication that your data in the global temporary view is totally
> cached? This should show in the storage tab in the UI. If you have data on
> disk then this will affect the performance.

Yes, the cached temporary view is in the storage tab with the expected
amount of memory usage, although it is difficult to check the specific
contents of this memory cache.
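
For a quick programmatic sanity check there is also the following (a sketch; it
only confirms the view is registered as cached, the memory vs. disk split still
needs the storage tab, and the global temp view has to be qualified with
'global_temp'):

    boolean cached = sparkSession.catalog().isCached("global_temp." + combinedViewName);
    System.out.println(combinedViewName + " marked as cached: " + cached);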


> You could try checkpointing occasionally and see if that helps

This was very helpful, as I was not familiar with the Spark checkpointing
feature.  In our application, we don't need fault-tolerant dataframes/RDDs,
so I inserted localCheckpoint() calls in the code prior to the uncaching and
re-caching steps.  This seems to have made all queries run faster, with some
improved immediately by a factor of 3!
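
Roughly, the refresh step now looks like this (a sketch based on the
pseudo-code quoted below, not the exact application code):

    // Truncate the lineage before re-registering and re-caching the view.
    // localCheckpoint() is eager by default and keeps the checkpointed data on
    // the executors, so it is fast but not fault-tolerant, which is fine for us.
    combinedView = combinedView.localCheckpoint();
    sparkSession.catalog().uncacheTable(combinedViewName);
    combinedView.createOrReplaceGlobalTempView(combinedViewName);
    sparkSession.catalog().cacheTable(combinedViewName);
    combinedView.count();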


Based on these tests, it is clear that RDD lineage can have a major effect
on the performance of Spark applications, whether or not the data has been
cached in memory.

Thanks again for the good advice!








Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread Sebastian Piu
> Do Spark SQL queries depend directly on the RDD lineage even when the
> final results have been cached?

Yes, if one of the nodes holding cached data later fails, Spark would need
to rebuild that state somehow.

You could try checkpointing occasionally and see if that helps
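
For anyone finding this later, the two flavours in the Dataset API look roughly
like this (the checkpoint directory is just a placeholder, and combinedView is
the data frame from the pseudo-code below):

    // Reliable checkpoint: materialized to the checkpoint directory, survives executor loss
    sparkSession.sparkContext().setCheckpointDir("hdfs:///tmp/spark-checkpoints");
    Dataset<Row> reliable = combinedView.checkpoint();

    // Local checkpoint: stored on the executors only, faster but not fault-tolerant
    Dataset<Row> local = combinedView.localCheckpoint();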

On Sat, 22 May 2021, 11:44 pm Fred Yeadon,  wrote:

> Hi all,
>
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain.  The
> application is Java with many large classes, but I have tried to describe
> the essential logic below.
>
> During periodic refresh runs, the application extracts data from Cassandra
> database tables, filters and combines them into Spark data frames that are
> cached and registered as views in the global temporary DB.  The data frames
> that were previously there are uncached and replaced by the new ones.
> Remote clients can then query the views through Spark SQL.
>
> This refresh process runs in multiple threads that build each new Spark
> data frame progressively, reading one Cassandra table, filtering out the
> old contents of the view and adding the new Cassandra contents with a union
> operation.  Each thread un-caches the old data frame and caches the new
> one, then runs a count() action to realize the previous transformations.
> Below is some pseudo-code for this multi-threaded logic.
>
>
>
> *
>
> // Read Cassandra table using Spark Cassandra Connector
>
> Dataset data =
> sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();
>
>
> // Combine data into single Spark view
>
> Dataset combinedView = null;
>
> String combinedViewName = "myView";
>
> if () {
>
>// Start a new combined view from the contents of the source table
>
>combinedView = data;
>
> } else {
>
>// Read the existing combined view to further extend it
>
>combinedView = sparkSession.table(combinedViewName);
>
>…
>
>// Remove stale data with filter
>
>combinedView = combinedView.filter();
>
>…
>
>// Add new data
>
>combinedView = combinedView.union(data);
>
> }
>
>
> // Re-cache modified combined view
>
> sparkSession.catalog().uncacheTable(combinedViewName);
>
> combinedView.createOrReplaceGlobalTempView(combinedViewName);
>
> sparkSession.catalog().cacheTable(combinedViewName);
>
> combinedView.count();
>
>
>
> *
>
> The application works, but I recently fixed a bug where Spark SQL queries
> were running incrementally slower after each refresh, resulting in steady
> performance degradation.  After investigation, I found that the 'first pass
> through the data frame' check logic above was not correct, causing a new
> combined data frame to build on the lineage of the old data frame RDD that
> it is replacing.  The Spark physical plan of many queries was becoming
> larger and larger because of 'filter' and 'union' transformations being
> added to the same data frame.  I am not yet very familiar with Spark query
> plans, but below are fragments of a physical plan before and after a
> refresh that highlight the differences.
>
> Before refresh
>
> ===
>
> == Physical Plan ==
>
> AdaptiveSparkPlan isFinalPlan=true
>
> +- *(8) Project…
>
>…
>
>  +-*(1) Project …
>
> +- *(1) Filter …
>
> +- Scan In-memory table ….
>
> +- InMemoryRelation …
>
> +- Exchange RoundRobinPartitioning(2), …
>
>+- Union
>
>:- Exchange RoundRobinPartitioning(2), …
>
>:   +- Union
>
>:   :- *(1) Project …
>
>:   :  +- *(1) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
>...
>
>:   :- *(2) Project …
>
>:   :  +- *(2) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
>...
>
>:   :- *(3) Project …
>
>:   :  +- *(3) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
>...
>
>:   :- *(8) Project …
>
>:   :  +- *(4) Filter …
>
>:  +- BatchScan … Cassandra
> Scan: 
>
>...
>
>
> After refresh
>
> ==
>
>
> == Physical Plan ==
>
> AdaptiveSparkPlan isFinalPlan=true
>
> +- *(8) Project…
>
>…
>
>  +-*(1) Project …
>
> +- *(1) 

Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread Mich Talebzadeh
Hi Fred,

You said you managed to fix the problem and have attributed it to issues
with the RDD lineage. A few things come to my mind:


   1. How did you fix this performance issue, which I gather was done programmatically?
   2. In your code have you set spark.conf.set("spark.sql.adaptive.enabled",
   "true")
   3. Depending on the size of your data, both source and new, do you
   have any indication that your data in the global temporary view is totally
   cached? This should show in the storage tab in the UI. If you have data on
   disk then this will affect the performance.
   4. What is the output of print(rdd.toDebugString())? (see the sketch below)
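
In the Java API that would be roughly the following, where combinedView is the
data frame behind the global temp view from the original post:

    // Dump the RDD lineage of the data frame for inspection
    System.out.println(combinedView.rdd().toDebugString());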




I doubt this issue is caused by RDD lineage adding additional steps that are
not required.

HTH

Mich








Re: Spark query

2015-07-08 Thread Harish Butani
try the spark-datetime package:
https://github.com/SparklineData/spark-datetime
Follow this example
https://github.com/SparklineData/spark-datetime#a-basic-example to get the
different attributes of a DateTime.

On Wed, Jul 8, 2015 at 9:11 PM, prosp4300 prosp4...@163.com wrote:

 As mentioned in the Spark SQL programming guide, Spark SQL supports Hive UDFs;
 please take a look at the built-in date UDFs of Hive below. Getting day of year
 should be as simple as in any existing RDBMS.

 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions
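
 For example, day of year can be derived from the documented date UDFs alone
 (a sketch; the table and column names are made up, and it uses only the
 datediff() and year() date UDFs plus a cast and concat):

     DataFrame withDoy = sqlContext.sql(
         "SELECT event_date, " +
         "       datediff(event_date, concat(cast(year(event_date) as string), '-01-01')) + 1 AS day_of_year " +
         "FROM events");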


 At 2015-07-09 12:02:44, Ravisankar Mani rrav...@gmail.com wrote:

 Hi everyone,

 I can't get 'day of year' when using a Spark query. Can you suggest any way to
 achieve day of year?

 Regards,
 Ravi






Re: Spark query

2015-07-08 Thread Brandon White
Convert the column to a column of java.sql.Timestamp values. Then you can do
the following:

import java.sql.Timestamp
import java.util.Calendar

// Extract the requested time field from a Timestamp
def date_trunc(timestamp: Timestamp, timeField: String) = {
  timeField match {
    case "hour" =>
      val cal = Calendar.getInstance()
      cal.setTimeInMillis(timestamp.getTime())
      cal.get(Calendar.HOUR_OF_DAY)

    case "day" =>
      val cal = Calendar.getInstance()
      cal.setTimeInMillis(timestamp.getTime())
      cal.get(Calendar.DAY_OF_YEAR)  // day of year, as asked (Calendar has no DAY field)
  }
}

sqlContext.udf.register("date_trunc", date_trunc _)
