I'm working on a Structured Streaming application that reads from Kafka as a stream. For each micro-batch I need to look up attributes in a file on S3 (which is nearly 200 GB). I'm caching the lookup with df.persist(), but I need to refresh the cached DataFrame because the S3 lookup data changes frequently. I'm using the code below:
```scala
class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val currTime = System.currentTimeMillis()
    // latestRefreshTime is the cache expiry timestamp stored in a global temp view
    if (currTime > latestRefreshTime) {
      // oldDF is a cached DataFrame created from a global temp view; it is about 150 GB
      oldDF.unpersist() // I believe this is an async call. Should I use unpersist(true), which is blocking? Is that safe?
      val inputDf: DataFrame = readFile(spec, sparkSession)
      val recreateddf = inputDf.persist()
      val count = recreateddf.count() // force materialization of the new cache
    }
  }
}
```

Is the above approach a good way to refresh a cached DataFrame? The trigger for the refresh is the cache expiry timestamp for the S3 data, which I store in a global temp view.

Note: S3 is one lookup source, but I also have other sources with data sizes of 20 to 30 GB.

- Is the query-progress listener the right place to refresh the cached DataFrame?
- If yes, should I use the blocking or the non-blocking unpersist method, given that the data is huge (about 150 GB)?
- For a similar issue I found the response below from Tdas on the Spark user mailing list, under the subject "Re: Refreshing a persisted RDD":

> Yes, you will have to recreate the streaming Dataframe along with the static Dataframe, and restart the query. There isnt a currently feasible to do this without a query restart. But restarting a query WITHOUT restarting the whole application + spark cluster, is reasonably fast. If your applicatoin can tolerate 10 second latencies, then stopping and restarting a query within the same Spark application is a reasonable solution.

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser

If that is the better solution, should I restart the query like this?

```scala
query.processAllAvailable()
query.stop()
df.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession) // read the file from S3 or any other source
val recreateddf = inputDf.persist()
query.start() // i.e. start the streaming query again
```

When I looked into the Spark documentation for these methods I found:

processAllAvailable() (the documentation says this method is intended for testing):

> Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a Source prior to invocation. (i.e. getOffset must immediately reflect the addition).

stop():

> Stops the execution of this query if it is running. This method blocks until the threads performing execution has stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.
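To make the stop-and-restart option concrete, here is a minimal sketch of what I have in mind, assuming a `readLookup` function that loads the S3 lookup data and a `startQuery` function that re-creates the streaming query via `writeStream...start()`; both names (and `refreshAndRestart` itself) are placeholders for illustration, not actual code from my application:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical sketch: refresh the cached lookup by stopping and restarting the query.
// `readLookup` and `startQuery` stand in for application-specific code.
def refreshAndRestart(
    spark: SparkSession,
    runningQuery: StreamingQuery,
    oldLookup: DataFrame,
    readLookup: SparkSession => DataFrame,   // e.g. reads the ~200 GB lookup from S3
    startQuery: DataFrame => StreamingQuery  // re-creates the query via writeStream...start()
): (DataFrame, StreamingQuery) = {
  runningQuery.processAllAvailable()   // drain data that is already available (testing-oriented API, may block)
  runningQuery.stop()                  // blocks until the query's execution threads have stopped

  oldLookup.unpersist(blocking = true) // blocking unpersist so executors free memory before re-caching
  val newLookup = readLookup(spark).persist()
  newLookup.count()                    // materialize the new cache before the query restarts

  val newQuery = startQuery(newLookup) // start a fresh query that joins the stream with the new lookup
  (newLookup, newQuery)
}
```

If I understand the mailing-list answer correctly, each refresh would then cost one query restart (on the order of seconds within the same application), instead of trying to swap the cached DataFrame underneath a running query. Is this the recommended pattern, or is there a better approach?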