I'm running Spark locally on my laptop to explore how persistence impacts
memory use. As an example problem, I'm generating 80 MB matrices in NumPy
and then simply adding them together.


No matter what I set NUM or the persistence level to in the code below, I
get out-of-memory errors like these:
https://gist.github.com/ericmjonas/fe062a118232e14d2076
I've tried varying my serializer buffer size, my worker memory, etc., such
as the following:

SPARK_DRIVER_MEMORY=4g src/spark-1.1.0-bin-cdh4/bin/spark-submit \
    --conf spark.kryoserializer.buffer.mb=512 \
    --conf spark.akka.frameSize=1000 \
    --conf spark.executor.memory=4g \
    --conf spark.python.worker.memory=4g \
    --master local[2] persistlevels.py
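
For reference, the same --conf settings expressed programmatically through
SparkConf instead of spark-submit flags (just a sketch of the equivalent
setup, assuming the standard pyspark.SparkConf API):

from pyspark import SparkConf, SparkContext

# mirror the --conf flags passed to spark-submit above
conf = (SparkConf()
        .setMaster("local[2]")
        .setAppName("PersistTest")
        .set("spark.executor.memory", "4g")
        .set("spark.python.worker.memory", "4g")
        .set("spark.kryoserializer.buffer.mb", "512")
        .set("spark.akka.frameSize", "1000"))
sc = SparkContext(conf=conf)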

If I don't try to persist, the job of course completes: Spark is smart
enough to sequentially create-reduce-create-reduce and do the right thing.
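
Back of the envelope: with NUM = 100 generated arrays of 80 MB each, the
fully persisted RDD is roughly 100 x 80 MB = 8 GB, so I would have expected
MEMORY_AND_DISK to spill whatever doesn't fit in the 4 GB of executor
memory to disk rather than fall over.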

Is there something I'm missing here? Why should persisting make the job
crash, especially with a high number of partitions?

Thanks for any and all help,

...Eric


import numpy as np

from pyspark import SparkContext, StorageLevel

"""
Test what the persistence levels mean for spark

Exploit the fact that when running locally we can write to
temp files

"""

ROOT_LOG_NAME = "/tmp/create_data.%d.%d"  # marker file: /tmp/create_data.<SIZE_N>.<index>

def create_data(SIZE_N, index):
    # leave a marker file so we can see when this element gets (re)created
    with open(ROOT_LOG_NAME % (SIZE_N, index), 'w') as fid:
        fid.write("%03d creating\n" % index)
    # seed per index, so regenerated data is identical across runs
    np.random.seed(index)
    d = np.random.normal(0, 1, SIZE_N)
    return d


if __name__ == "__main__":
    """
        Usage: persistlevels.py
    """
    sc = SparkContext(appName="PersistTest", batchSize=1)

    def sum_two_mat(a, b):
        print "Summing matrix"
        return a + b

    def sum_two_mat_p1(a, b):
        print "Summing matrix + 1"
        return a + b + 1

    SIZE_N = 10000000  # 10M float64 values ~= 80 MB per array

    NUM = 100
    a = sc.parallelize(xrange(NUM), 10) \
          .map(lambda x: create_data(SIZE_N, x)) \
          .persist(StorageLevel.MEMORY_AND_DISK)


    # two different reductions, to test whether the data gets generated twice
    b = a.reduce(sum_two_mat)
    c = a.reduce(sum_two_mat_p1)
    print np.sum(b + c)

    sc.stop()
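
To see how many elements actually get created, I just count the marker
files afterwards (a quick check, standard library only):

import glob
# one marker file per generated element; re-creation overwrites the same
# file, so this counts distinct elements rather than total create_data calls
print len(glob.glob("/tmp/create_data.*"))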
