This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3d1317f  [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams
3d1317f is described below

commit 3d1317f8657beddfc6e8a5e49dbbbaaefdff1a5c
Author: Kevin Sewell <kevins...@apple.com>
AuthorDate: Thu Feb 24 08:14:07 2022 -0800

    [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams
    
    ### What changes were proposed in this pull request?
    Wrap the DataInputStream in the SparkPlan.decodeUnsafeRows method in a NextIterator instead of a plain Iterator, which allows the DataInputStream to be closed properly. This happens on the Spark driver only.
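
    For reference, a simplified, self-contained sketch of the contract this relies on is shown below (`MiniNextIterator` is a hypothetical stand-in for Spark's internal `org.apache.spark.util.NextIterator`, which is `private[spark]`); the actual change in the diff below implements `getNext()`/`close()` on top of the compressed `DataInputStream`:

    ```scala
    import java.io.Closeable

    // Hypothetical, simplified stand-in for Spark's NextIterator contract:
    // subclasses implement getNext() and close(); setting `finished = true`
    // signals exhaustion, and hasNext() closes the underlying resource as
    // soon as the end of the stream is reached.
    abstract class MiniNextIterator[U] extends Iterator[U] with Closeable {
      protected var finished = false
      private var gotNext = false
      private var nextValue: U = _

      /** Produce the next element, or set `finished = true` when exhausted. */
      protected def getNext(): U

      override def hasNext: Boolean = {
        if (!finished && !gotNext) {
          nextValue = getNext()
          if (finished) close() // release the resource eagerly at end of stream
          gotNext = true
        }
        !finished
      }

      override def next(): U = {
        if (!hasNext) throw new NoSuchElementException("End of stream")
        gotNext = false
        nextValue
      }
    }
    ```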
    
    ### Why are the changes needed?
    SPARK-34647 replaced ZstdInputStream with ZstdInputStreamNoFinalizer. This means that every usage of `CompressionCodec.compressedInputStream` must close the returned stream explicitly, because closing is no longer handled by the finalizer mechanism.
    
    In SparkPlan, the result of `CompressionCodec.compressedInputStream` is wrapped in an Iterator that never calls close, so the underlying stream is never released.
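
    As an illustration of that requirement, here is a standalone sketch using the zstd-jni classes directly (not Spark's codec wrapper, and assuming zstd-jni's `ZstdInputStreamNoFinalizer(InputStream)` constructor): the NoFinalizer variant has no GC safety net, so the caller must close the stream, and with it the native decompression context, explicitly:

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import com.github.luben.zstd.ZstdInputStreamNoFinalizer

    // Decompress a zstd-compressed byte array, closing the stream explicitly.
    def readAll(compressed: Array[Byte]): Array[Byte] = {
      val zin = new ZstdInputStreamNoFinalizer(new ByteArrayInputStream(compressed))
      try {
        val out = new ByteArrayOutputStream()
        val buf = new Array[Byte](8192)
        var n = zin.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          n = zin.read(buf)
        }
        out.toByteArray
      } finally {
        zin.close() // without this, the native buffers leak off-heap memory
      }
    }
    ```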
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    #### Spark Shell Configuration
    ```bash
    $> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
    $> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
    ```
    
    #### Test Script
    ```scala
    import java.sql.Timestamp
    import java.time.Instant
    import spark.implicits._
    
    case class Record(timestamp: Timestamp, batch: Long, value: Long)
    
    (1 to 300).foreach { batch =>
      sc.parallelize(1 to 1000000)
        .map(Record(Timestamp.from(Instant.now()), batch, _))
        .toDS.write.parquet(s"test_data/batch_$batch")
    }

    (1 to 300).foreach { batch =>
      spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect()
    }
    
    ```
    
    #### Memory Monitor
    ```shell
    $> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done;
    ```
    
    #### Results
    
    ##### Before
    ```
    "2022-02-22 11:55:23",1400016
    "2022-02-22 11:55:33",1522024
    "2022-02-22 11:55:43",1587812
    "2022-02-22 11:55:53",1631868
    "2022-02-22 11:56:03",1657252
    "2022-02-22 11:56:13",1659728
    "2022-02-22 11:56:23",1664640
    "2022-02-22 11:56:33",1674152
    "2022-02-22 11:56:43",1697320
    "2022-02-22 11:56:53",1689636
    "2022-02-22 11:57:03",1783888
    "2022-02-22 11:57:13",1896920
    "2022-02-22 11:57:23",1950492
    "2022-02-22 11:57:33",2010968
    "2022-02-22 11:57:44",2066560
    "2022-02-22 11:57:54",2108232
    "2022-02-22 11:58:04",2158188
    "2022-02-22 11:58:14",2211344
    "2022-02-22 11:58:24",2260180
    "2022-02-22 11:58:34",2316352
    "2022-02-22 11:58:44",2367412
    "2022-02-22 11:58:54",2420916
    "2022-02-22 11:59:04",2472132
    "2022-02-22 11:59:14",2519888
    "2022-02-22 11:59:24",2571372
    "2022-02-22 11:59:34",2621992
    "2022-02-22 11:59:44",2672400
    "2022-02-22 11:59:54",2728924
    "2022-02-22 12:00:04",2777712
    "2022-02-22 12:00:14",2834272
    "2022-02-22 12:00:24",2881344
    "2022-02-22 12:00:34",2935552
    "2022-02-22 12:00:44",2984896
    "2022-02-22 12:00:54",3034116
    "2022-02-22 12:01:04",3087092
    "2022-02-22 12:01:14",3134432
    "2022-02-22 12:01:25",3198316
    "2022-02-22 12:01:35",3193484
    "2022-02-22 12:01:45",3193212
    "2022-02-22 12:01:55",3192872
    "2022-02-22 12:02:05",3191772
    "2022-02-22 12:02:15",3187780
    "2022-02-22 12:02:25",3177084
    "2022-02-22 12:02:35",3173292
    "2022-02-22 12:02:45",3173292
    "2022-02-22 12:02:55",3173292
    ```
    
    ##### After
    ```
    "2022-02-22 12:05:03",1377124
    "2022-02-22 12:05:13",1425132
    "2022-02-22 12:05:23",1564060
    "2022-02-22 12:05:33",1616116
    "2022-02-22 12:05:43",1637448
    "2022-02-22 12:05:53",1637700
    "2022-02-22 12:06:03",1653912
    "2022-02-22 12:06:13",1659532
    "2022-02-22 12:06:23",1673368
    "2022-02-22 12:06:33",1687580
    "2022-02-22 12:06:43",1711076
    "2022-02-22 12:06:53",1849752
    "2022-02-22 12:07:03",1861528
    "2022-02-22 12:07:13",1871200
    "2022-02-22 12:07:24",1878860
    "2022-02-22 12:07:34",1879332
    "2022-02-22 12:07:44",1886552
    "2022-02-22 12:07:54",1884160
    "2022-02-22 12:08:04",1880924
    "2022-02-22 12:08:14",1876084
    "2022-02-22 12:08:24",1878800
    "2022-02-22 12:08:34",1879068
    "2022-02-22 12:08:44",1880088
    "2022-02-22 12:08:54",1880160
    "2022-02-22 12:09:04",1880496
    "2022-02-22 12:09:14",1891672
    "2022-02-22 12:09:24",1878552
    "2022-02-22 12:09:34",1876136
    "2022-02-22 12:09:44",1890056
    "2022-02-22 12:09:54",1878076
    "2022-02-22 12:10:04",1882440
    "2022-02-22 12:10:14",1893172
    "2022-02-22 12:10:24",1894216
    "2022-02-22 12:10:34",1894204
    "2022-02-22 12:10:44",1894716
    "2022-02-22 12:10:54",1894720
    "2022-02-22 12:11:04",1894720
    "2022-02-22 12:11:15",1895232
    "2022-02-22 12:11:25",1895496
    "2022-02-22 12:11:35",1895496
    ```
    
    Closes #35613 from kevins-29/spark-38273.
    
    Lead-authored-by: Kevin Sewell <kevins...@apple.com>
    Co-authored-by: kevins-29 <100220899+kevins...@users.noreply.github.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 43c89dca89d1a4c0dc63354f46b5bd4b39cdda65)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/execution/SparkPlan.scala | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2fbfe4d..10cda2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
 
 object SparkPlan {
   /** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */
@@ -370,10 +371,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
+      private def _next(): InternalRow = {
         val bs = new Array[Byte](sizeOfNextRow)
         ins.readFully(bs)
         val row = new UnsafeRow(nFields)
@@ -381,6 +381,22 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         sizeOfNextRow = ins.readInt()
         row
       }
+
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            _next()
+          } catch {
+            case t: Throwable if ins != null =>
+              ins.close()
+              throw t
+          }
+        } else {
+          finished = true
+          null
+        }
+      }
+      override def close(): Unit = ins.close()
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
