otterc commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r528229428



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -827,13 +833,16 @@ void resetChunkTracker() {
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
       long idxStartPos = -1;
       try {
-        // update the chunk tracker to meta file before index file
-        writeChunkTracker(mapIndex);
         idxStartPos = indexFile.getFilePointer();
        logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}",
          appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset,
          chunkOffset);
-        indexFile.writeLong(chunkOffset);
+        indexFile.write(Longs.toByteArray(chunkOffset));
+        // Chunk bitmap should be written to the meta file after the index file because if there
+        // are any exceptions during writing the offset to the index file, meta file should not
+        // be updated. If the update to the index file is successful but the update to meta file
+        // isn't then the index file position is reset in the catch clause.
+        writeChunkTracker(mapIndex);

Review comment:
       This is a good point. I had overlooked IOExceptions from the seeks. An IOException from a seek does not guarantee that the file position was updated, so subsequent writes to the file may not overwrite the corrupt data, which means the files remain corrupted.
   One way of handling this is to maintain the expected lengths of both the index and meta files in AppShufflePartitionInfo instead of relying on the FilePointer. Updates to these files would then be similar to writing to the merged data file, as below:
   
   ```java
   if (partitionInfo.isEncounteredFailure()) {
     long updatedPos = partitionInfo.getPosition() + length;
     logger.debug(
       "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}",
       partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
       partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos);
     length += partitionInfo.dataChannel.write(buf, updatedPos);
   } else {
     length += partitionInfo.dataChannel.write(buf);
   }
   ```
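   To illustrate the idea (a minimal standalone sketch, not the actual Spark code; the class name and fields here are hypothetical): if the partition info tracks the expected index-file position itself, each write first seeks to that tracked position, so a failed seek or write leaves the tracked position unchanged and the next attempt overwrites any partially written bytes instead of appending after them.

   ```java
   import java.io.File;
   import java.io.IOException;
   import java.io.RandomAccessFile;

   // Hypothetical sketch: track the expected index-file position ourselves
   // instead of trusting RandomAccessFile.getFilePointer() after a failure.
   public class IndexPositionSketch {
     private long expectedIndexPos = 0; // position we believe was durably written

     void writeChunkOffset(RandomAccessFile indexFile, long chunkOffset) throws IOException {
       // Always seek to the tracked position first; after a previous failure
       // this overwrites any partially written bytes at that offset.
       indexFile.seek(expectedIndexPos);
       indexFile.writeLong(chunkOffset);
       // Advance the tracked position only once the write has succeeded.
       expectedIndexPos += Long.BYTES;
     }

     public static void main(String[] args) throws IOException {
       File f = File.createTempFile("index", ".bin");
       f.deleteOnExit();
       IndexPositionSketch sketch = new IndexPositionSketch();
       try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
         sketch.writeChunkOffset(raf, 100L);
         sketch.writeChunkOffset(raf, 200L);
         raf.seek(0);
         System.out.println(raf.readLong()); // 100
         System.out.println(raf.readLong()); // 200
       }
     }
   }
   ```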
   Please let me know if you have any other suggestions.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


