Hi all, I have run into an interesting issue when using UDFs in SparkSQL from pyspark on Spark 2.0.0.
There is a big table (5.6 billion rows, 450 GB in memory) loaded into the memory of 300 executors in SparkSQL, on which we do some calculations using UDFs in pyspark. If I run my SQL on only a portion of the data (filtering by one of the attributes), say 800 million records, everything works fine. But when I run the same SQL on all of the data, I receive "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically all of the executors. It seems to me that pyspark UDFs in SparkSQL might have a memory leak, causing this "GC overhead limit exceeded" error.

Details:
- using Spark 2.0.0 on a Hadoop YARN cluster
- 300 executors, each with 2 CPU cores and 8 GB memory (spark.yarn.executor.memoryOverhead=6400)
- a table of 5.6 billion rows loaded into the memory of the executors (taking up 450 GB), partitioned evenly across the executors
- creating even the simplest UDF in SparkSQL causes the 'GC overhead limit exceeded' error when running on all records; running the same on a smaller dataset (~800 million rows) does succeed
- without the UDF, the query succeeds on the whole dataset
- simplified pyspark code:

    from pyspark.sql.types import StringType

    def test_udf(var):
        """test udf that will always return a"""
        return "a"

    sqlContext.registerFunction("test_udf", test_udf, StringType())

    sqlContext.sql("""CACHE TABLE ma""")
    results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
        test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
        ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC, STANDARD_ACCOUNT_CITY_SRC)
                     /
                     CASE WHEN LENGTH(STANDARD_ACCOUNT_CITY_SRC) > LENGTH(STANDARD_ACCOUNT_CITY_SRC)
                          THEN LENGTH(STANDARD_ACCOUNT_CITY_SRC)
                          ELSE LENGTH(STANDARD_ACCOUNT_CITY_SRC)
                     END), 2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
        STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
        FROM ma""")
    results_df.registerTempTable("m")
    sqlContext.cacheTable("m")
    results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
    print(results_df.take(1))

- the error thrown on the executors:

    16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout writer for /hadoop/cloudera/parcels/Anaconda/bin/python
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
    16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

Has anybody experienced these "GC overhead limit exceeded" errors with pyspark UDFs before?

Thanks,
Zoltan
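P.S. In case it helps: since the stack trace points at BatchEvalPythonExec copying rows while feeding the Python worker, I am experimenting with repartitioning the table into more, smaller partitions before applying the UDF, so each task hands fewer rows to Python at a time. This is only a rough sketch of that idea, not a confirmed fix; the partition count (3000) and the temp table name "ma_repartitioned" are arbitrary choices of mine:

    from pyspark.sql.types import StringType

    def test_udf(var):
        """test udf that will always return a"""
        return "a"

    sqlContext.registerFunction("test_udf", test_udf, StringType())

    # Repartition the cached table into many smaller partitions before the UDF
    # (3000 is an arbitrary number here, roughly 10x the executor count),
    # on the assumption that the OOM comes from per-task row buffering.
    ma_df = sqlContext.table("ma").repartition(3000)
    ma_df.registerTempTable("ma_repartitioned")

    results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
        test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP
        FROM ma_repartitioned""")
    print(results_df.take(1))

I have not yet verified whether this avoids the error on the full 5.6 billion rows, so any insight into the underlying cause would still be much appreciated.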