Hi TD,
I use 0.9.1. Thanks for letting me know. This issue drove me up the wall. I 
even wrote a method to shut down everything I could think of:

def stopSpark(ssc: StreamingContext) = {
    ssc.sparkContext.cleanup(500)
    ssc.sparkContext.clearFiles()
    ssc.sparkContext.clearJars()
    ssc.sparkContext.metadataCleaner.cancel()
    ssc.awaitTermination(500)
    ssc.stop(true)
}

-A
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-22-14 9:50 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: How to turn off MetadataCleaner?

The cleaner should remain up while the SparkContext is still active (not 
stopped). Here, however, it seems you are stopping the SparkContext 
("ssc.stop(true)"), so the cleaner should be stopped as well. There was a bug 
earlier where some of the cleaners may not have been stopped when the context 
was stopped. What version are you using? If it is 0.9.1, I can see that the 
cleaner in ShuffleBlockManager 
<https://github.com/apache/spark/blob/v0.9.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala> 
is not stopped, so it is a bug.

TD

On Thu, May 22, 2014 at 9:24 AM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
Hi
After using Spark's TestSuiteBase to run some tests, I've noticed that once 
all the tests have finished, the cleaner is still running and periodically 
outputs the following:
INFO  o.apache.spark.util.MetadataCleaner  - Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER

I use the testOperation method, and I've changed it so that it keeps the 
reference to ssc returned by setupStreams. I then use that reference to shut 
things down, but the cleaner remains up.

How do I shut down all of Spark, including the cleaner?

Here is how I changed testOperation method (changes in bold):

  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    val ssc = setupStreams[U, V](input, operation)
    val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
    verifyOutput[V](output, expectedOutput, useSet)
    ssc.awaitTermination(500)
    ssc.stop(true)
  }
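
One thing to note about the method above: if verifyOutput throws (a failing test), the ssc.stop(true) line is never reached, so the context and its cleaners keep running. A minimal sketch of one way to guard against that is below. StopHelper and withStopped are hypothetical names, not part of Spark or TestSuiteBase. The sketch only guarantees that the stop call is reached; it does not by itself stop a cleaner that stop() leaves running.

```scala
// Hypothetical helper (not part of Spark): run `body` against a resource and
// always call `stop` afterwards, even if `body` throws.
object StopHelper {
  def withStopped[R, A](create: => R)(stop: R => Unit)(body: R => A): A = {
    val resource = create
    try body(resource)
    finally stop(resource)
  }
}

// Assumed use inside testOperation (left as a comment because it needs
// Spark and TestSuiteBase on the classpath):
//
//   StopHelper.withStopped(setupStreams[U, V](input, operation))(_.stop(true)) { ssc =>
//     val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
//     verifyOutput[V](output, expectedOutput, useSet)
//   }
```

The try/finally keeps the shutdown logic in one place, so every test that builds a StreamingContext this way stops it whether the assertions pass or fail.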

-Adrian

