Vladimir Pchelko created SPARK-18925:
----------------------------------------

             Summary: Reduce memory usage of mapWithState
                 Key: SPARK-18925
                 URL: https://issues.apache.org/jira/browse/SPARK-18925
             Project: Spark
          Issue Type: Improvement
            Reporter: Vladimir Pchelko
            Priority: Minor


With default settings, mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory 
(see DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration, minRememberDuration).
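
For context, a minimal sketch of the kind of job this applies to (the socket 
source, checkpoint path and count-per-key update function are only placeholders, 
not part of this report):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

    val conf = new SparkConf().setAppName("MapWithStateExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1)) // batchInterval = 1 second
    ssc.checkpoint("/tmp/checkpoint")                // hypothetical checkpoint dir

    // Keeps a running count per key; the Option holds this batch's increment.
    def trackState(key: String, value: Option[Int], state: State[Int]): (String, Int) = {
      val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

    // With defaults, the internal state stream checkpoints only every
    // batchInterval * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER (= 10) batches,
    // so up to 10 MapWithStateRDDs stay cached between checkpoints.
    val stateStream = events.mapWithState(StateSpec.function(trackState _))
    stateStream.print()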

In my project we quickly run into OutOfMemory errors, because we have to track 
many millions of events at 2-3 KB per event, which comes to about 50 GB per 
MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our 
task.
Reducing DEFAULT_CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it 
slightly 'fixes' the memory issue but leads to a new one - we are unable to 
process in real time, because the checkpointing duration is several times 
longer than the batchInterval.
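
(For reference, the multiplier itself is a private constant, but the effective 
checkpoint interval can be shortened through the public DStream.checkpoint API, 
which - if I read MapWithStateDStreamImpl correctly - is forwarded to the 
internal state stream. This is the workaround ruled out above; a sketch, 
assuming the stateStream from a setup like the one earlier:)

    // Checkpoint the state stream every 2 batches instead of the default 10.
    // This caps the number of cached MapWithStateRDDs at ~2, but each
    // checkpoint is expensive, so processing falls behind the batchInterval.
    stateStream.checkpoint(Seconds(2))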

So I investigated the mapWithState process and concluded that for proper 
functioning of mapWithState we need only the current and the last checkpointed 
MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for 
InternalMapWithStateDStream and unpersist all old RDDs:

    val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))

except the last checkpointed one:

    val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
    if (checkpointedKeys.nonEmpty) {
      oldRDDs -= checkpointedKeys.max
    }

The rest of the method is a copy-paste of DStream.clearMetadata.
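
Putting the pieces together, a minimal sketch of the full override, to be 
placed in a subclass or modified copy of InternalMapWithStateDStream (field and 
method names follow the DStream internals; the BlockRDD block-removal step of 
the original DStream.clearMetadata is omitted for brevity):

    override private[streaming] def clearMetadata(time: Time): Unit = {
      // All generated RDDs older than the current batch...
      val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
      // ...except the most recently checkpointed MapWithStateRDD,
      // which is still needed for recovery.
      val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
      if (checkpointedKeys.nonEmpty) {
        oldRDDs -= checkpointedKeys.max
      }
      generatedRDDs --= oldRDDs.keys
      if (ssc.conf.getBoolean("spark.streaming.unpersist", true)) {
        oldRDDs.values.foreach(_.unpersist(false))
      }
      dependencies.foreach(_.clearMetadata(time))
    }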


Please correct me.


