otterc commented on a change in pull request #30433: URL: https://github.com/apache/spark/pull/30433#discussion_r528229428
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -827,13 +833,16 @@ void resetChunkTracker() { void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { long idxStartPos = -1; try { - // update the chunk tracker to meta file before index file - writeChunkTracker(mapIndex); idxStartPos = indexFile.getFilePointer(); logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}", appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset); - indexFile.writeLong(chunkOffset); + indexFile.write(Longs.toByteArray(chunkOffset)); + // Chunk bitmap should be written to the meta file after the index file because if there are + // any exceptions during writing the offset to the index file, meta file should not be + // updated. If the update to the index file is successful but the update to meta file isn't + // then the index file position is reset in the catch clause. + writeChunkTracker(mapIndex); Review comment: This is a good point. I have overlooked IOExceptions from the seeks. Any IOException from the seek would not guarantee that the position is updated. So any more updates to the file may not overwrite the corrupt data which means that the files are corrupted. One way of handling this I think is maintain the expected length of both index and meta files in AppShufflePartitionInfo instead of relying on the FilePointer. Updates to these files would be similar to writing to the merged data file as below: ``` if (partitionInfo.isEncounteredFailure()) { long updatedPos = partitionInfo.getPosition() + length; logger.debug( "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos); } else { length += partitionInfo.dataChannel.write(buf); } ``` Please let me know if there are any other suggestions? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org