Dong Wang created SPARK-29872:
---------------------------------

             Summary: Improper cache strategy in examples
                 Key: SPARK-29872
                 URL: https://issues.apache.org/jira/browse/SPARK-29872
             Project: Spark
          Issue Type: Improvement
          Components: Examples
    Affects Versions: 3.0.0
            Reporter: Dong Wang


1. Improper cache in examples.SparkTC
The RDD edges should be cached because it is used multiple times in the while 
loop. It should then be unpersisted before the last action, tc.count(), because 
tc itself has already been persisted by that point.
In addition, a new tc RDD is cached on every loop iteration, but the previous 
ones are never unpersisted, which wastes memory.
{code:scala}
    val edges = tc.map(x => (x._2, x._1)) // Edges should be cached
    // This join is iterated until a fixed point is reached.
    var oldCount = 0L
    var nextCount = tc.count()
    do { 
      oldCount = nextCount
      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
      // then project the result to obtain the new (x, z) paths.
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
      nextCount = tc.count()
    } while (nextCount != oldCount)
    println(s"TC has ${tc.count()} edges.")
{code}
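One way the caching described above could look is sketched below. This is not the actual patch for SparkTC: the object name SparkTCCached, the method transitiveClosure, and the variable previous are made up for illustration, and the loop keeps the Scala 2 do/while form used in the example.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of the Spark examples.
object SparkTCCached {
  // Computes the transitive closure of a set of directed edges.
  def transitiveClosure(spark: SparkSession, input: Seq[(Int, Int)]): Set[(Int, Int)] = {
    var tc: RDD[(Int, Int)] = spark.sparkContext.parallelize(input).cache()
    // The reversed edges are reused by the join in every iteration, so cache them.
    val edges = tc.map(x => (x._2, x._1)).cache()

    var oldCount = 0L
    var nextCount = tc.count()
    do {
      oldCount = nextCount
      val previous = tc
      // Join to get (y, (z, x)) pairs, then project to the new (x, z) paths.
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
      // Materialize the new tc while the previous one is still cached ...
      nextCount = tc.count()
      // ... then release the superseded RDD.
      previous.unpersist()
    } while (nextCount != oldCount)

    // edges is not needed by the final action, because tc is already cached.
    edges.unpersist()
    val result = tc.collect().toSet
    tc.unpersist()
    result
  }
}
```

Unpersisting previous only after the count() on the new tc means the new RDD is computed from cached data rather than from the full lineage.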

2. Cache needed in examples.ml.LogisticRegressionSummary
The DataFrame fMeasure should be cached, because it is used by two separate 
actions (the two head() calls below).
{code:scala}
    // Set the model threshold to maximize F-Measure
    val fMeasure = trainingSummary.fMeasureByThreshold // fMeasure should be cached
    val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
    val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
      .select("threshold").head().getDouble(0)
    lrModel.setThreshold(bestThreshold)
{code}
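A minimal sketch of the suggested caching follows, assuming the DataFrame has the "threshold" and "F-Measure" columns used in the example. The object BestThreshold and its method are hypothetical names introduced here, not part of the Spark examples.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max}

// Hypothetical helper, not part of the Spark examples.
object BestThreshold {
  // Returns the threshold with the highest F-Measure.
  // Assumes fMeasure has Double columns "threshold" and "F-Measure".
  def bestThreshold(fMeasure: DataFrame): Double = {
    // Two actions run against fMeasure below, so cache it first.
    fMeasure.cache()
    try {
      val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
      fMeasure.where(col("F-Measure") === maxFMeasure)
        .select("threshold").head().getDouble(0)
    } finally {
      // Release the cached data once both actions have finished.
      fMeasure.unpersist()
    }
  }
}
```

In the example this would be called as lrModel.setThreshold(BestThreshold.bestThreshold(trainingSummary.fMeasureByThreshold)).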

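3. Cache needed in examples.sql.SparkSQLExample
The RDD of Person objects should be cached, because teenagersDF, which is 
derived from it, is evaluated by several separate actions.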
{code:scala}
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cached
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
{code}
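The change suggested for this example might look roughly like the sketch below. It reads the input from an in-memory Seq instead of people.txt so that it is self-contained. The object CachedPeopleExample, the method teenagerNames, and the variable peopleRDD are illustrative names, not taken from SparkSQLExample.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of the Spark examples.
object CachedPeopleExample {
  case class Person(name: String, age: Long)

  // Returns the teenager names obtained by column index and by column name.
  def teenagerNames(spark: SparkSession, lines: Seq[String]): (Seq[String], Seq[String]) = {
    import spark.implicits._

    // Cache the parsed RDD: every action on teenagersDF below reads from it.
    val peopleRDD = spark.sparkContext
      .parallelize(lines)
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .cache()

    val peopleDF = peopleRDD.toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // Two separate actions over the same data.
    val byIndex = teenagersDF.map(teenager => "Name: " + teenager(0)).collect().toSeq
    val byName = teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).collect().toSeq

    // Release the cached RDD once all actions have run.
    peopleRDD.unpersist()
    (byIndex, byName)
  }
}
```

Caching teenagersDF directly would be another option. The sketch caches the RDD because that is what the report points at.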

This issue was reported by our tool CacheCheck, which dynamically detects 
misuses of the persist()/unpersist() API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

