This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new b51e860cbfd [SPARK-43378][CORE] Properly close stream objects in deserializeFromChunkedBuffer
b51e860cbfd is described below

commit b51e860cbfdc03c0b085dc6e7dcb11fd1579113b
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Thu May 4 19:34:14 2023 -0500

    [SPARK-43378][CORE] Properly close stream objects in deserializeFromChunkedBuffer
    
    ### What changes were proposed in this pull request?
    
    Fixes a bug where SerializerHelper.deserializeFromChunkedBuffer does not call close on the deserialization stream. For some serializers, such as Kryo, this causes a performance regression because the Kryo instances are not returned to the pool.
    
    ### Why are the changes needed?
    This causes a performance regression in Spark 3.4.0 for some workloads.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #41049 from eejbyfeldt/SPARK-43378.
    
    Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit cb26ad88c522070c66e979ab1ab0f040cd1bdbe7)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../src/main/scala/org/apache/spark/serializer/SerializerHelper.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala
index 2cff87990a4..54a0b2e339e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala
@@ -49,6 +49,8 @@ private[spark] object SerializerHelper extends Logging {
       serializerInstance: SerializerInstance,
       bytes: ChunkedByteBuffer): T = {
     val in = serializerInstance.deserializeStream(bytes.toInputStream())
-    in.readObject()
+    val res = in.readObject()
+    in.close()
+    res
   }
 }
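
For illustration only: the patch above calls close after readObject returns normally. A related pattern, which this commit does not use, puts close in a finally block so that the stream is released even when reading throws. The sketch below shows that pattern with plain java.io streams rather than Spark's SerializerInstance; the object CloseAfterRead and the helper readAndClose are hypothetical names and are not part of Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CloseAfterRead {
  // Hypothetical helper (not part of Spark): reads one object from the
  // stream and closes the stream whether or not the read succeeds.
  def readAndClose[T](in: ObjectInputStream): T = {
    try {
      in.readObject().asInstanceOf[T]
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Serialize a value into an in-memory buffer.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject("hello")
    out.close()

    // Deserialize it again; the input stream is closed inside readAndClose.
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val value = readAndClose[String](in)
    assert(value == "hello")
  }
}
```

Whether the finally form is preferable depends on how the underlying serializer behaves after a failed read; the committed change keeps the simpler read-then-close sequence.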


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
