Hi all,

I am seeing some strange behavior with the RDD take function in PySpark while doing some interactive coding in an IPython notebook. In case it is relevant: I am running PySpark on Spark 1.2.0 in yarn-client mode.
I am using sc.wholeTextFiles and pandas to load a collection of .csv files into an RDD of pandas dataframes. I create an RDD called train_rdd in which each row is a (label, pandas dataframe) pair:

    import pandas as pd
    from StringIO import StringIO

    rdd = sc.wholeTextFiles(data_path, 1000)
    train_rdd = rdd.map(lambda x: (x[0], pd.read_csv(StringIO(x[1]))))

To prototype my next steps, I want to use take to pull out a single dataframe and work out the desired modifications on it locally, before writing the Spark code that applies them to all of the dataframes in parallel. However, when I call take like this:

    label, df = train_rdd.take(1)[0]

I get a spark.driver.maxResultSize error (stack trace included at the end of this message). Each of these dataframes is only about 100 MB, so a single one should easily fit on the driver without going over the maxResultSize limit of 1024 MB. If I instead use takeSample, there is no problem:

    label, df = train_rdd.takeSample(False, 1, seed=50)[0]

(Here I have fixed the seed so that the element selected is the same one the take call is trying to load, i.e. the first one, just to rule out the possibility that the specific dataframe take is fetching is too large.)

Does calling take result in a collect-style operation, gathering results from many partitions on the driver before returning the first item? The error message does report serialized results from 177 tasks even though I only asked for one element, so that would explain why the error occurs, but it seems like poor behavior for the take function. Clearly takeSample behaves the way I want, but it would be nice to get this behavior from take itself, or at least without having to choose an element randomly. I was able to get the dataframe I wanted by changing the seed until the right one came up, but I don't think that is a good approach in general.

Any insight is appreciated.

Best,
David Montague

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-38-7eca647cba46> in <module>()
      1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
----> 2 label, df = train_rdd.take(1)[0]

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py in take(self, num)
   1109
   1110             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1111             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1112
   1113             items += res

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    816         # SparkContext#runJob.
    817         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
    819         return list(mappedRDD._collect_iterator_through_file(it))
    820

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538             self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298             raise Py4JJavaError(
    299                 'An error occurred while calling {0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
    301         else:
    302             raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 177 tasks (1038.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
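
P.S. In case it helps clarify what I am after, here is a rough workaround sketch I am considering in the meantime (untested against this exact setup). It uses mapPartitionsWithIndex to drop every partition except the first before calling take, so at most one (label, dataframe) pair should ever be serialized back to the driver. It assumes partition 0 is non-empty, and train_rdd is the RDD defined above:

    # Workaround sketch (assumes partition 0 is non-empty): keep only the
    # first partition so take(1) never has to serialize results from
    # hundreds of partitions back to the driver.
    def keep_first_partition(index, iterator):
        # Pass elements through for partition 0; return nothing otherwise.
        return iterator if index == 0 else iter([])

    label, df = train_rdd.mapPartitionsWithIndex(keep_first_partition).take(1)[0]

I realize I could also just raise spark.driver.maxResultSize in the SparkConf, but that feels like treating the symptom rather than the cause, so I would still like to understand why take needs to pull back so much data in the first place.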