It looks like, for whatever reason, your cluster isn't using the Python you
distributed, or that distribution doesn't contain what you think it does.
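
A quick way to confirm both points is to probe the executors from the same
Zeppelin %spark.pyspark paragraph. This is only a minimal sketch, assuming the
SparkContext is available as `sc` (as it is in Zeppelin's pyspark interpreter):

    # Ask each executor which Python it actually runs and whether
    # pyarrow is importable from that interpreter.
    import sys

    def probe(_):
        try:
            import pyarrow
            found = pyarrow.__version__
        except ImportError:
            found = None
        return [(sys.executable, found)]

    print(sc.parallelize(range(4), 4).mapPartitions(probe).collect())

If the executables printed there are not /usr/local/miniconda3/bin/python, the
PYSPARK_PYTHON setting is not reaching the Python workers.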

I've used the following with success to deploy a conda environment to my
cluster at runtime:
https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/
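
Roughly, the recipe there is: zip the conda environment, ship it with
--archives, and point PYSPARK_PYTHON at the interpreter inside the unpacked
archive. A sketch under those assumptions (YARN; the env name, alias and file
names below are placeholders, not from your setup):

    # Package the environment once.
    cd /usr/local/miniconda3/envs && zip -r ~/custom_env.zip custom_env

    # Submit with the archive attached; CONDA is just the unpack alias.
    PYSPARK_DRIVER_PYTHON=python \
    PYSPARK_PYTHON=./CONDA/custom_env/bin/python \
    spark-submit --master yarn --archives ~/custom_env.zip#CONDA your_job.py

The nice part is that nothing needs to be pre-installed on the workers; each
application brings its own interpreter.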

On Thu, Sep 6, 2018 at 2:58 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Are you doubly sure it is an issue in Spark? I have used a custom Python
> several times by setting it in PYSPARK_PYTHON, and it was no
> problem.
>
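
Side note: since the pyspark interpreter here is started from Zeppelin rather
than via spark-submit on the cluster nodes, it may be more reliable to set the
interpreter per application instead of relying on spark-env.sh. On Spark 2.1+
the equivalent configuration keys (usable in spark-defaults.conf or in
Zeppelin's Spark interpreter properties) would be, using the path from the
question below:

    spark.pyspark.python          /usr/local/miniconda3/bin/python
    spark.pyspark.driver.python   /usr/local/miniconda3/bin/python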
> On Thu, Sep 6, 2018 at 2:21 PM, mithril <twinmeg...@gmail.com> wrote:
>
>> For better formatting, please see
>> https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python
>>
>> ----------------------
>>
>>
>> I am using Zeppelin to connect to a remote Spark cluster.
>>
>> The remote Spark cluster uses the system Python 2.7.
>>
>> I want to switch it to miniconda3 and install the pyarrow library.
>> What I did is:
>>
>> 1. Download miniconda3, install some libs, and scp the miniconda3 folder to
>> the Spark master and slaves.
>> 2. Add `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
>> `spark-env.sh` on the Spark master and slaves.
>> 3. Restart Spark and Zeppelin.
>> 4. Run the code:
>>
>>     %spark.pyspark
>>
>>         import pandas as pd
>>         from pyspark.sql.functions import pandas_udf,PandasUDFType
>>
>>
>>         @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
>>         def process_order_items(pdf):
>>
>>             pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
>>
>>             d = {'has_discount':'count',
>>                 'clearance':'count',
>>                 'count': ['count', 'sum'],
>>                 'price_guide':'max',
>>                 'total_price': 'sum'
>>
>>             }
>>
>>             pdf1 = pdf.groupby('day').agg(d)
>>             pdf1.columns = pdf1.columns.map('_'.join)
>>             d1 = {'has_discount_count':'discount_order_count',
>>                 'clearance_count':'clearance_order_count',
>>                 'count_count':'order_count',
>>                 'count_sum':'sale_count',
>>                 'price_guide_max':'price_guide',
>>                 'total_price_sum': 'total_price'
>>             }
>>
>>             pdf2 = pdf1.rename(columns=d1)
>>
>>             pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount>0, 'count'].resample(freq).sum()
>>             pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0, 'count'].resample(freq).sum()
>>             pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
>>
>>             pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
>>
>>             return pdf2
>>
>>
>>         results = df.groupby("store_id", "product_id").apply(process_order_items)
>>
>>         results.select(['store_id', 'price']).show(5)
>>
>>
>> I got this error:
>>
>>     Py4JJavaError: An error occurred while calling o172.showString.
>>     : org.apache.spark.SparkException: Job aborted due to stage failure:
>>     Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3
>>     in stage 6.0 (TID 143, 10.104.33.18, executor 2):
>>     org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
>>         process()
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
>>         serializer.dump_stream(func(split_index, iterator), outfile)
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
>>         func = lambda _, it: map(mapper, it)
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
>>         import pyarrow as pa
>>     ImportError: No module named pyarrow
>>
>>
>> `10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not set
>> correctly there.
>>
>> I logged in to the master and the slaves, ran the `pyspark` interpreter on
>> each, and found that `import pyarrow` does not throw an exception.
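
Side note: the `pyspark` shell picks up `PYSPARK_PYTHON` from the local shell
environment, so a clean `import pyarrow` there does not by itself show that the
executors started for the Zeppelin session use the same interpreter. A quick
check inside that shell, just as a sketch:

    # Which interpreter is this shell running, and where did pyarrow come from?
    import sys, pyarrow
    print(sys.executable)     # expected: /usr/local/miniconda3/bin/python
    print(pyarrow.__file__)   # expected: under the miniconda3 site-packages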
>>
>>
>> PS: `pyarrow` is also installed on the machine running Zeppelin.
>>
>> --------------
>>
>> More info:
>>
>>
>> 1. The Spark cluster is installed on A, B, C; Zeppelin is installed on D.
>> 2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, C.
>> 3. `import pyarrow` works in `/usr/local/spark/bin/pyspark` on A, B, C.
>> 4. `import pyarrow` works with the custom Python (miniconda3) on A, B, C.
>> 5. `import pyarrow` works with D's default Python (miniconda3; the path is
>> different from A, B, C, but that shouldn't matter).
>>
>>
>>
>> So I completely cannot understand why it doesn't work.
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
