I've been working on this problem for several days (I am doing more to
increase my knowledge of Spark). The code you linked to hangs because
after reading in the file, I have to gunzip it.
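For reference, the gunzip step can be done in memory on the raw bytes of each file. This is only a minimal sketch: the helper name gunzip_text is made up for illustration, and it assumes each compressed file fits in one executor's memory.

```python
import gzip


def gunzip_text(raw, encoding="utf-8"):
    """Decompress gzip bytes to text.

    gzip.decompress also handles several gzip members concatenated
    back to back, which is how .warc.gz files are commonly written.
    Undecodable bytes are replaced rather than raising an error.
    """
    return gzip.decompress(raw).decode(encoding, errors="replace")


# Two gzip members back to back, standing in for a small .warc.gz file.
sample = gzip.compress(b"WARC/1.0\r\n") + gzip.compress(b"WARC/1.0\r\n")
text = gunzip_text(sample)
```

In PySpark, something like sc.binaryFiles(path).mapValues(gunzip_text) would be one place to apply it, since binaryFiles yields (path, bytes) pairs. I have not checked whether that avoids the hang on this data.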
Another approach that seems to work is reading each file in with
sc.textFile, writing it to HDFS, and then using wholeTextFiles
on the HDFS result.
But the bigger issue is that neither method executes in parallel.
When I open my YARN manager, it shows that only one node is being used.
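One pattern that might spread the work out is to parallelize the list of file paths rather than the file contents, so each file becomes its own task. The sketch below rests on assumptions: gunzip_files_in_parallel and fetch_bytes are hypothetical names, and fetch_bytes stands for whatever function returns the raw bytes for a path (for example an S3 client call). Whether the tasks actually land on more than one node also depends on how many executors the cluster grants, which this code does not control.

```python
import gzip


def gunzip_files_in_parallel(sc, paths, fetch_bytes, num_slices=None):
    """Return an RDD of (path, text) pairs, one slice per file by default.

    sc          -- an existing SparkContext
    paths       -- iterable of file paths or object keys
    fetch_bytes -- hypothetical callable: path -> raw gzip bytes
    num_slices  -- number of partitions; defaults to the number of paths
    """
    paths = list(paths)
    if num_slices is None:
        num_slices = max(1, len(paths))

    def load(path):
        # Runs on the executor: fetch one file and decompress it there.
        raw = fetch_bytes(path)
        return path, gzip.decompress(raw).decode("utf-8", errors="replace")

    # Distribute the (small) list of paths, not the file contents.
    return sc.parallelize(paths, num_slices).map(load)
```

Passing the number of files as the slice count is a simple starting point; a smaller number may be better when there are many small files.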
Henry
On 02/06/2017 03:39 PM, Jon Gregg wrote:
Strange that it's working for some directories but not others. Looks
like wholeTextFiles maybe doesn't work with S3?
https://issues.apache.org/jira/browse/SPARK-4414 .
If it's possible to load the data into EMR and run Spark from there,
that may be a workaround. This blog post shows a Python workaround
that might work as well:
http://michaelryanbell.com/processing-whole-files-spark-s3.html
Jon
On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay <paulhtremb...@gmail.com> wrote:
I've actually been able to trace the problem to the files being
read in. If I change to a different directory, then I don't get
the error. Is one of the executors running out of memory?
On 02/06/2017 02:35 PM, Paul Tremblay wrote:
When I try to create an rdd using wholeTextFiles, I get an
incomprehensible error. But when I use the same path with
sc.textFile, I get no error.
I am using pyspark with spark 2.1.
in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/'
rdd = sc.wholeTextFiles(in_path)
rdd.take(1)
/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self,
takeUpToNumLeft, p)
1344
1345 items += res
/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd,
partitionFunc, partitions, allowLocal)
963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))
967
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py
in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling
{0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 1.0 failed 4 times, most recent
failure: Lost task 0.3 in stage 1.0 (TID 7,
ip-172-31-45-114.us-west-2.compute.internal, executor
8): ExecutorLostFailure (executor 8 exited caused by one of
the running tasks) Reason: Container marked as failed:
container_1486415078210_0005_01_000016 on host:
ip-172-31-45-114.us-west-2.compute.internal. Exit
status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486415078210_0005_01_000016
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
rdd = sc.textFile(in_path)
In [8]: rdd.take(1)
Out[8]: [u'WARC/1.0']