It looks like, for whatever reason, your cluster isn't using the Python you distributed, or said distribution doesn't contain what you think it does.

I've used the following with success to deploy a conda environment to my cluster at runtime: https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/
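For reference, a minimal sketch of that approach, assuming a YARN cluster as in the post (the archive name, HDFS path, and env layout here are illustrative, not from the post):

    import os
    from pyspark.sql import SparkSession

    # PySpark captures PYSPARK_PYTHON on the driver when the context starts,
    # so set it before getOrCreate(). The relative path resolves inside each
    # container once the archive is extracted there.
    os.environ["PYSPARK_PYTHON"] = "./CONDA/myenv/bin/python"

    spark = (
        SparkSession.builder
        .master("yarn")
        # Ship the zipped conda env to every container; "#CONDA" is the
        # directory it is unpacked under.
        .config("spark.yarn.dist.archives", "hdfs:///tmp/myenv.zip#CONDA")
        # The YARN application master needs the same interpreter.
        .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./CONDA/myenv/bin/python")
        .getOrCreate()
    )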
On Thu, Sep 6, 2018 at 2:58 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Are you doubly sure it is an issue in Spark? I have used a custom Python
> several times by setting it in PYSPARK_PYTHON, and it was no problem.
>
> On Thu, Sep 6, 2018 at 2:21 PM, mithril <twinmeg...@gmail.com> wrote:
>
>> For better formatting, please see
>> https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python
>>
>> ----------------------
>>
>> I am using Zeppelin to connect to a remote Spark cluster.
>>
>> The remote Spark cluster uses the system Python 2.7.
>>
>> I want to switch to miniconda3 and install the pyarrow library.
>> What I did is:
>>
>> 1. Download miniconda3, install some libs, and scp the miniconda3 folder
>>    to the Spark master and slaves.
>> 2. Add `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
>>    `spark-env.sh` on the Spark master and slaves.
>> 3. Restart Spark and Zeppelin.
>> 4. Run this code:
>>
>>     %spark.pyspark
>>
>>     import pandas as pd
>>     from pyspark.sql.functions import pandas_udf, PandasUDFType
>>
>>     @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
>>     def process_order_items(pdf):
>>         pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
>>
>>         d = {'has_discount': 'count',
>>              'clearance': 'count',
>>              'count': ['count', 'sum'],
>>              'price_guide': 'max',
>>              'total_price': 'sum'}
>>
>>         pdf1 = pdf.groupby('day').agg(d)
>>         pdf1.columns = pdf1.columns.map('_'.join)
>>         d1 = {'has_discount_count': 'discount_order_count',
>>               'clearance_count': 'clearance_order_count',
>>               'count_count': 'order_count',
>>               'count_sum': 'sale_count',
>>               'price_guide_max': 'price_guide',
>>               'total_price_sum': 'total_price'}
>>
>>         pdf2 = pdf1.rename(columns=d1)
>>
>>         pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount > 0,
>>             'count'].resample(freq).sum()
>>         pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance > 0,
>>             'count'].resample(freq).sum()
>>         pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
>>
>>         pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
>>
>>         return pdf2
>>
>>     results = df.groupby("store_id", "product_id").apply(process_order_items)
>>
>>     results.select(['store_id', 'price']).show(5)
>>
>> I got this error:
>>
>>     Py4JJavaError: An error occurred while calling o172.showString.
>>     : org.apache.spark.SparkException: Job aborted due to stage failure:
>>     Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3
>>     in stage 6.0 (TID 143, 10.104.33.18, executor 2):
>>     org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
>>         process()
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
>>         serializer.dump_stream(func(split_index, iterator), outfile)
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
>>         func = lambda _, it: map(mapper, it)
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
>>         import pyarrow as pa
>>     ImportError: No module named pyarrow
>>
>> `10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not set
>> correctly.
>>
>> `pyspark`
>>
>> I logged in to the master and the slaves, ran the `pyspark` shell on each,
>> and found that `import pyarrow` does not raise an exception.
>>
>> PS: `pyarrow` is also installed on the machine running Zeppelin.
>>
>> --------------
>>
>> More info:
>>
>> 1. The Spark cluster is installed on A, B, and C; Zeppelin is installed on D.
>> 2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, and C.
>> 3. `import pyarrow` works fine in `/usr/local/spark/bin/pyspark` on A, B, and C.
>> 4. `import pyarrow` works fine in the custom Python (miniconda3) on A, B, and C.
>> 5. `import pyarrow` works fine in D's default Python (miniconda3; its path
>>    differs from A, B, and C, but that shouldn't matter).
>>
>> So I completely couldn't understand why it doesn't work.
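The quoted checks above all exercise the interpreters by hand; none of them test which Python the executors' workers actually launch. A quick way to do that from the same `%spark.pyspark` session (my sketch, assuming getOrCreate() picks up Zeppelin's live session):

    import sys
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # reuses the live session
    sc = spark.sparkContext

    print("driver python:", sys.executable)

    # The lambda runs inside the executors' Python workers, so it reports
    # the interpreter that is (or is not) able to import pyarrow.
    workers = sc.parallelize(range(sc.defaultParallelism)).map(lambda _: sys.executable)
    print("executor python(s):", sorted(set(workers.collect())))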
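One possible explanation, consistent with these symptoms and with the first reply above, is that the worker executable is captured on the driver rather than read from the executors' `spark-env.sh`: Zeppelin on D starts the driver, and whatever `PYSPARK_PYTHON` the driver sees there is what A, B, and C use to launch Python workers. If that is the cause, pinning the interpreter via Spark conf should help; a sketch using the `spark.pyspark.python` property available in Spark 2.1+ (the driver path is a placeholder, since D's miniconda3 lives at a different path according to the thread):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # interpreter the executors on A, B, C launch for Python workers
        .config("spark.pyspark.python", "/usr/local/miniconda3/bin/python")
        # interpreter for the driver on D (placeholder path)
        .config("spark.pyspark.driver.python", "/path/to/miniconda3/bin/python")
        .getOrCreate()
    )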