Hi all,

I have run into an interesting issue when using SparkSQL UDFs from pyspark in
Spark 2.0.0.

There is a big table (5.6 billion rows, 450 GB in memory) loaded into the
memory of 300 executors in SparkSQL, on which we do some calculations using
pyspark UDFs.
If I run my SQL on only a portion of the data (filtering by one of the
attributes), say 800 million records, everything works fine. But when I run
the same SQL on all of the data, I get "java.lang.OutOfMemoryError:
GC overhead limit exceeded" from basically all of the executors.
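
For clarity, the run that works is just the same SQL with a filter added up
front; roughly like the sketch below (the filter column and value here are
placeholders, not the real attribute, and test_udf is the UDF shown further
down):

# rough sketch of the filtered run that succeeds; the WHERE clause is a placeholder
subset_df = sqlContext.sql("""
    SELECT SOURCE, SOURCE_SYSTEM,
           test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP
    FROM ma
    WHERE SOURCE_SYSTEM = 'SYSTEM_A'   -- ~800 million of the 5.6 billion rows
""")
print(subset_df.count())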

It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
causing this "GC overhead limit exceeded" error.

Details:

- using Spark 2.0.0 on a Hadoop YARN cluster

- 300 executors, each with 2 CPU cores and 8 GB of memory
( spark.yarn.executor.memoryOverhead=6400 ); a spark-submit sketch with these
settings follows the stack trace below

- a table of 5.6 billion rows loaded into the memory of the executors
(taking up 450 GB of memory), partitioned evenly across the executors

- creating even the simplest UDF in SparkSQL causes the 'GC overhead limit
exceeded' error when run on all records. Running the same query on a smaller
dataset (~800 million rows) succeeds. Without the UDF, the query succeeds on
the whole dataset (a sketch of this no-UDF variant follows the code below).

- simplified pyspark code:

from pyspark.sql.types import StringType

def test_udf(var):
    """test udf that will always return a"""
    return "a"

sqlContext.registerFunction("test_udf", test_udf, StringType())

sqlContext.sql("""CACHE TABLE ma""")

results_df = sqlContext.sql("""
    SELECT SOURCE, SOURCE_SYSTEM,
           test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
           ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC, STANDARD_ACCOUNT_CITY_SRC)
                        / CASE WHEN LENGTH(STANDARD_ACCOUNT_CITY_SRC) > LENGTH(STANDARD_ACCOUNT_CITY_SRC)
                               THEN LENGTH(STANDARD_ACCOUNT_CITY_SRC)
                               ELSE LENGTH(STANDARD_ACCOUNT_CITY_SRC)
                          END), 2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
           STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
    FROM ma""")

results_df.registerTempTable("m")
sqlContext.cacheTable("m")

results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
print(results_df.take(1))
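
To spell out the no-UDF control case mentioned above: the same shape of query
with the Python UDF replaced by a plain literal completes on the full table.
Roughly:

# no-UDF control: a literal instead of test_udf; this variant completes
# on the full 5.6 billion row table
no_udf_df = sqlContext.sql("""
    SELECT SOURCE, SOURCE_SYSTEM,
           'a' AS TEST_UDF_OP,
           STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
    FROM ma""")
print(no_udf_df.count())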


- the error thrown on the executors:

16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout writer for /hadoop/cloudera/parcels/Anaconda/bin/python
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
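
For completeness, the executor sizing described above corresponds roughly to
a submit command like the one below (a sketch only: queue, deploy mode and the
other flags we pass are omitted, and my_job.py just stands in for our driver
script):

spark-submit \
  --master yarn \
  --num-executors 300 \
  --executor-cores 2 \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=6400 \
  my_job.py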


Has anybody experienced these "GC overhead limit exceeded" errors with
pyspark UDFs before?

Thanks,
Zoltan
