Hi,

I am trying to evaluate the performance of Spark with respect to various memory settings. What makes this harder is that I'm new to Python, but the problem at hand doesn't seem to stem from that.

I'm running a wordcount script [1] with different amounts of input data. With a 1g input there is always an OutOfMemoryError at the end of the reduce tasks [2], while 100m of data causes no problem. Spark is v1.2.1 (I see the same problem with v1.3) and it runs on a VM with Ubuntu 14.04, 8G RAM and 4 VCPUs. (If anything else is of interest, please ask.)

http://spark.apache.org/docs/1.2.1/tuning.html#memory-usage-of-reduce-tasks suggests increasing the parallelism, which I've tried (even with over 4000 tasks), but nothing changed. Other attempts with spark.executor.memory, spark.python.worker.memory and extraJavaOptions with -Xmx4g (see code below) didn't solve the problem either.
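
To make the parallelism attempt concrete, here is a sketch of the kind of change I mean (the partition counts and input path are placeholders, not the exact values I tested); it uses textFile's minPartitions and reduceByKey's numPartitions arguments instead of spark.default.parallelism:

from operator import add
from pyspark import SparkContext

# Sketch only: partition counts and path are placeholders.
sc = SparkContext("local", "ParallelismSketch")

# More input partitions -> more, smaller map tasks.
lines = sc.textFile("input.txt", minPartitions=120)

# More reduce partitions -> smaller per-task shuffle state.
counts = (lines.flatMap(lambda x: x.split(' '))
               .map(lambda x: (x, 1))
               .reduceByKey(add, numPartitions=120))

print(counts.count())
sc.stop()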

What do you suggest to keep the Java heap from filling up completely?

Thanks
Balazs


[1] Wordcount script

import sys
import time
from operator import add
from pyspark import SparkContext, SparkConf
from signal import signal, SIGPIPE, SIG_DFL

def encode(text):
    """
    For printing unicode characters to the console.
    """
    return text.encode('utf-8')

if __name__ == "__main__":
    start_time = time.time()

    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)
    conf = (SparkConf()
             .setMaster("local")
             .setAppName("PythonWordCount")
             # JVM heap for executors (in local mode the executor lives in the driver JVM)
             .set("spark.executor.memory", "6g")
             # memory per Python worker process before it spills to disk
             .set("spark.python.worker.memory", "6g")
             # default number of partitions for shuffles such as reduceByKey
             .set("spark.default.parallelism", "120")
             # extra JVM options for the driver
             .set("spark.driver.extraJavaOptions", "-Xmx4g"))
    sc = SparkContext(conf=conf)
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    # output would take too long and the important thing is the processing time
    # for (word, count) in output:
    #     print encode("%s: %i" % (word, count))
    # print("%f seconds" % (time.time() - start_time))

    sc.stop()

    print("%f seconds" % (time.time() - start_time))

[2] OutOfMemoryError at reduce tasks
...
15/03/19 07:58:52 INFO ShuffleBlockFetcherIterator: Getting 30 non-empty blocks out of 30 blocks
15/03/19 07:58:52 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/19 07:58:52 INFO TaskSetManager: Finished task 99.0 in stage 1.0 (TID 129) in 1096 ms on localhost (100/120)
15/03/19 07:58:52 INFO PythonRDD: Times: total = 351, boot = -530, init = 534, finish = 347
15/03/19 07:58:52 ERROR Executor: Exception in task 100.0 in stage 1.0 (TID 130)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
    at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
    at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:164)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply$mcV$sp(TaskResult.scala:48)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
    at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/03/19 07:58:52 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
...
