otterc edited a comment on pull request #31934:
URL: https://github.com/apache/spark/pull/31934#issuecomment-805061280


   > hmm..I try to understand how those 2 scenarios cause the merged block 
corrupted.
   > 
   > 1. Do you mean that calling `StreamCallback.onFailure()` twice causes the 
block to be corrupted? It seems the only thing `onFailure` does is 
`setCurrentMapIndex(-1)` and `setEncounteredFailure(true)`, and neither 
touches files, e.g., resets the position or truncates.
   > 2. I can see how the duplicate stream may interfere with an active stream. 
e.g., the active stream may see `getCurrentMapIndex` < 0 and 
`isEncounteredFailure=true` while writing normally itself. But it seems like 
the active stream is able to heal itself with the current framework.
   > 
   > I probably missed some details. Could you elaborate more on how the 
corruption happens? Thanks.
   
   In both scenarios, the `currentMapId` of the shuffle partition is 
reset to -1, which can interfere with an active stream (a stream that is 
currently writing). By interfering, I mean it gives another stream that is 
waiting to merge to the same shuffle partition a chance to start writing before 
the active stream has completed, either successfully or with a failure.
   
   Here are examples for both:
   1. When `onFailure` is called twice
   - Say stream1 merging `shufflePush_0_1_2` wrote some data and has 
`isWriting=true`. Now it failed, so it sets `currentMapId` of partition_0_2 to 
`-1`. 
   - Another stream2 which wants to merge `shufflePush_0_2_2` can now start 
merging its bufs to partition_0_2 and it sets `currentMapId` of partition_0_2 
to `2`. 
   - Another stream3 which wants to merge `shufflePush_0_3_2` will defer its 
buffers because `stream2` is the active one right now (`currentMapId` is 2).
   - stream2 has merged only a few bufs when `stream1.onFailure()` is 
invoked again, which changes the `currentMapId` of partition_0_2 back to `-1`. 
This is a problem because `stream2` hasn't completed (successfully or with a 
failure) and yet `stream3` is now `allowedToWrite`. If `stream3` starts writing 
buffers before `stream2` has appended all of its buffers, the data of 
`shufflePush_0_2_2` will be corrupted.
   
   2. Duplicate stream. 
   - Say stream1 merging `shufflePush_0_1_2` wrote some data and has 
`isWriting=true`. It completed successfully and then sets `currentMapId` of 
partition_0_2 to `-1`. 
   - Now `stream1Duplicate`, which is also trying to merge `shufflePush_0_1_2`, 
will be `allowedToWrite` because the `currentMapId` of partition_0_2 is `-1`, 
and it sets `isWriting=true`. However, we identify that it is a duplicate 
stream and just return without modifying `currentMapId`.
   - stream2, which tries to merge `shufflePush_0_2_2`, will be `allowedToWrite` 
because `currentMapId=-1`. It sets `currentMapId=2` and starts writing.
   - If `stream1Duplicate` encounters a failure now, it still has `isWriting=true` 
and so resets the `currentMapId` of partition_0_2. This again gives another 
stream, say stream3, a chance to be `allowedToWrite` before stream2 has completed.
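
   To make the two interleavings concrete, here is a rough, self-contained sketch that replays them against a toy model. It is not the actual resolver code: the class `MergeStateSketch`, its nested `Partition` and `Stream` types, the `merged` set, and the append-only `file` list are all hypothetical stand-ins, and it assumes that `onFailure` resets `currentMapId` whenever `isWriting` is still set and that nothing clears `isWriting`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the two scenarios; not Spark's real classes.
class MergeStateSketch {
  static class Partition {
    int currentMapId = -1;                         // -1: no stream is active
    final Set<Integer> merged = new HashSet<>();   // map ids already merged
    final List<String> file = new ArrayList<>();   // stand-in for the merged file
  }

  static class Stream {
    final Partition partition;
    final int mapId;
    boolean isWriting = false;

    Stream(Partition partition, int mapId) {
      this.partition = partition;
      this.mapId = mapId;
    }

    boolean allowedToWrite() {
      return partition.currentMapId < 0 || partition.currentMapId == mapId;
    }

    // Returns true if the buffer was appended, false if deferred or dropped.
    boolean onData(String buf) {
      if (!allowedToWrite()) return false;               // deferred
      isWriting = true;
      if (partition.merged.contains(mapId)) return false; // duplicate: return early
      partition.currentMapId = mapId;
      partition.file.add(mapId + ":" + buf);
      return true;
    }

    void onComplete() {
      partition.merged.add(mapId);
      partition.currentMapId = -1;
    }

    // Assumption: isWriting is never cleared, so any later call resets again.
    void onFailure() {
      if (isWriting) partition.currentMapId = -1;
    }
  }

  // Scenario 1: stream1.onFailure() is invoked twice.
  static Partition doubleFailure() {
    Partition p = new Partition();
    Stream s1 = new Stream(p, 1), s2 = new Stream(p, 2), s3 = new Stream(p, 3);
    s1.onData("a");
    s1.onFailure();   // currentMapId -> -1
    s2.onData("a");   // stream2 becomes the active stream
    s3.onData("a");   // deferred: stream2 is active
    s1.onFailure();   // second call resets currentMapId under stream2
    s3.onData("a");   // stream3 writes before stream2 has finished
    s2.onData("b");   // the rest of stream2's data is now deferred
    return p;
  }

  // Scenario 2: a duplicate of stream1 fails while stream2 is active.
  static Partition duplicateStream() {
    Partition p = new Partition();
    Stream s1 = new Stream(p, 1), dup = new Stream(p, 1);
    Stream s2 = new Stream(p, 2), s3 = new Stream(p, 3);
    s1.onData("a");
    s1.onComplete();  // map 1 merged, currentMapId -> -1
    dup.onData("a");  // sets isWriting, detected as duplicate, returns
    s2.onData("a");   // stream2 becomes the active stream
    s3.onData("a");   // deferred: stream2 is active
    dup.onFailure();  // isWriting is set, so currentMapId is reset
    s3.onData("a");   // stream3 writes before stream2 has finished
    return p;
  }

  public static void main(String[] args) {
    System.out.println("double failure:   " + doubleFailure().file);
    System.out.println("duplicate stream: " + duplicateStream().file);
  }
}
```

   In both replays, stream3's buffer lands in the file while stream2 has appended only part of its data, which is the interleaving described above. The toy model ignores file positions and truncation, so it only shows the ordering problem, not the on-disk layout.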
   
   I have added UTs for both these cases as well with similar examples.
   @Ngone51 
   
   
   




