I am seeing a small standalone cluster (one master, one slave) hang when I reach a 
certain memory threshold, but I cannot figure out how to configure memory to avoid 
this.
I added memory by setting SPARK_DAEMON_MEMORY=2G, and I can see it allocated, but 
it does not help.
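
I am not sure SPARK_DAEMON_MEMORY is even the right knob here, since as I 
understand it that setting sizes the master and worker daemons rather than the 
executors. Here is a sketch of the alternative I have been considering, setting 
spark.executor.memory through SparkConf (the master URL and app name below are 
placeholders, not my actual values):

    from pyspark import SparkConf, SparkContext

    # Sketch only: raise executor memory instead of daemon memory.
    # "spark://localhost:7077" and the app name are placeholders.
    conf = (SparkConf()
            .setMaster("spark://localhost:7077")
            .setAppName("phrase-counts")
            .set("spark.executor.memory", "2g"))
    sc = SparkContext(conf=conf)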

The reduce is a reduceByKey that produces a count per phrase:
        rdd = sc.parallelize(self.phrases)

        # do a distributed count using reduceByKey
        counts = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

        # reverse the (key, count) pairs into (count, key) and then sort in descending order
        sorted = counts.map(lambda (key, count): (count, key)).sortByKey(False)
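
For reference, this is the plain-Python equivalent of what the pipeline above 
should produce (just a sketch, assuming self.phrases is an in-memory list of 
strings):

    from collections import Counter

    # Non-distributed equivalent of the reduceByKey + sortByKey above,
    # assuming self.phrases is an in-memory list of strings.
    # most_common() returns (phrase, count) pairs in descending count order.
    local_counts = Counter(self.phrases).most_common()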

Below is the log to the point of hanging:

14/04/06 19:39:15 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (PairwiseRDD[2] at reduceByKey)
14/04/06 19:39:15 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/04/06 19:39:15 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@localhost:64370/user/Executor#-2031138316] with ID 0
14/04/06 19:39:15 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: localhost (PROCESS_LOCAL)
14/04/06 19:39:15 INFO TaskSetManager: Serialized task 1.0:0 as 10417848 bytes in 18 ms
14/04/06 19:39:15 INFO TaskSetManager: Starting task 1.0:1 as TID 1 on executor 0: localhost (PROCESS_LOCAL)
14/04/06 19:39:15 INFO TaskSetManager: Serialized task 1.0:1 as 10571697 bytes in 13 ms
14/04/06 19:39:15 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager localhost:64375 with 294.9 MB RAM
14/04/06 19:39:16 INFO TaskSetManager: Finished TID 0 in 1397 ms on localhost (progress: 0/2)
14/04/06 19:39:16 INFO DAGScheduler: Completed ShuffleMapTask(1, 0)


When I interrupt the running program, here is the stack trace; it appears to be 
stuck in the sort by count (descending) that follows the reduce:

    sorted = counts.map(lambda (key, count): (count, key)).sortByKey(False)
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 361, in sortByKey
    rddSize = self.count()
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 542, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 533, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 499, in reduce
    vals = self.mapPartitions(func).collect()
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 463, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 535, in __call__
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 363, in send_command
  File "/Users/zakons/Spark/spark-0.9.0-incubating-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 472, in send_command
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 430, in readline
    data = recv(1)
KeyboardInterrupt

Is there a reason why the sorting gets stuck? I can avoid the problem by reducing 
the size of the RDD below a threshold of about 800,000 items before the reduce is 
run (a sketch of this is below).
It would help to see where resources such as memory are being exhausted, but 
nothing shows up in the console.
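
The workaround is roughly along these lines, i.e. cutting the input down before it 
is parallelized (a sketch; the 800,000 cut-off is just the threshold I observed, 
and the truncation is only for illustration):

    # Workaround sketch: keep the data under the ~800,000-item threshold
    # before the reduceByKey runs.
    rdd = sc.parallelize(self.phrases[:800000])
    counts = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)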

Many thanks,
Stuart Zakon
