otterc commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r658947849
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -292,12 +325,59 @@ void deleteExecutorDirs(Path[] dirs) { } } + /** + * Create StreamCallback for invalid push blocks with the specific error message. + * If specific error message is null, this StreamCallback won't throw exception in client. + */ + private StreamCallbackWithID createCallbackForInvalidPushBlocks( + String streamId, + String errorMessage) { + return new StreamCallbackWithID() { + @Override + public String getID() { + return streamId; + } + + @Override + public void onData(String streamId, ByteBuffer buf) { + // Ignore the requests. It reaches here either when a request is received after the + // shuffle file is finalized or when a request is for a duplicate block. + } + + @Override + public void onComplete(String streamId) { + if (errorMessage != null) { + // Throw an exception here so the block data is drained from channel and server + // responds RpcFailure to the client. + throw new RuntimeException(String.format("Block %s %s", streamId, errorMessage)); + } + // For duplicate block that is received before the shuffle merge finalizes, the + // server should respond success to the client. + } + + @Override + public void onFailure(String streamId, Throwable cause) { + } + }; + } + + @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + final String streamId = String.format("%s_%d_%d_%d", + OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.mapIndex, + msg.reduceId); + if (appShuffleInfo.attemptId != msg.attemptId) { + // If this Block belongs to a former application attempt, it is considered late, + // as only the blocks from the current application attempt will be merged + // TODO: [SPARK-35548] Client should be updated to handle this error. 
+ return createCallbackForInvalidPushBlocks(streamId, Review comment: You don't need to return a streamCallback for this. You can just throw RuntimeException here. ########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -338,51 +418,30 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck; - final String streamId = String.format("%s_%d_%d_%d", - OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex, - msg.reduceId); if (partitionInfo != null) { return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex); } else { // For a duplicate block or a block which is late, respond back with a callback that handles // them differently. - return new StreamCallbackWithID() { - @Override - public String getID() { - return streamId; - } - - @Override - public void onData(String streamId, ByteBuffer buf) { - // Ignore the requests. It reaches here either when a request is received after the - // shuffle file is finalized or when a request is for a duplicate block. - } - - @Override - public void onComplete(String streamId) { - if (isTooLate) { - // Throw an exception here so the block data is drained from channel and server - // responds RpcFailure to the client. - throw new RuntimeException(String.format("Block %s %s", streamId, - ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)); - } - // For duplicate block that is received before the shuffle merge finalizes, the - // server should respond success to the client. 
- } - - @Override - public void onFailure(String streamId, Throwable cause) { - } - }; + if (isTooLate) { + // Throw RuntimeException in client as of block arrives too late + return createCallbackForInvalidPushBlocks( Review comment: This change is not necessary. When `appShuffleInfo.attemptId != msg.attemptId`, just throw a `RuntimeException` instead of creating a stream callback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org