Github user liyezhang556520 commented on the pull request:

    https://github.com/apache/spark/pull/2134#issuecomment-55356380
  
    Hi @andrewor14 , you are right about this, and it is also my concern; I tried to reduce the risk as much as possible. Allow me to tell a story here:
    One reason I removed the **spark.storage.unrollFraction** configuration is that **unrollFraction** is a fixed value, while the cache memory required for new blocks can vary from iteration to iteration. Suppose an application runs many iterations and each iteration caches blocks: in some iterations too many old blocks get dropped, wasting time, while in others not enough old blocks get dropped, so new blocks spill to disk even though plenty of old blocks could still have been evicted to make room for them. Also, when there are not many droppable old blocks (the memory held by droppable old blocks is less than `maxMemory * unrollFraction`), `ensureFreeSpace` will always return false. So it is hard for a user to choose a good value for `unrollFraction`. The other reason is that removing it simplifies the implementation of dropping old blocks in parallel.
    
    As for the OOM problem, it is really hard to avoid entirely: there are two risky places in this patch, and one of the two also exists in the original implementation.
    1. When we process blocks in `unrollSafely`, we walk through the `iterator` to see whether the new block's elements fit in memory, checking only every `memoryCheckPeriod` elements (default 16). Since we have no idea how much memory each element requires, and this runs in parallel across many threads, the memory occupied by new block elements before the first check may already be very large. This can cause an OOM when memory usage is already close to capacity. This situation exists in both this patch and the original implementation.
    2. The second place is the one you pointed out. Yes, in this patch we lazily drop old blocks while new blocks are unrolling in `unrollSafely`. In my implementation, at each check period, if old blocks need to be dropped, the current thread marks only the minimum number of old blocks needed to satisfy the new block's memory requirement, drops those marked blocks to disk, and then continues iterating until the next checkpoint. Because only the minimum number of blocks is dropped, the gap between the to-be-dropped memory and the to-be-unrolled memory is kept as small as possible; only that difference affects `freeMemoryForUnroll`, and hence the unrolling of other threads.
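    To make the two points above concrete, here is a hedged sketch in plain Python (not Spark's actual Scala code; all names besides `memoryCheckPeriod` are illustrative) of unrolling with a periodic memory check, plus marking only the minimum number of old blocks to drop at each check:

```python
# Illustrative sketch of the logic described above, NOT Spark's
# implementation. Between periodic checks, `used` can silently exceed
# free memory (the first OOM window); at each check, only the fewest
# old blocks needed to cover the shortfall are marked and dropped.

MEMORY_CHECK_PERIOD = 16  # default check period mentioned above

def mark_minimum_drops(old_blocks, required):
    """Mark the fewest old blocks whose sizes cover `required` bytes."""
    marked, freed = [], 0
    for block_id, size in old_blocks:
        if freed >= required:
            break
        marked.append(block_id)
        freed += size
    return marked, freed

def unroll_safely(iterator, size_of, free_memory, old_blocks):
    """Unroll elements, checking memory every MEMORY_CHECK_PERIOD
    elements and lazily dropping just enough old blocks to fit."""
    unrolled, used, dropped = [], 0, []
    for count, element in enumerate(iterator, start=1):
        unrolled.append(element)
        used += size_of(element)
        # The check is only periodic: before it fires, `used` may
        # already be far beyond `free_memory`.
        if count % MEMORY_CHECK_PERIOD == 0 and used > free_memory:
            shortfall = used - free_memory
            marked, freed = mark_minimum_drops(old_blocks, shortfall)
            if freed < shortfall:  # nothing left to evict
                return unrolled, used, dropped, False
            # "Drop" the marked blocks to disk, reclaim their memory.
            old_blocks = old_blocks[len(marked):]
            dropped.extend(marked)
            free_memory += freed
    return unrolled, used, dropped, used <= free_memory
```

    Note how, with only 50 units free, 16 elements of size 10 already occupy 160 units by the first check; and only the minimum prefix of old blocks is evicted, so the over-freed difference stays small.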
    
    There are two phases in the whole procedure that may need to drop blocks: one is `unrollSafely` and the other is `tryToPut`. There is no OOM risk in `tryToPut`, because by the time `tryToPut` is called all of the data is already in memory.
    
    Fortunately `spark.storage.safetyFraction` lowers the risk further, but I think some OOM risk will still exist.
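    For reference, a quick bit of arithmetic showing the head-room `safetyFraction` buys (the fraction values are the Spark 1.x defaults; the heap size is just an example):

```python
# spark.storage.safetyFraction leaves 10% head-room on top of the
# storage memory fraction; this absorbs some estimation error between
# memory checks but cannot eliminate the OOM risk entirely.
executor_heap_mb = 1024   # example heap size, not a Spark default
memory_fraction = 0.6     # spark.storage.memoryFraction default
safety_fraction = 0.9     # spark.storage.safetyFraction default
max_storage_mb = executor_heap_mb * memory_fraction * safety_fraction
# max_storage_mb is about 552.96 MB out of the 1024 MB heap
```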
    
    Another option is to simply drop the new blocks to disk when there is not enough free memory, without dropping old blocks at all. This also gains a lot of performance compared with dropping old blocks serially, and in our tests its performance is very close to dropping old blocks in parallel.
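    The alternative policy amounts to a one-line placement decision (sketch only; names are illustrative):

```python
# Alternative policy sketch: never evict old blocks; a new block that
# does not fit in the currently free memory goes straight to disk.
def place_new_block(block_size, free_memory):
    return "MEMORY" if block_size <= free_memory else "DISK"
```

    Because no eviction happens, no thread ever blocks on another thread's drops, which is why it comes close to the parallel-drop approach in practice.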

