[ https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Wang updated SPARK-28781:
------------------------------
    Description: 
Whenever the function _update()_ is called, the RDD _newData_ is persisted at line 82. However, the persisted RDD _newData_ is only used a second time when the checkpoint condition is met (at line 94), in which case _checkpoint()_ is called (line 97). In all other cases, _newData_ is used only once, so persisting it is unnecessary. Although persistedQueue is checked to limit the amount of unnecessarily cached data, it would be better to avoid every unnecessary persist operation.
{code:scala}
  def update(newData: T): Unit = {
    persist(newData)
    persistedQueue.enqueue(newData)
    // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
    // Users should call [[update()]] when a new Dataset has been created,
    // before the Dataset has been materialized.
    while (persistedQueue.size > 3) {
      val dataToUnpersist = persistedQueue.dequeue()
      unpersist(dataToUnpersist)
    }
    updateCount += 1

    // Handle checkpointing (after persisting)
    if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
      && sc.getCheckpointDir.nonEmpty) {
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
{code}

  was:
Once _update()_ is called, _newData_ is persisted at line 82. However, the persisted data is only used a second time when the checkpoint condition is satisfied (at line 94) and the checkpoint is performed (at line 97). Data that never satisfies the checkpoint condition does not need to be cached. The persistedQueue limits how much data stays unnecessarily cached, but it would be best to avoid every unnecessary persist operation.
{code:scala}
  def update(newData: T): Unit = {
    persist(newData)
    persistedQueue.enqueue(newData)
    // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
    // Users should call [[update()]] when a new Dataset has been created,
    // before the Dataset has been materialized.
    while (persistedQueue.size > 3) {
      val dataToUnpersist = persistedQueue.dequeue()
      unpersist(dataToUnpersist)
    }
    updateCount += 1

    // Handle checkpointing (after persisting)
    if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
      && sc.getCheckpointDir.nonEmpty) {
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
{code}


> Unnecessary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
>                 Key: SPARK-28781
>                 URL: https://issues.apache.org/jira/browse/SPARK-28781
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> Whenever the function _update()_ is called, the RDD _newData_ is persisted at line 82. However, the persisted RDD _newData_ is only used a second time when the checkpoint condition is met (at line 94), in which case _checkpoint()_ is called (line 97). In all other cases, _newData_ is used only once, so persisting it is unnecessary. Although persistedQueue is checked to limit the amount of unnecessarily cached data, it would be better to avoid every unnecessary persist operation.
> {code:scala}
>   def update(newData: T): Unit = {
>     persist(newData)
>     persistedQueue.enqueue(newData)
>     // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
>     // Users should call [[update()]] when a new Dataset has been created,
>     // before the Dataset has been materialized.
>     while (persistedQueue.size > 3) {
>       val dataToUnpersist = persistedQueue.dequeue()
>       unpersist(dataToUnpersist)
>     }
>     updateCount += 1
>
>     // Handle checkpointing (after persisting)
>     if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>       && sc.getCheckpointDir.nonEmpty) {
>       // Add new checkpoint before removing old checkpoints.
>       checkpoint(newData)
> {code}
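
For illustration, a minimal sketch of the suggested reordering (hypothetical and untested; all names are taken from the snippet above) would evaluate the checkpoint condition before persisting, so that _newData_ is only cached when _checkpoint()_ will actually read it a second time:

{code:scala}
  def update(newData: T): Unit = {
    updateCount += 1
    // Decide up front whether this update will trigger a checkpoint.
    val willCheckpoint = checkpointInterval != -1 &&
      (updateCount % checkpointInterval) == 0 &&
      sc.getCheckpointDir.nonEmpty
    if (willCheckpoint) {
      // newData is read again by checkpoint(), so caching it pays off.
      persist(newData)
      persistedQueue.enqueue(newData)
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
    }
    // Unpersist older Datasets, as in the current implementation.
    while (persistedQueue.size > 3) {
      unpersist(persistedQueue.dequeue())
    }
  }
{code}

Note that this literal rearrangement would also stop caching non-checkpointed Datasets, which the class comment relies on ("We try to maintain 2 Datasets in persistedQueue..."), so an actual fix would need to reconcile the two uses of persist().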