This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3d1317f  [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

3d1317f is described below

commit 3d1317f8657beddfc6e8a5e49dbbbaaefdff1a5c
Author: Kevin Sewell <kevins...@apple.com>
AuthorDate: Thu Feb 24 08:14:07 2022 -0800

[SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

### What changes were proposed in this pull request?

Wrap the `DataInputStream` in the `SparkPlan.decodeUnsafeRows` method in a `NextIterator` rather than a plain `Iterator`, so that the underlying `DataInputStream` is closed once iteration completes. This code path runs in the Spark driver only.
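For background, here is a minimal sketch of the `NextIterator` contract the fix relies on: a subclass implements `getNext()` and `close()`, flags `finished` when the input is exhausted, and `NextIterator` then invokes `close()` exactly once through `closeIfNeeded()`. The `LineIterator` class and its file input are hypothetical illustrations, not part of the patch, and since `org.apache.spark.util.NextIterator` is `private[spark]`, the sketch assumes compilation under an `org.apache.spark` package.

```scala
package org.apache.spark.example  // assumed: NextIterator is private[spark]

import java.io.{BufferedReader, FileReader}

import org.apache.spark.util.NextIterator

// Hypothetical stream-backed iterator: yields lines from a file and closes
// the reader exactly once, either when the data runs out or when the caller
// gives up early and calls closeIfNeeded() itself.
class LineIterator(path: String) extends NextIterator[String] {
  private val reader = new BufferedReader(new FileReader(path))

  // Either return the next element or set `finished = true`; once finished
  // is set, NextIterator.hasNext calls closeIfNeeded() automatically.
  override def getNext(): String = {
    val line = reader.readLine()
    if (line == null) {
      finished = true
      null
    } else {
      line
    }
  }

  override def close(): Unit = reader.close()
}
```

Draining such an iterator closes the reader automatically, and a caller that stops early can call `closeIfNeeded()`. A plain `Iterator` offers neither hook, which is why `decodeUnsafeRows` leaked its stream.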
### Why are the changes needed?

SPARK-34647 replaced `ZstdInputStream` with `ZstdInputStreamNoFinalizer`. This means that every usage of `CompressionCodec.compressedInputStream` must now close the stream explicitly, as closing is no longer handled by the finaliser mechanism. In `SparkPlan`, however, the result of `CompressionCodec.compressedInputStream` was wrapped in an `Iterator` that never called close, so the stream's native memory was never released.
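To make the new contract concrete, here is a sketch (not from the patch) of a one-shot read where the caller closes the stream itself. `CompressionCodec.createCodec` and `Utils.tryWithResource` are Spark-internal (`private[spark]`) APIs, so like the sketch above this assumes compilation under an `org.apache.spark` package; the `readFirstInt` helper is invented for illustration.

```scala
package org.apache.spark.example  // assumed: these APIs are private[spark]

import java.io.{ByteArrayInputStream, DataInputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.Utils

object ManualCloseExample {
  // Build a zstd codec the same way Spark does from its configuration.
  private val codec = CompressionCodec.createCodec(
    new SparkConf().set("spark.io.compression.codec", "zstd"))

  // ZstdInputStreamNoFinalizer has no finalizer safety net: whoever opens
  // the stream must close it, or its native buffers are never freed.
  def readFirstInt(bytes: Array[Byte]): Int =
    Utils.tryWithResource(
      new DataInputStream(codec.compressedInputStream(new ByteArrayInputStream(bytes)))
    ) { ins =>
      ins.readInt()
    }
}
```

`decodeUnsafeRows` cannot use a scoped pattern like this, because its stream must stay open while the returned iterator is consumed; deferring the close into `NextIterator.close()` is the alternative the patch takes.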
12:05:53",1637700 "2022-02-22 12:06:03",1653912 "2022-02-22 12:06:13",1659532 "2022-02-22 12:06:23",1673368 "2022-02-22 12:06:33",1687580 "2022-02-22 12:06:43",1711076 "2022-02-22 12:06:53",1849752 "2022-02-22 12:07:03",1861528 "2022-02-22 12:07:13",1871200 "2022-02-22 12:07:24",1878860 "2022-02-22 12:07:34",1879332 "2022-02-22 12:07:44",1886552 "2022-02-22 12:07:54",1884160 "2022-02-22 12:08:04",1880924 "2022-02-22 12:08:14",1876084 "2022-02-22 12:08:24",1878800 "2022-02-22 12:08:34",1879068 "2022-02-22 12:08:44",1880088 "2022-02-22 12:08:54",1880160 "2022-02-22 12:09:04",1880496 "2022-02-22 12:09:14",1891672 "2022-02-22 12:09:24",1878552 "2022-02-22 12:09:34",1876136 "2022-02-22 12:09:44",1890056 "2022-02-22 12:09:54",1878076 "2022-02-22 12:10:04",1882440 "2022-02-22 12:10:14",1893172 "2022-02-22 12:10:24",1894216 "2022-02-22 12:10:34",1894204 "2022-02-22 12:10:44",1894716 "2022-02-22 12:10:54",1894720 "2022-02-22 12:11:04",1894720 "2022-02-22 12:11:15",1895232 "2022-02-22 12:11:25",1895496 "2022-02-22 12:11:35",1895496 ``` Closes #35613 from kevins-29/spark-38273. Lead-authored-by: Kevin Sewell <kevins...@apple.com> Co-authored-by: kevins-29 <100220899+kevins...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 43c89dca89d1a4c0dc63354f46b5bd4b39cdda65) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../org/apache/spark/sql/execution/SparkPlan.scala | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 2fbfe4d..10cda2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.NextIterator object SparkPlan { /** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */ @@ -370,10 +371,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ val bis = new ByteArrayInputStream(bytes) val ins = new DataInputStream(codec.compressedInputStream(bis)) - new Iterator[InternalRow] { + new NextIterator[InternalRow] { private var sizeOfNextRow = ins.readInt() - override def hasNext: Boolean = sizeOfNextRow >= 0 - override def next(): InternalRow = { + private def _next(): InternalRow = { val bs = new Array[Byte](sizeOfNextRow) ins.readFully(bs) val row = new UnsafeRow(nFields) @@ -381,6 +381,22 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ sizeOfNextRow = ins.readInt() row } + + override def getNext(): InternalRow = { + if (sizeOfNextRow >= 0) { + try { + _next() + } catch { + case t: Throwable if ins != null => + ins.close() + throw t + } + } else { + finished = true + null + } + } + override def close(): Unit = ins.close() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org