> Does this mean you only have 1.6 GB of memory for the executor (the rest
> left for Python)?
> The cached table could take 1.5 GB, which means almost nothing is left for
> other things.
True. I have also tried with memoryOverhead set to 800 MB (10% of the 8 GB
executor memory), but it made no difference: the "GC overhead limit
exceeded" error is still the same.
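
Just to be explicit about what I mean: this is roughly how the sizing looks
on my side (only a sketch; in reality these are passed as --conf options to
spark-submit, values in MB where not suffixed):

from pyspark import SparkConf

# executor sizing as described in the quoted mail below;
# the overhead value is the one I retried with (originally 6400)
conf = (SparkConf()
        .set("spark.executor.memory", "8g")
        .set("spark.yarn.executor.memoryOverhead", "800"))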

> Python UDFs do require some buffering in the JVM; the size of the buffering
> depends on how many rows are being processed by the Python process.
I did some more testing in the meantime.
Leaving the UDFs as-is, but removing some of the other, static columns from
the SELECT above, stopped the "GC overhead limit exceeded" error from
occurring. There is plenty of memory to store the results with all of the
static columns included, and when the UDFs are removed and only the static
columns remain, the query also runs fine. This makes me believe that it is
the combination of UDFs and many columns that causes the issue. Maybe when
a UDF is present, the memory usage somehow depends on the amount of data in
the whole row, including fields that the UDF does not actually use. Could it
be that the UDF serialization to Python serializes the whole row instead of
just the columns passed to the UDF?
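
If that is what happens, then feeding the UDF only the column it actually
needs should behave very differently from evaluating it while the full-width
rows are around. This is the comparison I plan to try next (just a sketch,
untested, using the same table and column names as in the query below):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

test_udf = udf(lambda v: "a", StringType())

ma = sqlContext.table("ma")

# 1) UDF over a narrow projection -- the rows flowing through the Python
#    UDF step contain only the UDF's input column
narrow = (ma.select("STANDARD_ACCOUNT_STREET_SRC")
            .select(test_udf(col("STANDARD_ACCOUNT_STREET_SRC")).alias("TEST_UDF_OP")))
print(narrow.count())

# 2) UDF evaluated while all of the other columns are still in the row --
#    roughly what the big SELECT does; if whole rows get buffered/copied
#    per record, this is where the memory pressure should show up
wide = ma.select(test_udf(col("STANDARD_ACCOUNT_STREET_SRC")).alias("TEST_UDF_OP"), "*")
print(wide.count())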

On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com> wrote:

> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
> wrote:
> > Hi all,
> >
> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
> > 2.0.0 using pyspark.
> >
> > There is a big table (5.6 billion rows, 450 GB in memory) loaded into the
> > memory of 300 executors in SparkSQL, on which we would do some
> > calculations using UDFs in pyspark.
> > If I run my SQL on only a portion of the data (filtering by one of the
> > attributes), let's say 800 million records, then all works well. But when
> > I run the same SQL on all the data, then I receive
> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically
> > all of the executors.
> >
> > It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> > causing this "GC overhead limit exceeded" error.
> >
> > Details:
> >
> > - using Spark 2.0.0 on a Hadoop YARN cluster
> >
> > - 300 executors, each with 2 CPU cores and 8 GB memory
> >   (spark.yarn.executor.memoryOverhead=6400)
>
> Does this mean you only have 1.6 GB of memory for the executor (the rest
> left for Python)?
> The cached table could take 1.5 GB, which means almost nothing is left for
> other things.
>
> Python UDFs do require some buffering in the JVM; the size of the buffering
> depends on how many rows are being processed by the Python process.
>
> > - a table of 5.6 billion rows loaded into the memory of the executors
> > (taking up 450 GB of memory), partitioned evenly across the executors
> >
> > - creating even the simplest UDF in SparkSQL causes the 'GC overhead limit
> > exceeded' error when running on all records. Running the same on a smaller
> > dataset (~800 million rows) does succeed. Without the UDF, the query
> > succeeds on the whole dataset.
> >
> > - simplified pyspark code:
> >
> > from pyspark.sql.types import StringType
> >
> > def test_udf(var):
> >     """test udf that will always return a"""
> >     return "a"
> > sqlContext.registerFunction("test_udf", test_udf, StringType())
> >
> > sqlContext.sql("""CACHE TABLE ma""")
> >
> > results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
> >     test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
> >     ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC, STANDARD_ACCOUNT_CITY_SRC)
> >         / CASE WHEN LENGTH(STANDARD_ACCOUNT_CITY_SRC) > LENGTH(STANDARD_ACCOUNT_CITY_SRC)
> >                THEN LENGTH(STANDARD_ACCOUNT_CITY_SRC)
> >                ELSE LENGTH(STANDARD_ACCOUNT_CITY_SRC)
> >           END), 2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
> >     STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
> > FROM ma""")
> >
> > results_df.registerTempTable("m")
> > sqlContext.cacheTable("m")
> >
> > results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> > print(results_df.take(1))
> >
> >
> > - the error thrown on the executors:
> >
> > 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
> > at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
> > at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> > at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
> > at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
> > at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
> > at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
> > at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
> > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
> > at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> > 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
> >
> >
> > Has anybody experienced these "GC overhead limit exceeded" errors with
> > pyspark UDFs before?
> >
> > Thanks,
> > Zoltan
> >
>
