[ https://issues.apache.org/jira/browse/SPARK-22209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212027#comment-16212027 ]
Joel Croteau commented on SPARK-22209: -------------------------------------- Yes [~bryanc], I know I can do that. It's just annoying that a long workflow failed after running for quite a while because of that. At the very least, it should be documented. > PySpark does not recognize imports from submodules > -------------------------------------------------- > > Key: SPARK-22209 > URL: https://issues.apache.org/jira/browse/SPARK-22209 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 2.2.0, 2.3.0 > Environment: Anaconda 4.4.0, Python 3.6, Hadoop 2.7, CDH 5.3.3, JDK > 1.8, Centos 6 > Reporter: Joel Croteau > Priority: Minor > > Using submodule syntax inside a PySpark job seems to create issues. For > example, the following: > {code:python} > import scipy.sparse > from pyspark import SparkContext, SparkConf > def do_stuff(x): > y = scipy.sparse.dok_matrix((1, 1)) > y[0, 0] = x > return y[0, 0] > def init_context(): > conf = SparkConf().setAppName("Spark Test") > sc = SparkContext(conf=conf) > return sc > def main(): > sc = init_context() > data = sc.parallelize([1, 2, 3, 4]) > output_data = data.map(do_stuff) > print(output_data.collect()) > __name__ == '__main__' and main() > {code} > produces this error: > {noformat} > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087) > at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at org.apache.spark.rdd.RDD.collect(RDD.scala:935) > at > org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458) > at > org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:280) > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:214) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "/home/matt/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", > line 177, in main > process() > File > "/home/matt/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", > line 172, in process > serializer.dump_stream(func(split_index, iterator), outfile) > File > "/home/matt/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", > line 268, in dump_stream > vs = list(itertools.islice(iterator, batch)) > File "/home/jcroteau/is/pel_selection/test_sparse.py", line 6, in dostuff > y = scipy.sparse.dok_matrix((1, 1)) > AttributeError: module 'scipy' has no attribute 'sparse' > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) > at > org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) > at > org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > But if this is changed to: > {code:python} > import scipy.sparse as sp > from pyspark import SparkContext, SparkConf > def do_stuff(x): > y = sp.dok_matrix((1, 1)) > y[0, 0] = x > return y[0, 0] > def init_context(): > conf = SparkConf().setAppName("Spark Test") > sc = SparkContext(conf=conf) > return sc > def main(): > sc = init_context() > data = sc.parallelize([1, 2, 3, 4]) > output_data = data.map(do_stuff) > print(output_data.collect()) > __name__ == '__main__' and main() > {code} > It works fine. At the very least, this should be documented. I've looked > through the documentation, but I haven't found a mention of this anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org