[ https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
holdenk updated SPARK-28781:
----------------------------
    Issue Type: Improvement  (was: Bug)

> Unneccesary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
>                 Key: SPARK-28781
>                 URL: https://issues.apache.org/jira/browse/SPARK-28781
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Priority: Major
>
> Once update() is called, newData is persisted at line 82. However, the
> persisted data is used a second time only when checkpointing is triggered
> (that is, when the condition at line 94 holds), in which case it is
> checkpointed at line 97. Data that does not meet the checkpoint condition
> does not need to be cached. The persistedQueue limits how much unnecessary
> data stays cached, but it is better to avoid every unnecessary persist
> operation.
>
> {code:scala}
>   def update(newData: T): Unit = {
>     persist(newData)
>     persistedQueue.enqueue(newData)
>     // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
>     // Users should call [[update()]] when a new Dataset has been created,
>     // before the Dataset has been materialized.
>     while (persistedQueue.size > 3) {
>       val dataToUnpersist = persistedQueue.dequeue()
>       unpersist(dataToUnpersist)
>     }
>     updateCount += 1
>
>     // Handle checkpointing (after persisting)
>     if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>       && sc.getCheckpointDir.nonEmpty) {
>       // Add new checkpoint before removing old checkpoints.
>       checkpoint(newData)
> {code}
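For illustration, here is a minimal, Spark-free sketch of the change the report suggests: evaluate the checkpoint condition first and persist newData only when this update will also checkpoint it. Everything in it is hypothetical and is not Spark's PeriodicCheckpointer API: ToyData stands in for an RDD or Dataset, hasCheckpointDir stands in for sc.getCheckpointDir.nonEmpty, and persistCalls only counts persist operations.

{code:scala}
import scala.collection.mutable

// Hypothetical stand-in for an RDD/Dataset: tracks only cache and checkpoint state.
final class ToyData(val id: Int) {
  var persisted = false
  var checkpointed = false
}

// Hypothetical simplified checkpointer (NOT Spark's PeriodicCheckpointer).
// It persists newData only on updates that will also checkpoint it.
final class ConditionalPersistCheckpointer(checkpointInterval: Int, hasCheckpointDir: Boolean) {
  private val persistedQueue = mutable.Queue.empty[ToyData]
  private var updateCount = 0
  var persistCalls = 0 // counts persist operations, for comparison only

  private def persist(d: ToyData): Unit = { d.persisted = true; persistCalls += 1 }
  private def unpersist(d: ToyData): Unit = { d.persisted = false }
  private def checkpoint(d: ToyData): Unit = { d.checkpointed = true }

  def update(newData: ToyData): Unit = {
    updateCount += 1
    // Same condition as the quoted code, but evaluated before persisting.
    val willCheckpoint = checkpointInterval != -1 &&
      updateCount % checkpointInterval == 0 && hasCheckpointDir
    if (willCheckpoint) {
      // Persist first so the checkpoint job can reuse the cached data.
      persist(newData)
      persistedQueue.enqueue(newData)
      // Keep the queue bounded, as in the quoted code.
      while (persistedQueue.size > 3) unpersist(persistedQueue.dequeue())
      checkpoint(newData)
    }
  }
}

object ConditionalPersistDemo {
  def main(args: Array[String]): Unit = {
    val cp = new ConditionalPersistCheckpointer(checkpointInterval = 3, hasCheckpointDir = true)
    val data = (1 to 9).map(i => new ToyData(i))
    data.foreach(d => cp.update(d))
    println(s"persist calls: ${cp.persistCalls}")
    println(s"checkpointed: ${data.filter(_.checkpointed).map(_.id).mkString(", ")}")
  }
}
{code}

With checkpointInterval = 3 and nine updates, this sketch persists only the three checkpointed inputs (updates 3, 6 and 9), whereas the quoted update() calls persist on all nine. One trade-off to keep in mind: if the caller reads newData again in later iterations, skipping the persist means Spark recomputes it from its lineage.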