[ https://issues.apache.org/jira/browse/SPARK-44588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Henry Mai updated SPARK-44588: ------------------------------ Description: Shuffle blocks upon migration are wrapped for encryption again when being written out to a file on the receiver side. Pull request to fix this: https://github.com/apache/spark/pull/42214 Details: Sender/Read side: BlockManagerDecommissioner:run() blocks = bm.migratableResolver.getMigrationBlocks() *dataFile = IndexShuffleBlockResolver:getDataFile(...)* buffer = FileSegmentManagedBuffer(..., dataFile) *^ This reads straight from disk without decryption* blocks.foreach((blockId, buffer) => bm.blockTransferService.uploadBlockSync(..., buffer, ...)) -> uploadBlockSync() -> uploadBlock(..., buffer, ...) -> client.uploadStream(UploadBlockStream, buffer, ...) - Notice that there is no decryption here on the sender/read side. Receiver/Write side: NettyBlockRpcServer:receiveStream() <--- This is the UploadBlockStream handler putBlockDataAsStream() migratableResolver.putShuffleBlockAsStream() *-> file = IndexShuffleBlockResolver:getDataFile(...)* -> tmpFile = (file + .<uuid> extension) *-> Creates an encrypting writable channel to a tmpFile using serializerManager.wrapStream()* -> onData() writes the data into the channel -> onComplete() renames the tmpFile to the file - Notice: * Both getMigrationBlocks()[read] and putShuffleBlockAsStream()[write] target IndexShuffleBlockResolver:getDataFile() * The read path does not decrypt but the write path encrypts. * As a thought exercise: if this cycle happens more than once (where this receiver is now a sender) even if we assume that the shuffle blocks are initially unencrypted*, then bytes in the file will just have more and more layers of encryption applied to it each time it gets migrated. * *In practice, the shuffle blocks are encrypted on disk to begin with, this is just a thought exercise was: Shuffle blocks upon migration are wrapped for encryption again when being written out to a file on the receiver side. I have a fix ready but posting this Jira ticket first. Details: Sender/Read side: BlockManagerDecommissioner:run() blocks = bm.migratableResolver.getMigrationBlocks() *dataFile = IndexShuffleBlockResolver:getDataFile(...)* buffer = FileSegmentManagedBuffer(..., dataFile) *^ This reads straight from disk without decryption* blocks.foreach((blockId, buffer) => bm.blockTransferService.uploadBlockSync(..., buffer, ...)) -> uploadBlockSync() -> uploadBlock(..., buffer, ...) -> client.uploadStream(UploadBlockStream, buffer, ...) - Notice that there is no decryption here on the sender/read side. Receiver/Write side: NettyBlockRpcServer:receiveStream() <--- This is the UploadBlockStream handler putBlockDataAsStream() migratableResolver.putShuffleBlockAsStream() *-> file = IndexShuffleBlockResolver:getDataFile(...)* -> tmpFile = (file + .<uuid> extension) *-> Creates an encrypting writable channel to a tmpFile using serializerManager.wrapStream()* -> onData() writes the data into the channel -> onComplete() renames the tmpFile to the file - Notice: * Both getMigrationBlocks()[read] and putShuffleBlockAsStream()[write] target IndexShuffleBlockResolver:getDataFile() * The read path does not decrypt but the write path encrypts. * As a thought exercise: if this cycle happens more than once (where this receiver is now a sender) even if we assume that the shuffle blocks are initially unencrypted*, then bytes in the file will just have more and more layers of encryption applied to it each time it gets migrated. * *In practice, the shuffle blocks are encrypted on disk to begin with, this is just a thought exercise > Migrated shuffle blocks when io.encryption is enabled are encrypted multiple > times > ---------------------------------------------------------------------------------- > > Key: SPARK-44588 > URL: https://issues.apache.org/jira/browse/SPARK-44588 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.1.3, 3.2.1, 3.3.0, 3.2.2, > 3.3.1, 3.2.3, 3.2.4, 3.3.2, 3.4.0, 3.4.1 > Reporter: Henry Mai > Priority: Critical > > Shuffle blocks upon migration are wrapped for encryption again when being > written out to a file on the receiver side. > > Pull request to fix this: https://github.com/apache/spark/pull/42214 > > Details: > Sender/Read side: > BlockManagerDecommissioner:run() > blocks = bm.migratableResolver.getMigrationBlocks() > *dataFile = IndexShuffleBlockResolver:getDataFile(...)* > buffer = FileSegmentManagedBuffer(..., dataFile) > *^ This reads straight from disk without decryption* > blocks.foreach((blockId, buffer) => > bm.blockTransferService.uploadBlockSync(..., buffer, ...)) > -> uploadBlockSync() -> uploadBlock(..., buffer, ...) > -> client.uploadStream(UploadBlockStream, buffer, ...) > - Notice that there is no decryption here on the sender/read side. > Receiver/Write side: > NettyBlockRpcServer:receiveStream() <--- This is the UploadBlockStream handler > putBlockDataAsStream() > migratableResolver.putShuffleBlockAsStream() > *-> file = IndexShuffleBlockResolver:getDataFile(...)* > -> tmpFile = (file + .<uuid> extension) > *-> Creates an encrypting writable channel to a tmpFile using > serializerManager.wrapStream()* > -> onData() writes the data into the channel > -> onComplete() renames the tmpFile to the file > - Notice: > * Both getMigrationBlocks()[read] and putShuffleBlockAsStream()[write] > target IndexShuffleBlockResolver:getDataFile() > * The read path does not decrypt but the write path encrypts. > * As a thought exercise: if this cycle happens more than once (where this > receiver is now a sender) even if we assume that the shuffle blocks are > initially unencrypted*, then bytes in the file will just have more and more > layers of encryption applied to it each time it gets migrated. > * *In practice, the shuffle blocks are encrypted on disk to begin with, this > is just a thought exercise -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org