[ 
https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Wang updated SPARK-28781:
------------------------------
    Description: 
Once the function _update()_ is called, the RDD _newData_ is persisted at line 
82. However, the persisted RDD _newData_ is only used a second time when the 
checkpoint condition at line 94 is met, inside the API _checkpoint()_ (the 
checkpoint itself happens at line 97). Otherwise _newData_ is used only once, 
and persisting it is unnecessary. Although persistedQueue is bounded to limit 
how much unnecessarily cached data accumulates, it would be better to avoid 
every unnecessary persist operation.
{code:scala}
def update(newData: T): Unit = {
    persist(newData)
    persistedQueue.enqueue(newData)
    // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
    // Users should call [[update()]] when a new Dataset has been created,
    // before the Dataset has been materialized.
    while (persistedQueue.size > 3) {
      val dataToUnpersist = persistedQueue.dequeue()
      unpersist(dataToUnpersist)
    }
    updateCount += 1

    // Handle checkpointing (after persisting)
    if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
      && sc.getCheckpointDir.nonEmpty) {
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
{code}
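One possible shape of the improvement is to evaluate the checkpoint condition first and persist only when the data will actually be reused. Below is a minimal, self-contained sketch in plain Scala, not Spark's actual PeriodicCheckpointer: the `checkpointInterval` value, the `checkpointDirSet` flag (standing in for `sc.getCheckpointDir.nonEmpty`), and the stubbed persist bookkeeping are all assumptions for illustration, and it deliberately ignores the case where callers reuse earlier Datasets from the queue.

```scala
import scala.collection.mutable

// Hypothetical standalone model of the proposed change: persist newData
// only when the checkpoint condition will reuse it a second time.
object UpdateSketch {
  val persisted = mutable.Queue.empty[Int]  // stands in for persistedQueue
  var persistCalls = 0                      // counts how often we persist
  var updateCount = 0
  val checkpointInterval = 3                // assumed: checkpoint every 3 updates
  val checkpointDirSet = true               // stands in for sc.getCheckpointDir.nonEmpty

  def update(newData: Int): Unit = {
    updateCount += 1
    // Decide up front whether this update will checkpoint (i.e. reuse newData).
    val willCheckpoint =
      checkpointInterval != -1 &&
        (updateCount % checkpointInterval) == 0 &&
        checkpointDirSet
    if (willCheckpoint) {
      // Only now is persisting worthwhile: the data is used twice.
      persistCalls += 1
      persisted.enqueue(newData)
      while (persisted.size > 3) persisted.dequeue()
      // checkpoint(newData) would go here in the real implementation
    }
  }
}
```

With `checkpointInterval = 3`, nine calls to `update` trigger only three persist operations instead of nine, which is the saving the description argues for.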

  was:
Once update() is called, newData is persisted at line 82. However, the 
persisted data is only used a second time when the checkpoint condition at 
line 94 is satisfied (the checkpoint itself happens at line 97). Data that 
does not satisfy the checkpoint condition does not need to be cached. The 
persistedQueue limits how much unnecessarily cached data accumulates, but it 
is best to avoid every unnecessary persist operation.
{code:scala}
def update(newData: T): Unit = {
    persist(newData)
    persistedQueue.enqueue(newData)
    // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
    // Users should call [[update()]] when a new Dataset has been created,
    // before the Dataset has been materialized.
    while (persistedQueue.size > 3) {
      val dataToUnpersist = persistedQueue.dequeue()
      unpersist(dataToUnpersist)
    }
    updateCount += 1

    // Handle checkpointing (after persisting)
    if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
      && sc.getCheckpointDir.nonEmpty) {
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
{code}


> Unneccesary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
>                 Key: SPARK-28781
>                 URL: https://issues.apache.org/jira/browse/SPARK-28781
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> Once the function _update()_ is called, the RDD _newData_ is persisted at 
> line 82. However, the persisted RDD _newData_ is only used a second time 
> when the checkpoint condition at line 94 is met, inside the API 
> _checkpoint()_ (the checkpoint itself happens at line 97). Otherwise 
> _newData_ is used only once, and persisting it is unnecessary. Although 
> persistedQueue is bounded to limit how much unnecessarily cached data 
> accumulates, it would be better to avoid every unnecessary persist operation.
> {code:scala}
> def update(newData: T): Unit = {
>     persist(newData)
>     persistedQueue.enqueue(newData)
>     // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
>     // Users should call [[update()]] when a new Dataset has been created,
>     // before the Dataset has been materialized.
>     while (persistedQueue.size > 3) {
>       val dataToUnpersist = persistedQueue.dequeue()
>       unpersist(dataToUnpersist)
>     }
>     updateCount += 1
>     // Handle checkpointing (after persisting)
>     if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>       && sc.getCheckpointDir.nonEmpty) {
>       // Add new checkpoint before removing old checkpoints.
>       checkpoint(newData)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
