[ https://issues.apache.org/jira/browse/SPARK-23427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367742#comment-16367742 ]

Pratik Dhumal edited comment on SPARK-23427 at 2/16/18 7:18 PM:
----------------------------------------------------------------

{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SizeEstimator
import org.junit.Test

// Assumes `spark` is a SparkSession provided by the test fixture.
@Test
def testLoop(): Unit = {
  val schema = new StructType().add("test", types.IntegerType)

  // Master DataFrames, persisted up front.
  var t1 = spark.createDataFrame(spark.sparkContext.parallelize(1 to 1000000).map(i => Row(i)), schema)
  val t2 = spark.createDataFrame(spark.sparkContext.parallelize(4 to 1400).map(i => Row(i)), schema)
  val t3 = spark.createDataFrame(spark.sparkContext.parallelize(15 to 190).map(i => Row(i)), schema)
  val t4 = spark.createDataFrame(spark.sparkContext.parallelize(135 to 652).map(i => Row(i)), schema)
  val t5 = spark.createDataFrame(spark.sparkContext.parallelize(86 to 352).map(i => Row(i)), schema)

  t1.persist().count()
  t2.persist().count()
  t3.persist().count()
  t4.persist().count()
  t5.persist().count()

  while (true) {
    // Per-iteration filters, registered as temp views alongside the masters.
    val t3Filter = t3.filter("test % 2 = 1")
    val t4Filter = t4.filter("test % 2 = 0")
    t1.createOrReplaceTempView("T1")
    t2.createOrReplaceTempView("T2")
    t3Filter.createOrReplaceTempView("T3")
    t4Filter.createOrReplaceTempView("T4")
    t5.createOrReplaceTempView("T5")

    val query =
      """ SELECT T1.* FROM T1
        | INNER JOIN T2 ON T1.test = T2.test
        | LEFT JOIN T3 ON T1.test = T3.test
        | LEFT JOIN T4 ON T1.test = T4.test
        | LEFT JOIN T5 ON T1.test = T5.test
        | """.stripMargin

    // t1 is never null at this point, so the else branch runs every iteration:
    // the new join result is unioned onto the previous t1, the combined result
    // is persisted, and the old t1 is unpersisted.
    if (t1 == null) {
      t1 = spark.sql(query)
      t1.persist().count()
    } else {
      val tmp1 = spark.sql(query)
      val tmp2 = t1
      t1 = tmp1.union(tmp2)
      t1.persist().count()
      tmp2.unpersist(true)
    }

    // Driver-side size estimate of the t1 object graph (plan + references), in MB.
    println("t1 : " + (SizeEstimator.estimate(t1) / 1024 / 1024))
    // Do Something - Currently doing nothing

    spark.catalog.dropTempView("T1")
    spark.catalog.dropTempView("T2")
    spark.catalog.dropTempView("T3")
    spark.catalog.dropTempView("T4")
    spark.catalog.dropTempView("T5")
  }

  // Never reached: the loop above does not terminate.
  t3.unpersist(true)
  t2.unpersist(true)
  t1.unpersist(true)
  t4.unpersist(true)
  t5.unpersist(true)

  println("VOID")
}

// RESULT LOG (SizeEstimator.estimate(t1) in MB, one line per iteration)
t1 : 8
t1 : 208
t1 : 310
t1 : 187
t1 : 441
t1 : 440
t1 : 547
t1 : 651
t1 : 759
t1 : 733
t1 : 1129
{code}
Hope this helps. 
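For what it's worth, the growth can also be seen in the query plan the driver keeps for t1. A minimal sketch of how to inspect it (these lines are not part of the repro above; they would go inside the loop, right after the union and persist):

{code:java}
// Print the parsed, analyzed, optimized and physical plans for t1.
// Each iteration unions the new join result onto the previous t1, so the
// tree keeps the earlier plans as subtrees and the output keeps growing.
t1.explain(true)

// Rough proxy for plan size without dumping the whole tree.
println("optimized plan length: " + t1.queryExecution.optimizedPlan.toString.length)
{code}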



> spark.sql.autoBroadcastJoinThreshold causing OOM exception in the driver 
> -------------------------------------------------------------------------
>
>                 Key: SPARK-23427
>                 URL: https://issues.apache.org/jira/browse/SPARK-23427
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: SPARK 2.0 version
>            Reporter: Dhiraj
>            Priority: Critical
>
> We are facing an issue around the value of spark.sql.autoBroadcastJoinThreshold.
> With spark.sql.autoBroadcastJoinThreshold set to -1 (disabled), driver memory 
> usage stays flat.
> With any other value (10MB, 5MB, 2MB, 1MB, 10K, 1K) driver memory usage grows 
> at a rate that depends on the size of the threshold, and we eventually hit an 
> OOM exception. The problem is that the memory used by the auto-broadcast is 
> not freed in the driver.
> The application imports Oracle tables as master DataFrames, which are persisted. 
> Each job applies filters to these tables and registers them as temp views; SQL 
> queries then process the data further. At the end, all intermediate DataFrames 
> are unpersisted.
>  
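For reference, a minimal sketch of how the threshold described above is typically set (the values are only examples; per the description, -1 disables auto-broadcast and keeps driver memory flat):

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch only: setting spark.sql.autoBroadcastJoinThreshold at session creation.
// -1 disables broadcast joins entirely; the Spark 2.x default is 10 MB (10485760 bytes).
val spark = SparkSession.builder()
  .appName("broadcast-threshold-repro")   // hypothetical app name
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .getOrCreate()

// Or changed at runtime on an existing session:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)  // 10 MB
{code}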


