[ 
https://issues.apache.org/jira/browse/SPARK-44253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Zsolt Piros updated SPARK-44253:
---------------------------------------
    Description: 
If the user registers a temporary view from a dataframe created by Structured 
Streaming and tries to drop the temporary view via his original SparkSession 
then memory will be leaking.

The reason is Structured streaming has its own SparkSession (as a clone of the 
original Spark Session, for details see 
https://issues.apache.org/jira/browse/SPARK-26586 and 
https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194)
 the created temporary view belongs the cloned SparkSession and the dropping of 
the temporary view must be done via the cloned SparkSession.

Example for the *memory leak*:

{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  spark.catalog.dropTempView(view)
}
{noformat}

*Workaround* (the _dropTempView_ must be called on SparkSession accessed from 
dataframe created by streaming):
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  batchDF.sparkSession.catalog.dropTempView(view)
 }
{noformat}

Example heapdump:
 !1.png! 







  was:
If the user registers a temporary view from a dataframe created by Structured 
Streaming and tries to drop the temporary view via his original SparkSession 
then memory will be leaking.

The reason is Structured streaming has its own SparkSession (as a clone of the 
original Spark Session, for details see 
https://issues.apache.org/jira/browse/SPARK-26586 and 
https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194)
 the created temporary view belongs the cloned SparkSession and the dropping of 
the temporary view must be done via the cloned SparkSession.

Example for the *memory leak*:

{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  spark.catalog.dropTempView(view)
}
{noformat}

*Workaround* (the _dropTempView_ must be called on SparkSession accessed from 
dataframe created by streaming):
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  batchDF.sparkSession.catalog.dropTempView(view)
 }
{noformat}










> Potential memory leak when temp views created from DF created by structured 
> streaming
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44253
>                 URL: https://issues.apache.org/jira/browse/SPARK-44253
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.4, 3.3.2, 3.4.0, 3.4.1
>            Reporter: Attila Zsolt Piros
>            Priority: Major
>         Attachments: 1.png, 2.png
>
>
> If the user registers a temporary view from a dataframe created by Structured 
> Streaming and tries to drop the temporary view via his original SparkSession 
> then memory will be leaking.
> The reason is Structured streaming has its own SparkSession (as a clone of 
> the original Spark Session, for details see 
> https://issues.apache.org/jira/browse/SPARK-26586 and 
> https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194)
>  the created temporary view belongs the cloned SparkSession and the dropping 
> of the temporary view must be done via the cloned SparkSession.
> Example for the *memory leak*:
> {noformat}
> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   val view = s“tempView_$batchId” 
>   batchDF.createOrReplaceTempView(view)
>   ...
>   spark.catalog.dropTempView(view)
> }
> {noformat}
> *Workaround* (the _dropTempView_ must be called on SparkSession accessed from 
> dataframe created by streaming):
> {noformat}
> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   val view = s“tempView_$batchId” 
>   batchDF.createOrReplaceTempView(view)
>   ...
>   batchDF.sparkSession.catalog.dropTempView(view)
>  }
> {noformat}
> Example heapdump:
>  !1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to