[ https://issues.apache.org/jira/browse/SPARK-29823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Wang updated SPARK-29823:
------------------------------
Description:

In mllib.clustering.KMeans.run(), the RDD _norms_ is persisted. But _norms_ has only a single child, the RDD _zippedData_, which is not persisted. So every action that relies on _norms_ also relies on _zippedData_. Since _zippedData_ is used multiple times in _runAlgorithm()_, it is _zippedData_ that should be persisted, not _norms_.

{code:scala}
  private[spark] def run(
      data: RDD[Vector],
      instr: Option[Instrumentation]): KMeansModel = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data is not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }

    // Compute squared norms and cache them.
    val norms = data.map(Vectors.norm(_, 2.0))
    norms.persist() // Unnecessary persist. Only used to generate zippedData.
    val zippedData = data.zip(norms).map { case (v, norm) =>
      new VectorWithNorm(v, norm)
    } // needs to persist
    val model = runAlgorithm(zippedData, instr)
    norms.unpersist() // Change to zippedData.unpersist()
{code}

This issue is reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
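For illustration, a minimal sketch of what the proposed change would look like, assuming the rest of run() stays as shown above:

{code:scala}
    // Compute squared norms once. zippedData is the RDD that runAlgorithm()
    // reuses across iterations, so it is the one worth caching.
    val norms = data.map(Vectors.norm(_, 2.0))
    val zippedData = data.zip(norms).map { case (v, norm) =>
      new VectorWithNorm(v, norm)
    }
    zippedData.persist()   // persist the RDD that is actually reused
    val model = runAlgorithm(zippedData, instr)
    zippedData.unpersist() // release the cache once training finishes
{code}

With this arrangement, recomputing _zippedData_ on each iteration is avoided, and _norms_ needs no separate persist since it is only read once to build _zippedData_.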
> Improper persist strategy in mllib.clustering.KMeans.run()
> ----------------------------------------------------------
>
>                 Key: SPARK-29823
>                 URL: https://issues.apache.org/jira/browse/SPARK-29823
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Priority: Major

--
This message was sent by Atlassian Jira (v8.3.4#803005)