When you have a Python UDF, only the inputs of the UDF are passed into the
Python process; all the other fields that are used together with the result of
the UDF are kept in a queue in the JVM and then joined with the results coming
back from Python. The length of this queue depends on the number of rows
currently being processed by Python (or sitting in the buffer of the Python
process). The amount of memory required also depends on how many fields are
used in the result.
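
One way to keep that queue small is to call the Python UDF on a narrow
projection (just a key column plus the UDF inputs) and join the other columns
back afterwards. A rough sketch, reusing the toy UDF from further down in this
thread; ROW_KEY is only a placeholder for whatever unique key the table has,
and whether this ends up faster overall depends on the cost of the extra join:

from pyspark.sql.types import StringType

def test_udf(var):
    """test udf that will always return a"""
    return "a"

sqlContext.registerFunction("test_udf", test_udf, StringType())

# 1) Run the UDF over a narrow projection: only the key and the UDF input
#    go through the Python path, so the rows buffered in the JVM stay small.
#    ROW_KEY is a placeholder, not a column from the original job.
udf_df = sqlContext.sql("""SELECT ROW_KEY,
    test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP
FROM ma""")
udf_df.registerTempTable("udf_out")

# 2) Join the UDF output back to the wide table, so the static columns
#    never have to wait in the Python UDF queue.
results_df = sqlContext.sql("""SELECT m.SOURCE, m.SOURCE_SYSTEM, u.TEST_UDF_OP,
    m.STANDARD_ACCOUNT_STATE_SRC, m.STANDARD_ACCOUNT_STATE_UNIV
FROM ma m JOIN udf_out u ON m.ROW_KEY = u.ROW_KEY""")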

On Tue, Aug 9, 2016 at 11:09 AM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
>> Does this mean you only have 1.6G of memory for the executor (the rest left
>> for Python)?
>> The cached table could take 1.5G, which means almost nothing is left for
>> other things.
> True. I have also tried setting memoryOverhead to 800 (10% of the 8Gb
> memory), but it made no difference; the "GC overhead limit exceeded" error is
> still the same.
>
>> Python UDFs do require some buffering in the JVM; the size of that buffering
>> depends on how many rows are being processed by the Python process.
> I did some more testing in the meantime.
> Leaving the UDFs as-is, but removing some of the other, static columns from
> the SELECT above stopped the memoryOverhead error from occurring. I have
> plenty of memory to store the results with all of the static columns, and
> when the UDFs are removed and only the static columns remain, the query also
> runs fine. This makes me believe that the issue is caused by having UDFs and
> many columns together. Maybe when you have UDFs, the memory usage somehow
> depends on the amount of data in the whole row, including fields that are not
> actually used by the UDF. Maybe the UDF serialization to Python serializes
> the whole row instead of just the attributes of the UDF?
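
That matches the buffering described at the top of this reply: only the UDF's
input columns are sent to the Python process, but the full rows are copied and
queued in the JVM until the Python results come back, so wide rows make that
queue expensive even though the extra columns never reach Python. Stripped
down, the comparison you describe looks roughly like this (same toy UDF,
column names taken from your query below):

# Narrow variant: only the UDF output is selected; per the test above this
# completed without the error.
narrow = sqlContext.sql("""SELECT
    test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP
FROM ma""")

# Wide variant: same UDF plus the static pass-through columns; this is the
# variant that hits "GC overhead limit exceeded".
wide = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
    test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
    STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
FROM ma""")

print(narrow.count())
print(wide.count())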
>
> On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
>> wrote:
>> > Hi all,
>> >
>> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
>> > 2.0.0
>> > using pyspark.
>> >
>> > There is a big table (5.6 billion rows, 450Gb in memory) loaded into the
>> > memory of 300 executors in SparkSQL, on which we do some calculations
>> > using UDFs in pyspark.
>> > If I run my SQL on only a portion of the data (filtering by one of the
>> > attributes), let's say 800 million records, then all works well. But when
>> > I run the same SQL on all the data, I receive
>> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically
>> > all of the executors.
>> >
>> > It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
>> > causing this "GC overhead limit exceeded" error.
>> >
>> > Details:
>> >
>> > - using Spark 2.0.0 on a Hadoop YARN cluster
>> >
>> > - 300 executors, each with 2 CPU cores and 8Gb memory (
>> > spark.yarn.executor.memoryOverhead=6400 )
>>
>> Does this mean you only have 1.6G of memory for the executor (the rest left
>> for Python)?
>> The cached table could take 1.5G, which means almost nothing is left for
>> other things.
>>
>> Python UDFs do require some buffering in the JVM; the size of that buffering
>> depends on how many rows are being processed by the Python process.
>>
>> > - a table of 5.6 billion rows loaded into the memory of the executors
>> > (taking up 450Gb of memory), partitioned evenly across the executors
>> >
>> > - creating even the simplest UDF in SparkSQL causes the 'GC overhead limit
>> > exceeded' error when running on all records. Running the same on a smaller
>> > dataset (~800 million rows) succeeds. Without the UDF, the query succeeds
>> > on the whole dataset.
>> >
>> > - simplified pyspark code:
>> >
>> > from pyspark.sql.types import StringType
>> >
>> > def test_udf(var):
>> >     """test udf that will always return a"""
>> >     return "a"
>> > sqlContext.registerFunction("test_udf", test_udf, StringType())
>> >
>> > sqlContext.sql("""CACHE TABLE ma""")
>> >
>> > results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
>> >                 test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
>> > ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
>> > STANDARD_ACCOUNT_CITY_SRC)
>> >      /
>> > CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH
>> > (STANDARD_ACCOUNT_CITY_SRC)
>> >         THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)
>> >         ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)
>> >    END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
>> > STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
>> > FROM ma""")
>> >
>> > results_df.registerTempTable("m")
>> > sqlContext.cacheTable("m")
>> >
>> > results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
>> > print(results_df.take(1))
>> >
>> >
>> > - the error thrown on the executors:
>> >
>> > 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout writer for /hadoop/cloudera/parcels/Anaconda/bin/python
>> > java.lang.OutOfMemoryError: GC overhead limit exceeded
>> >   at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
>> >   at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
>> >   at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
>> >   at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
>> >   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >   at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
>> >   at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
>> >   at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
>> >   at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
>> >   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> >   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> >   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
>> >   at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
>> >   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>> >   at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
>> > 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>> >
>> >
>> > Has anybody experienced these "GC overhead limit exceeded" errors with
>> > pyspark UDFs before?
>> >
>> > Thanks,
>> > Zoltan
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
