pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc I got more information for this issue.
   
   Add assertion and debug log in `RemoteBlockPushResolver`(ESS side)
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       ...
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) 
{
           synchronized (partition) {
             try {
               // This can throw IOException which will marks this shuffle 
partition as not merged.
               partition.finalizePartition();
               bitmaps.add(partition.mapTracker);
               reduceIds.add(partition.reduceId);
               sizes.add(partition.getLastChunkOffset());
             } catch (IOException ioe) {
               logger.warn("Exception while finalizing shuffle partition {}_{} 
{} {}", msg.appId,
                 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
             } finally {
               partition.closeAllFilesAndDeleteIfNeeded(false);
             }
           }
   +       assert partition.dataFile.length() == partition.lastChunkOffset;
   +       assert partition.indexFile.file.length() == 
partition.indexFile.getPos();
   +       assert partition.metaFile.file.length() == 
partition.metaFile.getPos();
   +       logger.info("shuffle partition {}_{} {} {}, chunk_size={}, 
meta_length={}, data_length={}",
   +              msg.appId, msg.appAttemptId, msg.shuffleId, 
partition.reduceId,
   +              partition.indexFile.getPos() / 8 - 1,
   +              partition.metaFile.getPos(),
   +              partition.lastChunkOffset);
        }
         mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
           bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
           Longs.toArray(sizes));
       }
       ...
   }
   ```
   
   Add assertion and debug log in `IndexShuffleBlockResolver`(Reducer side)
   ```scala
   override def getMergedBlockData(
       blockId: ShuffleMergedBlockId,
       dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
     val indexFile =
       getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.shuffleMergeId,
         blockId.reduceId, dirs)
     val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
     val size = indexFile.length.toInt
     val offsets = Utils.tryWithResource {
       new DataInputStream(Files.newInputStream(indexFile.toPath))
     } { dis =>
       val buffer = ByteBuffer.allocate(size)
       dis.readFully(buffer.array)
       buffer.asLongBuffer
     }
     // Number of chunks is number of indexes - 1
     val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = 
java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = 
java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = 
java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +      s"index_file is $indexFile, backup to $indexBackupPath" +
   +      s"data_file is $dataFile, backup to $dataBackupPath" +
   +      s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
     }
     for (index <- 0 until numChunks) yield {
       new FileSegmentManagedBuffer(transportConf, dataFile,
         offsets.get(index),
         offsets.get(index + 1) - offsets.get(index))
     }
   }
   ```
   
   Then I run TPCDS several rounds and reproduce the exception.
   
   Assertion failed in reduce task side.
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: 
ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job 
aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most 
recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 
executor 562): java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:208)
        at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
        at 
org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
        at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
   ```
   
   ESS logs
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle 
partition application_1640143179334_0148_-1 126 4877, chunk_size=1, 
meta_length=18, data_length=157
   ```
   
   Reduce task backup merged shuffle files 
   ```
   root@beta-spark4:/tmp# ls -l 
shuffleMerged_application_1640143179334_0148_126_0_4877*
   -rw-r--r-- 1 root root 16036 Jan  7 19:41 
shuffleMerged_application_1640143179334_0148_126_0_4877.data
   -rw-r--r-- 1 root root     8 Jan  7 19:41 
shuffleMerged_application_1640143179334_0148_126_0_4877.index
   -rw-r--r-- 1 root root     0 Jan  7 19:41 
shuffleMerged_application_1640143179334_0148_126_0_4877.meta
   ```
   
   So, the ESS and reduce task running on same machine, and ESS closed the 
'data', 'index', 'meta' files and reported the there size as `chunk_size=1, 
meta_length=18, data_length=157`, these metadata also return to driver and pass 
to reduce task, but when reduce task read the file from disk, the data is not 
match!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to