This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9e86aed7b2a [SPARK-44588][CORE][3.3] Fix double encryption issue for migrated shuffle blocks
9e86aed7b2a is described below

commit 9e86aed7b2ac3f9c18346a290b9672b0d9465805
Author: Henry Mai <henry...@users.noreply.github.com>
AuthorDate: Tue Aug 1 21:07:51 2023 -0700

    [SPARK-44588][CORE][3.3] Fix double encryption issue for migrated shuffle blocks
    
    ### What changes were proposed in this pull request?
    
    Fix double encryption issue for migrated shuffle blocks
    
    When io.encryption is enabled, shuffle blocks are migrated without being decrypted first. The receiving side then applies serializerManager.wrapStream to the OutputStream for the destination file, so the already-encrypted bytes are encrypted a second time as they are written out.
    
    This patch removes the use of serializerManager.wrapStream on the receiving side and adds tests that validate the fix. I have also verified that the added tests fail when the fix is not in place.
    
    Jira ticket with more details: https://issues.apache.org/jira/browse/SPARK-44588
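
    As an illustration of why the extra pass is destructive, here is a minimal, self-contained sketch. It is not Spark's actual stream-wrapping code; the `encrypt`/`decrypt` helpers are hypothetical stand-ins that mimic one AES-CTR pass with a fresh IV written at the front of the output, which is roughly what an encrypting stream wrapper does:

    ```scala
    import java.security.SecureRandom
    import javax.crypto.{Cipher, KeyGenerator, SecretKey}
    import javax.crypto.spec.IvParameterSpec

    object DoubleEncryptionSketch {
      private val random = new SecureRandom()

      // One encryption pass: generate a fresh IV, prepend it, then AES-CTR the payload.
      def encrypt(key: SecretKey, bytes: Array[Byte]): Array[Byte] = {
        val iv = new Array[Byte](16)
        random.nextBytes(iv)
        val cipher = Cipher.getInstance("AES/CTR/NoPadding")
        cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv))
        iv ++ cipher.doFinal(bytes)
      }

      // One decryption pass: read the IV from the front, then decrypt the rest.
      def decrypt(key: SecretKey, bytes: Array[Byte]): Array[Byte] = {
        val (iv, body) = bytes.splitAt(16)
        val cipher = Cipher.getInstance("AES/CTR/NoPadding")
        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv))
        cipher.doFinal(body)
      }

      def main(args: Array[String]): Unit = {
        val keyGen = KeyGenerator.getInstance("AES")
        keyGen.init(128)
        val key = keyGen.generateKey()

        val block = "shuffle block bytes".getBytes("UTF-8")
        val onWire = encrypt(key, block)  // sender migrates the already-encrypted file bytes as-is
        val onDisk = encrypt(key, onWire) // the bug: receiver wraps the stream and encrypts again

        // A reader decrypts exactly once, so the double-encrypted file is unreadable:
        assert(decrypt(key, onWire).sameElements(block))   // single pass round-trips
        assert(!decrypt(key, onDisk).sameElements(block))  // double pass does not
      }
    }
    ```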
    
    ### Why are the changes needed?
    
    Without this fix, migrated shuffle blocks are double encrypted when `spark.io.encryption.enabled=true`.
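
    For reference, a sketch of the configuration under which the issue shows up. `spark.io.encryption.enabled` is the stock Spark key; the decommission keys are included as an assumption about a typical setup that exercises shuffle block migration:

    ```scala
    import org.apache.spark.SparkConf

    // Assumed setup for hitting the bug: encrypted I/O plus shuffle block migration.
    val conf = new SparkConf()
      .set("spark.io.encryption.enabled", "true")                      // encrypt shuffle/spill data
      .set("spark.storage.decommission.enabled", "true")               // allow executor decommissioning
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true") // migrate shuffle blocks off the node
    ```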
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests were added to cover shuffle block migration with spark.io.encryption enabled, and a test helper method was fixed to properly construct the SerializerManager with the encryption key.
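
    The helper fix amounts to threading the encryption key through the SerializerManager constructor, roughly as sketched below (simplified from the suite; these classes are private[spark], so this shape only compiles inside Spark's own org.apache.spark sources):

    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
    import org.apache.spark.security.CryptoStreamUtils
    import org.apache.spark.serializer.{JavaSerializer, SerializerManager}

    val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, true)
    // Without the key, wrapStream has nothing to encrypt with and silently passes bytes through.
    val encryptionKey: Option[Array[Byte]] =
      if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None
    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf, encryptionKey)
    ```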
    
    Closes #42277 from henrymai/branch-3.3_backport_double_encryption.
    
    Authored-by: Henry Mai <henry...@users.noreply.github.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  8 ++++---
 .../apache/spark/storage/BlockManagerSuite.scala   | 28 ++++++++++++++++++----
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index ba54555311e..d41321b4597 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -240,9 +240,11 @@ private[spark] class IndexShuffleBlockResolver(
           s"${blockId.getClass().getSimpleName()}")
     }
     val fileTmp = createTempFile(file)
-    val channel = Channels.newChannel(
-      serializerManager.wrapStream(blockId,
-        new FileOutputStream(fileTmp)))
+
+    // Shuffle blocks' file bytes are being sent directly over the wire, so there is no need to
+    // serializerManager.wrapStream() on it. Meaning if it was originally encrypted, then
+    // it will stay encrypted when being written out to the file here.
+    val channel = Channels.newChannel(new FileOutputStream(fileTmp))
 
     new StreamCallbackWithID {
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6bfffc8ab3d..986ac79953d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -133,7 +133,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val transfer = transferService
      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
     val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
-    val serializerManager = new SerializerManager(serializer, bmConf)
+    val serializerManager = new SerializerManager(serializer, bmConf, encryptionKey)
     val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
       val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 0)
       Some(new ExternalBlockStoreClient(transConf, bmSecurityMgr,
@@ -2027,10 +2027,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
-  private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = {
+  private def testShuffleBlockDecommissioning(
+      maxShuffleSize: Option[Int], willReject: Boolean, enableIoEncryption: Boolean) = {
     maxShuffleSize.foreach{ size =>
       conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b")
     }
+    conf.set(IO_ENCRYPTION_ENABLED, enableIoEncryption)
+
     val shuffleManager1 = makeSortShuffleManager(Some(conf))
     val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1)
     shuffleManager1.shuffleBlockResolver._blockManager = bm1
@@ -2089,15 +2092,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("test migration of shuffle blocks during decommissioning - no limit") {
-    testShuffleBlockDecommissioning(None, true)
+    testShuffleBlockDecommissioning(None, true, false)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - no limit - " 
+
+      "io.encryption enabled") {
+    testShuffleBlockDecommissioning(None, true, true)
   }
 
   test("test migration of shuffle blocks during decommissioning - larger 
limit") {
-    testShuffleBlockDecommissioning(Some(10000), true)
+    testShuffleBlockDecommissioning(Some(10000), true, false)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - larger limit 
- " +
+      "io.encryption enabled") {
+    testShuffleBlockDecommissioning(Some(10000), true, true)
   }
 
   test("[SPARK-34363]test migration of shuffle blocks during decommissioning - 
small limit") {
-    testShuffleBlockDecommissioning(Some(1), false)
+    testShuffleBlockDecommissioning(Some(1), false, false)
+  }
+
+  test("[SPARK-34363]test migration of shuffle blocks during decommissioning - 
small limit -" +
+      " io.encryption enabled") {
+    testShuffleBlockDecommissioning(Some(1), false, true)
   }
 
   test("SPARK-32919: Shuffle push merger locations should be bounded with in" +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
