Sebastián Ramírez created SPARK-4748:
----------------------------------------
Summary: PySpark can't read data in HDFS in YARN mode
Key: SPARK-4748
URL: https://issues.apache.org/jira/browse/SPARK-4748
Project: Spark
Issue Type: Bug
Components: PySpark, YARN
Affects Versions: 1.1.1
Environment: Spark 1.1.1 precompiled for Hadoop 2.4
Hortonworks HDP 2.1
CentOS 6.6
Python 2.7.8 (Anaconda 2.1.0 64-bit)
Numpy 1.9.0
Reporter: Sebastián Ramírez

Using *PySpark*, I'm unable to read and process data in *HDFS* in *YARN* cluster mode, but I can read data from HDFS in local mode.

I have a 6-node cluster with Hortonworks HDP 2.1. The operating system is CentOS 6.6. I have installed Anaconda Python (which includes numpy) on every node for the user yarn.
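(An aside, in case it helps reproduce this: as far as I know, Spark picks the worker-side Python executable from the PYSPARK_PYTHON environment variable, so the interpreter can be pinned explicitly when launching the shell. A minimal sketch follows; the /opt/anaconda path is an assumption about the install location, not something confirmed for this cluster.)

{code}
# Hedged sketch: pin driver and executors to the same Python before launching.
# /opt/anaconda/bin/python is an assumed install path; adjust per node layout.
export PYSPARK_PYTHON=/opt/anaconda/bin/python
IPYTHON=1 /home/hdfs/spark-1.1.1-bin-hadoop2.4/bin/pyspark --master yarn-client
{code}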
----
h5. This works (*PySpark* local reading from HDFS):

When I start the console with:
{code}
IPYTHON=1 /home/hdfs/spark-1.1.1-bin-hadoop2.4/bin/pyspark --master local
{code}

Then I do (that file is in HDFS):
{code}
testdata = sc.textFile('/user/hdfs/testdata.csv')
{code}

And then:
{code}
testdata.first()
{code}

I get my data back:
{code}
u'asdf,qwer,1,M'
{code}

And if I do:
{code}
testdata.count()
{code}

it also works; I get:
{code}
500
{code}

----
h5. This also works (*Scala* in YARN cluster reading from HDFS):

When I start the console with:
{code}
/home/hdfs/spark-1.1.1-bin-hadoop2.4/bin/spark-shell --master yarn-client --num-executors 6 --executor-cores 2 --executor-memory 2G --driver-memory 2G
{code}

Then I do (that file is in HDFS):
{code}
val testdata = sc.textFile("/user/hdfs/testdata.csv")
{code}

And then:
{code}
testdata.first()
{code}

I get my data back:
{code}
res1: String = asdf,qwer,1,M
{code}

And if I do:
{code}
testdata.count()
{code}

it also works; I get:
{code}
res2: Long = 500
{code}

----
h5. This doesn't work (*PySpark* in YARN cluster reading from HDFS):

When I start the console with:
{code}
IPYTHON=1 /home/hdfs/spark-1.1.1-bin-hadoop2.4/bin/pyspark --master yarn-client --num-executors 6 --executor-cores 2 --executor-memory 2G --driver-memory 2G
{code}

Then I do (that file is in HDFS):
{code}
testdata = sc.textFile('/user/hdfs/testdata.csv')
{code}

And then:
{code}
testdata.first()
{code}

I get some *INFO* logs, and then a *WARN*:
{code}
14/12/04 15:26:40 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, node05): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.py", line 1146, in takeUpToNumLeft
ImportError: No module named next

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
14/12/04 15:26:40 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, node05, NODE_LOCAL, 1254 bytes)
14/12/04 15:26:40 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor node05: org.apache.spark.api.python.PythonException (Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.py", line 1146, in takeUpToNumLeft
ImportError: No module named next
) [duplicate 1]
{code}

I get some more *WARN*s like that one and some *INFO*, and then an *ERROR*:
{code}
14/12/04 15:26:45 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/12/04 15:26:45 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
{code}
And then some *INFO*, and finally a *Py4JJavaError*:
{code}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-39fd6123a6cd> in <module>()
----> 1 testdata.first()

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self)
   1164         2
   1165         """
-> 1166         return self.take(1)[0]
   1167
   1168     def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.pyc in take(self, num)
   1150             p = range(
   1151                 partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1152             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1153
   1154             items += res

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    768         # SparkContext#runJob.
    769         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 770         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
    771         return list(mappedRDD._collect_iterator_through_file(it))
    772

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538             self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298             raise Py4JJavaError(
    299                 'An error occurred while calling {0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
    301         else:
    302             raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, node05): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/hadoop/yarn/local/usercache/hdfs/filecache/44/spark-assembly-1.1.1-hadoop2.4.0.jar/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.py", line 1146, in takeUpToNumLeft
ImportError: No module named next

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
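Note that the traceback above points at pyspark/worker.py and the takeUpToNumLeft helper in rdd.py rather than at anything HDFS-specific. If the executors' Python environment is the real problem, a minimal sketch like the following, which never reads HDFS, should fail the same way in the same yarn-client shell (this is an untested assumption):

{code}
# Hedged repro sketch: no HDFS involved, only the Python workers.
# If this also raises "ImportError: No module named next", the problem is
# the executors' Python environment rather than HDFS access.
nums = sc.parallelize([1, 2, 3, 4], 2)
print(nums.first())   # exercises the same take()/takeUpToNumLeft path
print(nums.count())
{code}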
And if I do:
{code}
testdata.count()
{code}

I get a *WARN*:
{code}
14/12/04 15:34:09 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 5, node05): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:150)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
{code}

Then some *INFO* and finally an *ERROR*:
{code}
14/12/04 15:34:15 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
14/12/04 15:34:15 INFO cluster.YarnClientClusterScheduler: Cancelling stage 1
14/12/04 15:34:15 INFO cluster.YarnClientClusterScheduler: Stage 1 was cancelled
14/12/04 15:34:15 INFO scheduler.DAGScheduler: Failed to run count at <ipython-input-3-74bd1c2768a3>:1
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-74bd1c2768a3> in <module>()
----> 1 testdata.count()

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.pyc in count(self)
    844         3
    845         """
--> 846         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
    847
    848     def stats(self):

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.pyc in sum(self)
    835         6.0
    836         """
--> 837         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
    838
    839     def count(self):

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.pyc in reduce(self, f)
    756             if acc is not None:
    757                 yield acc
--> 758         vals = self.mapPartitions(func).collect()
    759         return reduce(f, vals)
    760

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/pyspark/rdd.pyc in collect(self)
    720         """
    721         with _JavaStackTrace(self.context) as st:
--> 722             bytesInJava = self._jrdd.collect().iterator()
    723         return list(self._collect_iterator_through_file(bytesInJava))
    724

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538             self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/home/hdfs/spark-1.1.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298             raise Py4JJavaError(
    299                 'An error occurred while calling {0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
    301         else:
    302             raise Py4JError(

Py4JJavaError: An error occurred while calling o40.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 9, node04): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:150)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
----
Maybe I'm doing something wrong; I would appreciate any feedback.
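Since an ImportError that appears only in YARN mode can mean the executors are running a different Python than the driver, here is a hedged diagnostic sketch to compare the two from the same yarn-client shell (it assumes nothing beyond the shell's built-in sc):

{code}
# Hedged diagnostic sketch: compare the driver's Python with what the
# YARN executors actually run; a mismatch would explain cluster-only errors.
import sys

def report_version(_):
    import sys as worker_sys  # imported on the executor, not the driver
    yield worker_sys.version

print('driver:  ' + sys.version)
versions = sc.parallelize(range(12), 12).mapPartitions(report_version).collect()
print('workers: ' + ', '.join(sorted(set(versions))))
{code}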