Hi all,

I am getting some strange behavior with the RDD take function in PySpark
while doing some interactive coding in an IPython notebook.  I am running
PySpark on Spark 1.2.0 in yarn-client mode if that is relevant.

I am using sc.wholeTextFiles and pandas to load a collection of .csv files
into an RDD of pandas dataframes. I create an RDD called train_rdd for
which each row of the RDD contains a label and pandas dataframe pair:

import pandas as pd
from StringIO import StringIO

rdd = sc.wholeTextFiles(data_path, 1000)
train_rdd = rdd.map(lambda x: x[0], pd.read_csv(StringIO(x[1]))))

In order to test out the next steps I want to take, I am trying to use take to
select one of the dataframes and apply the desired modifications before
writing out the Spark code to apply it to all of the dataframes in parallel.

However, when I try to use take like this:

label, df = train_rdd.take(1)[0]

I get a spark.driver.maxResultSize error (stack trace included at the end
of this message). Now, each of these dataframes is only about 100MB in
size, so should easily fit on the driver and not go over the maxResultSize
limit of 1024MB.

If I instead use takeSample, though, there is no problem:

label, df = train_rdd.takeSample(False, 1, seed=50)[0]

(Here, I have set the seed so that the RDD that is selected is the same one
that the take function is trying to load (i.e., the first one), just to
ensure that it is not because the specific dataframe take is getting is too
large.)

Does calling take result in a collect operation being performed before
outputting the first item? That would explain to me why this error is
occurring, but that seems like poor behavior for the take function. Clearly
takeSample is behaving the way I want it to, but it would be nice if I
could get this behavior with the take function, or at least without needing
to choose an element randomly. I was able to get the behavior I wanted
above by just changing the seed until I got the dataframe I wanted, but I
don't think that is a good approach in general.

Any insight is appreciated.

Best,
David Montague




---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-38-7eca647cba46> in <module>()
      1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
----> 2 label, df = train_rdd.take(1)[0]

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py
in take(self, num)
   1109
   1110             p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1111             res = self.context.runJob(self, takeUpToNumLeft, p,
True)
   1112
   1113             items += res

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py
in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    816         # SparkContext#runJob.
    817         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions, allowLocal)
    819         return list(mappedRDD._collect_iterator_through_file(it))
    820

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total
size of serialized results of 177 tasks (1038.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Reply via email to