[ https://issues.apache.org/jira/browse/SPARK-23427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367742#comment-16367742 ]
Pratik Dhumal edited comment on SPARK-23427 at 2/16/18 7:18 PM: ---------------------------------------------------------------- {code:java} // code placeholder @Test def testLoop() = { val schema = new StructType().add("test", types.IntegerType) var t1 = spark.createDataFrame(spark.sparkContext.parallelize(1 to 1000000).map(i => Row(i)), schema) val t2 = spark.createDataFrame(spark.sparkContext.parallelize(4 to 1400).map(i => Row(i)), schema) val t3 = spark.createDataFrame(spark.sparkContext.parallelize(15 to 190).map(i => Row(i)), schema) val t4 = spark.createDataFrame(spark.sparkContext.parallelize(135 to 652).map(i => Row(i)), schema) val t5 = spark.createDataFrame(spark.sparkContext.parallelize(86 to 352).map(i => Row(i)), schema) t1.persist().count() t2.persist().count() t3.persist().count() t4.persist().count() t5.persist().count() var dfResult: DataFrame = null while (true) { var t3Filter = t3.filter("test % 2 = 1") var t4Filter = t4.filter("test % 2 = 0") t1.createOrReplaceTempView("T1") t2.createOrReplaceTempView("T2") t3Filter.createOrReplaceTempView("T3") t4Filter.createOrReplaceTempView("T4") t5.createOrReplaceTempView("T5") var query = """ SELECT T1.* FROM T1 | INNER JOIN T2 ON T1.test=t2.test | LEFT JOIN T3 ON T1.test=t3.test | LEFT JOIN T4 ON T1.test=t4.test | LEFT JOIN T5 ON T1.test=t5.test | """.stripMargin if (t1 == null) { t1 = spark.sql(query) t1.persist().count() } else { var tmp1 = spark.sql(query) var tmp2 = t1 t1 = tmp1.union(tmp2) t1.persist().count() tmp2.unpersist(true) tmp2 = null } println("t1 : " + (SizeEstimator.estimate(t1) / 1024 / 1024)) // Do Something - Currently doing nothing spark.catalog.dropTempView("T1") spark.catalog.dropTempView("T2") spark.catalog.dropTempView("T3") spark.catalog.dropTempView("T4") spark.catalog.dropTempView("T5") } t3.unpersist(true) t2.unpersist(true) t1.unpersist(true) t4.unpersist(true) t5.unpersist(true) println("VOID") } // RESULT LOG t1 : 8 t1 : 208 t1 : 310 t1 : 187 t1 : 441 t1 : 440 t1 : 547 t1 
: 651 t1 : 759 t1 : 733 t1 : 1129{code} Hope this helps. was (Author: dpratik): {code:java} // code placeholder @Test def testLoop() = { val schema = new StructType().add("test", types.IntegerType) var t1 = spark.createDataFrame(spark.sparkContext.parallelize(1 to 1000000).map(i => Row(i)), schema) val t2 = spark.createDataFrame(spark.sparkContext.parallelize(4 to 1400).map(i => Row(i)), schema) val t3 = spark.createDataFrame(spark.sparkContext.parallelize(15 to 190).map(i => Row(i)), schema) val t4 = spark.createDataFrame(spark.sparkContext.parallelize(135 to 652).map(i => Row(i)), schema) val t5 = spark.createDataFrame(spark.sparkContext.parallelize(86 to 352).map(i => Row(i)), schema) t1.persist().count() t2.persist().count() t3.persist().count() t4.persist().count() t5.persist().count() var dfResult: DataFrame = null while (true) { var t3Filter = t3.filter("test % 2 = 1") var t4Filter = t4.filter("test % 2 = 0") t1.createOrReplaceTempView("T1") t2.createOrReplaceTempView("T2") t3Filter.createOrReplaceTempView("T3") t4Filter.createOrReplaceTempView("T4") t5.createOrReplaceTempView("T5") var query = """ SELECT T1.* FROM T1 | INNER JOIN T2 ON T1.test=t2.test | LEFT JOIN T3 ON T1.test=t3.test | LEFT JOIN T4 ON T1.test=t4.test | LEFT JOIN T5 ON T1.test=t5.test | """.stripMargin if (t1 == null) { t1 = spark.sql(query) t1.persist().count() } else { var tmp1 = spark.sql(query) var tmp2 = t1 t1 = tmp1.union(tmp2) t1.persist().count() tmp2.unpersist(true) tmp2 = null } println("t1 : " + (SizeEstimator.estimate(t1) / 1024 / 1024)) // Do Something - Currently doing nothing spark.catalog.dropTempView("T1") spark.catalog.dropTempView("T2") spark.catalog.dropTempView("T3") spark.catalog.dropTempView("T4") spark.catalog.dropTempView("T5") } t3.unpersist(true) t2.unpersist(true) t1.unpersist(true) t4.unpersist(true) t5.unpersist(true) println("VOID") } {code} Hope this helps. 
> spark.sql.autoBroadcastJoinThreshold causing OOM exception in the driver > ------------------------------------------------------------------------- > > Key: SPARK-23427 > URL: https://issues.apache.org/jira/browse/SPARK-23427 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.0.0 > Environment: SPARK 2.0 version > Reporter: Dhiraj > Priority: Critical > > We are facing an issue related to the value of spark.sql.autoBroadcastJoinThreshold. > With spark.sql.autoBroadcastJoinThreshold set to -1 (disabled), we see driver > memory usage stay flat. > With any other value (10MB, 5MB, 2MB, 1MB, 10K, 1K), we see driver memory usage > go up at a rate that depends on the size of autoBroadcastJoinThreshold, eventually > resulting in an OOM exception. The problem is that memory used by autoBroadcast is not > being freed in the driver. > The application imports Oracle tables as master DataFrames, which are persisted. > Each job applies filters to these tables and then registers them as > temporary views. SQL queries are then used to process the data further. At the end, > all the intermediate DataFrames are unpersisted. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org