I've tried adding task.py to pyFiles during SparkContext creation and it
worked perfectly. Thanks for your help!
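
For reference, here's roughly what the fixed main() in task.py looks like now
(just a sketch of the change; the only difference is the pyFiles argument, and
I'm assuming task.py sits in the driver's working directory):

#!/usr/bin/env pyspark
from __future__ import print_function
from pyspark import SparkContext

def process(line):
    return line.strip()

def main(spark_master, path):
    # pyFiles ships task.py to the executors, so process() can be unpickled
    # there even when main() is invoked from runner.py
    sc = SparkContext(spark_master, 'My Job', pyFiles=['task.py'])
    rdd = sc.textFile(path)
    rdd = rdd.map(process)
    print(rdd.count())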

If you need more information for further investigation, here's what
I've noticed. Without explicitly adding the file to the SparkContext, only
functions defined in the main module run by PySpark can be passed to
distributed jobs. E.g. if I define myfunc() in runner.py (and run
runner.py), it works fine. But if I define myfunc() in task.py (and
still run runner.py), it fails as described above. I've posted the stderr
from the failed executor here <http://pastebin.com/NHNW3sTY>, but essentially
it just says that the Python worker crashed, without any reference to the
cause. For the sake of completeness, here's also the console output:
<http://pastebin.com/Lkvdfhhz>.

To make it clear: all these errors occur only in my initial setup; adding
"task.py" to the SparkContext fixes them in any case. Hope this helps.

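In case it's useful to anyone else running into this, it looks like the same
effect can be achieved after the context has been created, via addPyFile
(just a sketch, assuming task.py is resolvable from the driver's working
directory):

from pyspark import SparkContext

sc = SparkContext('spark://spark-master-host:7077', 'My Job')
sc.addPyFile('task.py')  # ship task.py to the executors after the context exists
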
Thanks,
Andrei

On Sat, Nov 16, 2013 at 2:12 PM, Andrei <faithlessfri...@gmail.com> wrote:

> Hi,
>
> thanks for your replies. I'm out of office now, so I will check it out on
> Monday morning, but the guess about serialization/deserialization looks
> plausible.
>
> Thanks,
> Andrei
>
>
> On Sat, Nov 16, 2013 at 11:11 AM, Jey Kottalam <j...@cs.berkeley.edu> wrote:
>
>> Hi Andrei,
>>
>> Could you please post the stderr logfile from the failed executor? You
>> can find this in the "work" subdirectory of the worker that had the failed
>> task. You'll need the executor id to find the corresponding stderr file.
>>
>> Thanks,
>> -Jey
>>
>>
>> On Friday, November 15, 2013, Andrei wrote:
>>
>>> I have two Python modules/scripts, task.py and runner.py. The first one
>>> (task.py) is a small Spark job and works perfectly well by itself.
>>> However, when called from runner.py with exactly the same arguments, it
>>> fails with only a useless message (both in the terminal and in the worker
>>> logs):
>>>
>>>     org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>
>>> Below is the code for both task.py and runner.py:
>>>
>>> task.py
>>> -----------
>>>
>>> #!/usr/bin/env pyspark
>>> from __future__ import print_function
>>> from pyspark import SparkContext
>>>
>>> def process(line):
>>>     return line.strip()
>>>
>>> def main(spark_master, path):
>>>     sc = SparkContext(spark_master, 'My Job')
>>>     rdd = sc.textFile(path)
>>>     rdd = rdd.map(process)  # this line causes trouble when called from runner.py
>>>     count = rdd.count()
>>>     print(count)
>>>
>>> if __name__ == '__main__':
>>>     main('spark://spark-master-host:7077',
>>>          'hdfs://hdfs-namenode-host:8020/path/to/file.log')
>>>
>>>
>>> runner.py
>>> -------------
>>>
>>> #!/usr/bin/env pyspark
>>>
>>> import task
>>>
>>> if __name__ == '__main__':
>>>     task.main('spark://spark-master-host:7077',
>>>               'hdfs://hdfs-namenode-host:8020/path/to/file.log')
>>>
>>>
>>> -------------------------------------------------------------------------------------------
>>>
>>> So, what's the difference between calling a PySpark-enabled script
>>> directly and importing it as a Python module? What are good rules for
>>> writing multi-module Python programs with Spark?
>>>
>>> Thanks,
>>> Andrei
>>>
>>
>
