Hi,

I am using Spark 1.5.1.

I have a Spark SQL UDAF that works fine on a tiny dataset (13 narrow rows)
in local mode, but runs out of memory on YARN about half the time
(java.lang.OutOfMemoryError: Java heap space). The rest of the time, it
works fine on YARN.

Note that in all instances, the input data is the same.

Here is the UDAF: https://gist.github.com/alexnastetsky/581af2672328c4b8b023

I am also using a trivial UDT to keep track of each unique value and its
count.

The basic idea is to have a secondary grouping and to count the number of
occurrences of each value in the group. For example, we want to group on
column X; then for each group, we want to aggregate the rows by column Y
and count how many times each unique value of Y appears.

So, given the following data:

X Y
a 1
a 2
a 2
a 2
b 3
b 3
b 4

I would do

val myudaf = new MergeArraysOfElementWithCountUDAF()
val df = ... // load data

df.groupBy($"X")
  .agg(
    myudaf($"Y").as("aggY")
  )

which should produce output like the following:

X aggY
a [{"element":"1", "count":"1"}, {"element":"2", "count":"3"}]
b [{"element":"3", "count":"2"}, {"element":"4", "count":"1"}]

The UDAF can also take an array as input, instead of a scalar, in which
case it just loops through the array and performs the same operation on
each element.
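
For reference, here is a simplified sketch of the UDAF's overall shape.
This is not the exact gist code (the real version uses the UDT mentioned
above and handles the array-input option, and the names here are made up),
but the buffer/merge structure is the same idea:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical, simplified version: the buffer is a plain map from
// element to count instead of the UDT, and array input is omitted.
class ElementCountSketchUDAF extends UserDefinedAggregateFunction {

  def inputSchema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  def bufferSchema: StructType =
    StructType(StructField("counts", MapType(StringType, LongType)) :: Nil)

  def dataType: DataType = MapType(StringType, LongType)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Map.empty[String, Long]

  // count one scalar value into the per-group map
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val counts = buffer.getAs[Map[String, Long]](0)
      val v = input.getString(0)
      buffer(0) = counts + (v -> (counts.getOrElse(v, 0L) + 1L))
    }
  }

  // combine two partial maps; the getAs here is the same kind of call
  // that blows up at line 59 in the gist
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val left = buffer1.getAs[Map[String, Long]](0)
    val right = buffer2.getAs[Map[String, Long]](0)
    buffer1(0) = right.foldLeft(left) { case (acc, (k, n)) =>
      acc + (k -> (acc.getOrElse(k, 0L) + n))
    }
  }

  def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Long]](0)
}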

I've added some logging to print Runtime.getRuntime.freeMemory right
before the line that throws the OOM, and it shows plenty of memory (16 GB,
when I was running on a large node) still available. So I'm not sure
whether there's a huge momentary allocation spike, or the JVM is somehow
not seeing that available memory.
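
The logging itself is just a plain Runtime call, along these lines (the
exact wording in my code differs):

println(s"free=${Runtime.getRuntime.freeMemory() / 1024 / 1024}MB " +
  s"total=${Runtime.getRuntime.totalMemory() / 1024 / 1024}MB " +
  s"max=${Runtime.getRuntime.maxMemory() / 1024 / 1024}MB")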

When the OOM does happen, it consistently happens at this line:
https://gist.github.com/alexnastetsky/581af2672328c4b8b023#file-mergearraysofelementwithcountudaf-scala-L59

java.lang.OutOfMemoryError: Java heap space
        at scala.reflect.ManifestFactory$$anon$1.newArray(Manifest.scala:165)
        at scala.reflect.ManifestFactory$$anon$1.newArray(Manifest.scala:164)
        at org.apache.spark.sql.types.ArrayData.toArray(ArrayData.scala:108)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toScala(CatalystTypeConverters.scala:235)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toScala(CatalystTypeConverters.scala:193)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:414)
        at org.apache.spark.sql.execution.aggregate.InputAggregationBuffer.get(udaf.scala:298)
        at org.apache.spark.sql.Row$class.getAs(Row.scala:316)
        at org.apache.spark.sql.execution.aggregate.InputAggregationBuffer.getAs(udaf.scala:269)
        at com.verve.scorecard.spark.sql.MergeArraysOfElementWithCountUDAF.merge(MergeArraysOfElementWithCountUDAF.scala:59)
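
For context, line 59 is the getAs call in merge() that converts the
aggregation buffer's Catalyst map back into a Scala map, i.e. roughly
(types simplified):

val other = buffer2.getAs[Map[String, Long]](0) // goes through MapConverter.toScala, where the OOM is thrown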

Again, the data is tiny, and that doesn't explain why the OOM happens only
some of the time on YARN, and never in local mode.

Here's how I am running the app:

spark-submit \
--deploy-mode cluster \
--master yarn \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 18g \
--driver-java-options "-XX:MaxPermSize=256m" \
--conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=256m" \
--conf spark.storage.memoryFraction=0.2 \
--conf spark.shuffle.memoryFraction=0.6 \
--conf spark.sql.shuffle.partitions=1000 \
[...app specific stuff...]

Note that I am using a single executor and single core to help with
debugging, but I have the same issue with more executors/nodes.

I am running this on EMR on AWS, so it's unlikely to be a hardware issue
(I get different hardware each time I launch a cluster).

I've also isolated the issue to this UDAF: removing it from my Spark SQL
query makes the issue go away.

Any ideas would be appreciated.

Thanks,
Alex.
