Dong Wang created SPARK-29872:
---------------------------------

             Summary: Improper cache strategy in examples
                 Key: SPARK-29872
                 URL: https://issues.apache.org/jira/browse/SPARK-29872
             Project: Spark
          Issue Type: Improvement
          Components: Examples
    Affects Versions: 3.0.0
            Reporter: Dong Wang
1. Improper cache in examples.SparkTC

The RDD edges should be cached because it is used repeatedly in the while loop. It should then be unpersisted before the last action, tc.count(), because tc has already been persisted by that point. In addition, a new tc RDD is cached in every loop iteration but never unpersisted, which wastes memory.

{code:scala}
    val edges = tc.map(x => (x._2, x._1)) // Edges should be cached
    // This join is iterated until a fixed point is reached.
    var oldCount = 0L
    var nextCount = tc.count()
    do {
      oldCount = nextCount
      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
      // then project the result to obtain the new (x, z) paths.
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
      nextCount = tc.count()
    } while (nextCount != oldCount)
    println(s"TC has ${tc.count()} edges.")
{code}

2. Cache needed in examples.ml.LogisticRegressionSummary

The DataFrame fMeasure should be cached because two actions (the two head() calls) read it.

{code:scala}
    // Set the model threshold to maximize F-Measure
    val fMeasure = trainingSummary.fMeasureByThreshold // fMeasure should be cached
    val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
    val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
      .select("threshold").head().getDouble(0)
    lrModel.setThreshold(bestThreshold)
{code}

3. Cache needed in examples.sql.SparkSQLExample

{code:scala}
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cached
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
{code}

This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.
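As a rough sketch of what item 1 asks for in examples.SparkTC, the loop could cache edges, release each superseded tc once its replacement has been materialized, and unpersist edges before the final count. This is only an illustration under assumptions, not the patch applied to Spark: the object name SparkTCCached, the helper closureSize, and the variable prev are hypothetical, and a plain while loop stands in for the original do/while.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object; not part of the Spark examples.
object SparkTCCached {

  /** Returns the number of edges in the transitive closure of `graph`. */
  def closureSize(spark: SparkSession, graph: Seq[(Int, Int)]): Long = {
    var tc: RDD[(Int, Int)] = spark.sparkContext.parallelize(graph).distinct().cache()

    // edges is read by the join in every iteration, so keep it cached.
    val edges = tc.map(x => (x._2, x._1)).cache()

    var oldCount = 0L
    var nextCount = tc.count()
    while (nextCount != oldCount) {
      oldCount = nextCount
      val prev = tc // remember the RDD that is about to be superseded
      // Join to get (y, (z, x)) pairs, then project to the new (x, z) paths.
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
      nextCount = tc.count() // materializes and caches the new tc
      prev.unpersist()       // the old tc is no longer needed
    }

    // The final count reads the cached tc only, so edges can be released first.
    edges.unpersist()
    val result = tc.count()
    tc.unpersist()
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkTCCached")
      .master("local[*]")
      .getOrCreate()
    val size = closureSize(spark, Seq((1, 2), (2, 3), (3, 4)))
    println(s"TC has $size edges.")
    spark.stop()
  }
}
```

Unpersisting prev only after nextCount has been computed matters here: the new tc is built from the old one, so releasing the old RDD earlier would force it to be recomputed.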