This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6dd38ce54 [SPARK-44588][CORE] Fix double encryption issue for migrated shuffle blocks
6f6dd38ce54 is described below

commit 6f6dd38ce542bf9876b849f85c01f47ee1ec93ca
Author: Henry Mai <henry...@users.noreply.github.com>
AuthorDate: Tue Aug 1 14:33:10 2023 -0700

    [SPARK-44588][CORE] Fix double encryption issue for migrated shuffle blocks
    
    ### What changes were proposed in this pull request?
    
    Fix double encryption issue for migrated shuffle blocks
    
    Shuffle blocks are sent over the wire without decryption upon migration when
    io.encryption is enabled. The code on the receiving side, however, wraps the
    OutputStream to the file with serializerManager.wrapStream, so the already
    encrypted bytes are encrypted a second time when they are written out.
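    For illustration, a minimal, self-contained sketch of the failure mode (an
    editorial sketch, not Spark's actual code: it assumes an AES/CTR cipher with
    a random IV prepended to each output, similar in spirit to Spark's io
    encryption; all names below are hypothetical):

    ```scala
    import java.security.SecureRandom
    import java.util.Arrays
    import javax.crypto.Cipher
    import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

    object DoubleEncryptionDemo {
      private val key = new SecretKeySpec("0123456789abcdef".getBytes("UTF-8"), "AES")

      // Encrypt with a fresh random IV, prepended to the output.
      def encrypt(plain: Array[Byte]): Array[Byte] = {
        val iv = new Array[Byte](16)
        new SecureRandom().nextBytes(iv)
        val c = Cipher.getInstance("AES/CTR/NoPadding")
        c.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv))
        iv ++ c.doFinal(plain)
      }

      // Decrypt by reading back the prepended IV.
      def decrypt(data: Array[Byte]): Array[Byte] = {
        val c = Cipher.getInstance("AES/CTR/NoPadding")
        c.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(data.take(16)))
        c.doFinal(data.drop(16))
      }

      def main(args: Array[String]): Unit = {
        val block = "shuffle block bytes".getBytes("UTF-8")
        val onDisk = encrypt(block)      // sender's shuffle file: encrypted once
        val migrated = encrypt(onDisk)   // buggy receiver wraps the stream again
        // The normal read path decrypts once and no longer sees the block:
        println(Arrays.equals(decrypt(migrated), block))          // false
        // Only decrypting twice recovers the original bytes:
        println(Arrays.equals(decrypt(decrypt(migrated)), block)) // true
      }
    }
    ```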
    
    This patch removes the usage of serializerManager.wrapStream on the receiving
    side and also adds tests that validate that this works as expected. I have
    also validated that the added tests will fail if the fix is not in place.
    
    Jira ticket with more details: https://issues.apache.org/jira/browse/SPARK-44588
    
    ### Why are the changes needed?
    
    Migrated shuffle blocks will be double-encrypted when
    `spark.io.encryption.enabled=true` without this fix.
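    For reference, a minimal sketch of a configuration under which the bug
    surfaces (assuming storage decommissioning is also enabled so that shuffle
    blocks actually migrate between executors):

    ```scala
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.io.encryption.enabled", "true")                      // encrypt local disk IO
      .set("spark.storage.decommission.enabled", "true")               // allow block decommissioning
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true") // migrate shuffle blocks
    ```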
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests were added to cover shuffle block migration with
    spark.io.encryption.enabled set, and a test helper method was fixed to
    properly construct the SerializerManager with the encryption key.
    
    Closes #42214 from henrymai/fix_shuffle_migration_double_encryption.
    
    Authored-by: Henry Mai <henry...@users.noreply.github.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  8 ++++---
 .../apache/spark/storage/BlockManagerSuite.scala   | 28 ++++++++++++++++++----
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 08dec2e4dd3..919b0f5f7c1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -242,9 +242,11 @@ private[spark] class IndexShuffleBlockResolver(
           s"${blockId.getClass().getSimpleName()}", category = "SHUFFLE")
     }
     val fileTmp = createTempFile(file)
-    val channel = Channels.newChannel(
-      serializerManager.wrapStream(blockId,
-        new FileOutputStream(fileTmp)))
+
+    // Shuffle blocks' file bytes are being sent directly over the wire, so there is no need to
+    // serializerManager.wrapStream() on it. Meaning if it was originally encrypted, then
+    // it will stay encrypted when being written out to the file here.
+    val channel = Channels.newChannel(new FileOutputStream(fileTmp))
 
     new StreamCallbackWithID {
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ab6c2693b0e..ecd66dc2c5f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -131,7 +131,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       None
     }
     val bmSecurityMgr = new SecurityManager(bmConf, encryptionKey)
-    val serializerManager = new SerializerManager(serializer, bmConf)
+    val serializerManager = new SerializerManager(serializer, bmConf, encryptionKey)
     val transfer = transferService.getOrElse(new NettyBlockTransferService(
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1))
     val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
@@ -2033,10 +2033,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
-  private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = {
+  private def testShuffleBlockDecommissioning(
+      maxShuffleSize: Option[Int], willReject: Boolean, enableIoEncryption: Boolean) = {
     maxShuffleSize.foreach{ size =>
       conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b")
     }
+    conf.set(IO_ENCRYPTION_ENABLED, enableIoEncryption)
+
     val shuffleManager1 = makeSortShuffleManager(Some(conf))
     val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1)
     shuffleManager1.shuffleBlockResolver._blockManager = bm1
@@ -2095,15 +2098,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
   }
 
   test("test migration of shuffle blocks during decommissioning - no limit") {
-    testShuffleBlockDecommissioning(None, true)
+    testShuffleBlockDecommissioning(None, true, false)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - no limit - " 
+
+      "io.encryption enabled") {
+    testShuffleBlockDecommissioning(None, true, true)
   }
 
   test("test migration of shuffle blocks during decommissioning - larger 
limit") {
-    testShuffleBlockDecommissioning(Some(10000), true)
+    testShuffleBlockDecommissioning(Some(10000), true, false)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - larger limit 
- " +
+      "io.encryption enabled") {
+    testShuffleBlockDecommissioning(Some(10000), true, true)
   }
 
   test("[SPARK-34363]test migration of shuffle blocks during decommissioning - 
small limit") {
-    testShuffleBlockDecommissioning(Some(1), false)
+    testShuffleBlockDecommissioning(Some(1), false, false)
+  }
+
+  test("[SPARK-34363]test migration of shuffle blocks during decommissioning - 
small limit -" +
+      " io.encryption enabled") {
+    testShuffleBlockDecommissioning(Some(1), false, true)
   }
 
   test("SPARK-32919: Shuffle push merger locations should be bounded with in" +

