[ https://issues.apache.org/jira/browse/SPARK-44253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Attila Zsolt Piros updated SPARK-44253:
---------------------------------------
Description:
If the user registers a temporary view from a DataFrame created by Structured Streaming and tries to drop the temporary view via their original SparkSession, memory will leak. The reason is that Structured Streaming has its own SparkSession (a clone of the original SparkSession; for details see https://issues.apache.org/jira/browse/SPARK-26586 and https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194). The created temporary view belongs to the cloned SparkSession, so dropping the temporary view must also be done via the cloned SparkSession.

Example of the *memory leak*:
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s"tempView_$batchId"
  batchDF.createOrReplaceTempView(view)
  ...
  spark.catalog.dropTempView(view)
}
{noformat}

*Workaround* (_dropTempView_ must be called on the SparkSession accessed from the DataFrame created by streaming):
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s"tempView_$batchId"
  batchDF.createOrReplaceTempView(view)
  ...
  batchDF.sparkSession.catalog.dropTempView(view)
}
{noformat}

Example heapdump: !1.png!
> Potential memory leak when temp views created from DF created by structured
> streaming
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44253
>                 URL: https://issues.apache.org/jira/browse/SPARK-44253
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.4, 3.3.2, 3.4.0, 3.4.1
>            Reporter: Attila Zsolt Piros
>            Priority: Major
>         Attachments: 1.png, 2.png
>
>
> If the user registers a temporary view from a DataFrame created by Structured
> Streaming and tries to drop the temporary view via their original
> SparkSession, memory will leak.
> The reason is that Structured Streaming has its own SparkSession (a clone of
> the original SparkSession; for details see
> https://issues.apache.org/jira/browse/SPARK-26586 and
> https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194).
> The created temporary view belongs to the cloned SparkSession, so dropping
> the temporary view must also be done via the cloned SparkSession.
> Example of the *memory leak*:
> {noformat}
> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   val view = s"tempView_$batchId"
>   batchDF.createOrReplaceTempView(view)
>   ...
>   spark.catalog.dropTempView(view)
> }
> {noformat}
>
> *Workaround* (_dropTempView_ must be called on the SparkSession accessed from
> the DataFrame created by streaming):
> {noformat}
> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   val view = s"tempView_$batchId"
>   batchDF.createOrReplaceTempView(view)
>   ...
>   batchDF.sparkSession.catalog.dropTempView(view)
> }
> {noformat}
>
> Example heapdump:
> !1.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
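The two snippets from the issue can be combined into one self-contained sketch of the described behavior. This is a hedged illustration, not code from the report: it assumes a streaming DataFrame `streamingDF` and an original session `spark` (the names used in the issue), and the assertion and inline comments reflect the session-cloning behavior described in SPARK-26586.

{noformat}
import org.apache.spark.sql.DataFrame

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // foreachBatch hands over a DataFrame bound to the cloned SparkSession,
  // not to the original `spark` (see SPARK-26586):
  assert(batchDF.sparkSession ne spark)

  val view = s"tempView_$batchId"
  batchDF.createOrReplaceTempView(view) // registered in the clone's catalog

  // Per the issue, dropping via the original session does not remove the view
  // from the clone's catalog, so views accumulate batch after batch:
  // spark.catalog.dropTempView(view)   // leaks

  // Dropping via the session the batch DataFrame belongs to works:
  batchDF.sparkSession.catalog.dropTempView(view)
}.start()
{noformat}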