Strange that it's working for some directories but not others. It looks like wholeTextFiles may not work with S3: https://issues.apache.org/jira/browse/SPARK-4414

On your memory question: exit status 52 in the trace is Spark's OutOfMemoryError exit code, and wholeTextFiles has to materialize each file as a single record, so an executor OOM on directories with larger files would fit what you're seeing.

If it's possible to load the data into EMR and run Spark from there, that may be a workaround. This blog post shows a Python workaround that might work as well: http://michaelryanbell.com/processing-whole-files-spark-s3.html
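Roughly, the post's list-the-keys-then-map pattern looks like the sketch below. Untested; it assumes boto3 is available on the driver and executors, and list_keys / read_whole_object are names I made up for illustration:

import boto3

BUCKET = 'commoncrawl'
PREFIX = 'crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'

def list_keys(bucket, prefix):
    # Driver side: collect object names only; no file data is pulled here.
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            yield obj['Key']

def read_whole_object(key):
    # Executor side: fetch one whole file as a (key, contents) record,
    # the same shape wholeTextFiles would have produced.
    s3 = boto3.client('s3')  # build the client inside the task; it isn't picklable
    body = s3.get_object(Bucket=BUCKET, Key=key)['Body'].read()
    return key, body

keys = list(list_keys(BUCKET, PREFIX))
rdd = sc.parallelize(keys, len(keys)).map(read_whole_object)  # one file per partition
rdd.take(1)

Note that each record is still an entire file held in one executor's memory, so the tasks need the same per-file headroom either way.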
Jon

On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay <paulhtremb...@gmail.com> wrote:

> I've actually been able to trace the problem to the files being read in.
> If I change to a different directory, then I don't get the error. Is one
> of the executors running out of memory?
>
> On 02/06/2017 02:35 PM, Paul Tremblay wrote:
>
>> When I try to create an rdd using wholeTextFiles, I get an
>> incomprehensible error. But when I use the same path with sc.textFile,
>> I get no error.
>>
>> I am using pyspark with spark 2.1.
>>
>> in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'
>>
>> rdd = sc.wholeTextFiles(in_path)
>>
>> rdd.take(1)
>>
>> /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
>>    1341
>>    1342         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
>> -> 1343         res = self.context.runJob(self, takeUpToNumLeft, p)
>>    1344
>>    1345         items += res
>>
>> /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
>>     963         # SparkContext#runJob.
>>     964         mappedRDD = rdd.mapPartitions(partitionFunc)
>> --> 965         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
>>     966         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
>>     967
>>
>> /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1131         answer = self.gateway_client.send_command(command)
>>    1132         return_value = get_return_value(
>> -> 1133             answer, self.gateway_client, self.target_id, self.name)
>>    1134
>>    1135         for temp_arg in temp_args:
>>
>> /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>>      61     def deco(*a, **kw):
>>      62         try:
>> ---> 63             return f(*a, **kw)
>>      64         except py4j.protocol.Py4JJavaError as e:
>>      65             s = e.java_exception.toString()
>>
>> /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>>     317                 raise Py4JJavaError(
>>     318                     "An error occurred while calling {0}{1}{2}.\n".
>> --> 319                     format(target_id, ".", name), value)
>>     320             else:
>>     321                 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3
>> in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal,
>> executor 8): ExecutorLostFailure (executor 8 exited caused by one of the
>> running tasks) Reason: Container marked as failed:
>> container_1486415078210_0005_01_000016 on host:
>> ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52.
>> Diagnostics: Exception from container-launch.
>> Container id: container_1486415078210_0005_01_000016
>> Exit code: 52
>> Stack trace: ExitCodeException exitCode=52:
>>         at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
>>         at org.apache.hadoop.util.Shell.run(Shell.java:479)
>>         at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
>>         at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
>>         at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>>         at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> rdd = sc.textFile(in_path)
>>
>> In [8]: rdd.take(1)
>> Out[8]: [u'WARC/1.0']