[ 
https://issues.apache.org/jira/browse/SPARK-44253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Zsolt Piros updated SPARK-44253:
---------------------------------------
    Description: 
If the user registers a temporary view from a dataframe created by Structured 
Streaming and tries to drop the temporary view via his original SparkSession 
then memory will be leaking.

The reason is Structured streaming has its own SparkSession (as a clone of the 
original SparkSession, for details see 
https://issues.apache.org/jira/browse/SPARK-26586 and 
[https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194])
 the created temporary view belongs the cloned SparkSession and the dropping of 
the temporary view must be done via the cloned SparkSession.

Example for the {*}memory leak{*}:
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  spark.catalog.dropTempView(view)
}
{noformat}
*Workaround* (the _dropTempView_ must be called on SparkSession accessed from 
dataframe created by streaming):
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  batchDF.sparkSession.catalog.dropTempView(view)
 }
{noformat}
h4. Example heap dump

The SparkSession with the leak:

!1.png|width=807,height=120!

The two SparkSession instances where the first one was is the original 
SparkSession created by the user and the second is the clone:
!2.png|width=813,height=157!

  was:
If the user registers a temporary view from a dataframe created by Structured 
Streaming and tries to drop the temporary view via his original SparkSession 
then memory will be leaking.

The reason is Structured streaming has its own SparkSession (as a clone of the 
original Spark Session, for details see 
https://issues.apache.org/jira/browse/SPARK-26586 and 
[https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194])
 the created temporary view belongs the cloned SparkSession and the dropping of 
the temporary view must be done via the cloned SparkSession.

Example for the {*}memory leak{*}:
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  spark.catalog.dropTempView(view)
}
{noformat}
*Workaround* (the _dropTempView_ must be called on SparkSession accessed from 
dataframe created by streaming):
{noformat}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val view = s“tempView_$batchId” 
  batchDF.createOrReplaceTempView(view)
  ...
  batchDF.sparkSession.catalog.dropTempView(view)
 }
{noformat}
h4. Example heap dump

The SparkSession with the leak:

!1.png|width=807,height=120!

The two SparkSession instances where the first one was is the original 
SparkSession created by the user and the second is the clone:
!2.png|width=813,height=157!


> Potential memory leak when temp views created from DF created by structured 
> streaming
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44253
>                 URL: https://issues.apache.org/jira/browse/SPARK-44253
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.4, 3.3.2, 3.4.0, 3.4.1
>            Reporter: Attila Zsolt Piros
>            Priority: Major
>         Attachments: 1.png, 2.png
>
>
> If the user registers a temporary view from a dataframe created by Structured 
> Streaming and tries to drop the temporary view via his original SparkSession 
> then memory will be leaking.
> The reason is Structured streaming has its own SparkSession (as a clone of 
> the original SparkSession, for details see 
> https://issues.apache.org/jira/browse/SPARK-26586 and 
> [https://github.com/apache/spark/blob/branch-3.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L193-L194])
>  the created temporary view belongs the cloned SparkSession and the dropping 
> of the temporary view must be done via the cloned SparkSession.
> Example for the {*}memory leak{*}:
> {noformat}
> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   val view = s“tempView_$batchId” 
>   batchDF.createOrReplaceTempView(view)
>   ...
>   spark.catalog.dropTempView(view)
> }
> {noformat}
> *Workaround* (the _dropTempView_ must be called on SparkSession accessed from 
> dataframe created by streaming):
> {noformat}
> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   val view = s“tempView_$batchId” 
>   batchDF.createOrReplaceTempView(view)
>   ...
>   batchDF.sparkSession.catalog.dropTempView(view)
>  }
> {noformat}
> h4. Example heap dump
> The SparkSession with the leak:
> !1.png|width=807,height=120!
> The two SparkSession instances where the first one was is the original 
> SparkSession created by the user and the second is the clone:
> !2.png|width=813,height=157!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to