This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 53799e15c1e [SPARK-44588][CORE] Fix double encryption issue for migrated shuffle blocks 53799e15c1e is described below commit 53799e15c1e4396189449f02667a7716fab4cdef Author: Henry Mai <henry...@users.noreply.github.com> AuthorDate: Tue Aug 1 14:33:10 2023 -0700 [SPARK-44588][CORE] Fix double encryption issue for migrated shuffle blocks ### What changes were proposed in this pull request? Fix double encryption issue for migrated shuffle blocks Shuffle blocks upon migration are sent without decryption when io.encryption is enabled. The code on the receiving side ends up using serializer.wrapStream on the OutputStream to the file which results in the already encrypted bytes being encrypted again when the bytes are written out. This patch removes the usage of serializerManager.wrapStream on the receiving side and also adds tests that validate that this works as expected. I have also validated that the added tests will fail if the fix is not in place. Jira ticket with more details: https://issues.apache.org/jira/browse/SPARK-44588 ### Why are the changes needed? Migrated shuffle blocks will be double encrypted when `spark.io.encryption = true` without this fix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests were added to test shuffle block migration with spark.io.encryption enabled and also fixes a test helper method to properly construct the SerializerManager with the encryption key. Closes #42214 from henrymai/fix_shuffle_migration_double_encryption. Authored-by: Henry Mai <henry...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 6f6dd38ce542bf9876b849f85c01f47ee1ec93ca) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../spark/shuffle/IndexShuffleBlockResolver.scala | 8 ++++--- .../apache/spark/storage/BlockManagerSuite.scala | 28 ++++++++++++++++++---- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 08dec2e4dd3..919b0f5f7c1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -242,9 +242,11 @@ private[spark] class IndexShuffleBlockResolver( s"${blockId.getClass().getSimpleName()}", category = "SHUFFLE") } val fileTmp = createTempFile(file) - val channel = Channels.newChannel( - serializerManager.wrapStream(blockId, - new FileOutputStream(fileTmp))) + + // Shuffle blocks' file bytes are being sent directly over the wire, so there is no need to + // serializerManager.wrapStream() on it. Meaning if it was originally encrypted, then + // it will stay encrypted when being written out to the file here. + val channel = Channels.newChannel(new FileOutputStream(fileTmp)) new StreamCallbackWithID { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ab6c2693b0e..ecd66dc2c5f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -131,7 +131,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe None } val bmSecurityMgr = new SecurityManager(bmConf, encryptionKey) - val serializerManager = new SerializerManager(serializer, bmConf) + val serializerManager = new SerializerManager(serializer, bmConf, encryptionKey) val transfer = transferService.getOrElse(new NettyBlockTransferService( conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)) val memManager = UnifiedMemoryManager(bmConf, numCores = 1) @@ -2033,10 +2033,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) } - private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = { + private def testShuffleBlockDecommissioning( + maxShuffleSize: Option[Int], willReject: Boolean, enableIoEncryption: Boolean) = { maxShuffleSize.foreach{ size => conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b") } + conf.set(IO_ENCRYPTION_ENABLED, enableIoEncryption) + val shuffleManager1 = makeSortShuffleManager(Some(conf)) val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1) shuffleManager1.shuffleBlockResolver._blockManager = bm1 @@ -2095,15 +2098,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe } test("test migration of shuffle blocks during decommissioning - no limit") { - testShuffleBlockDecommissioning(None, true) + testShuffleBlockDecommissioning(None, true, false) + } + + test("test migration of shuffle blocks during decommissioning - no limit - " + + "io.encryption enabled") { + testShuffleBlockDecommissioning(None, true, true) } test("test migration of shuffle blocks during decommissioning - larger limit") { - testShuffleBlockDecommissioning(Some(10000), true) + testShuffleBlockDecommissioning(Some(10000), true, false) + } + + test("test migration of shuffle blocks during decommissioning - larger limit - " + + "io.encryption enabled") { + testShuffleBlockDecommissioning(Some(10000), true, true) } test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit") { - testShuffleBlockDecommissioning(Some(1), false) + testShuffleBlockDecommissioning(Some(1), false, false) + } + + test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit -" + + " io.encryption enabled") { + testShuffleBlockDecommissioning(Some(1), false, true) } test("SPARK-32919: Shuffle push merger locations should be bounded with in" + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org