Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Hi Marcelo,

I see what you mean. I tried it, but I still got the same error message.

Error from python worker:
>   Traceback (most recent call last):
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in 
> _run_module_as_main
>   mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in 
> _get_module_details
>   __import__(pkg_name)
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", 
> line 46, in 
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", 
> line 29, in 
>   ModuleNotFoundError: No module named 'py4j'
> PYTHONPATH was:
>   
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk3/yarn/usercache/jianshi.huang/filecache/134/__spark_libs__8468485589501316413.zip/spark-core_2.11-2.3.2.jar
>
>
On Fri, Oct 5, 2018 at 1:25 AM Marcelo Vanzin  wrote:

> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
> expanded by the shell).
>
> But it's really weird to be setting SPARK_HOME in the environment of
> your node managers. YARN shouldn't need to know about that.
> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
> wrote:
> >
> >
> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
> >
> > The code shows Spark will try to find the path if SPARK_HOME is
> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
> the pre-installed 2.2.1 path.
> >
> > I don't want to make any changes to worker node configuration, so any
> way to override the order?
> >
> > Jianshi
> >
> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
> wrote:
> >>
> >> Normally the version of Spark installed on the cluster does not
> >> matter, since Spark is uploaded from your gateway machine to YARN by
> >> default.
> >>
> >> You probably have some configuration (in spark-defaults.conf) that
> >> tells YARN to use a cached copy. Get rid of that configuration, and
> >> you can use whatever version you like.
> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a problem using multiple versions of Pyspark on YARN, the
> driver and worker nodes are all preinstalled with Spark 2.2.1, for
> production tasks. And I want to use 2.3.2 for my personal EDA.
> >> >
> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >> >
> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >> >
> >> > Here's the error message,
> >> >>
> >> >>
> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> >> Error from python worker:
> >> >> Traceback (most recent call last):
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> >> __import__(pkg_name)
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> >> ModuleNotFoundError: No module named 'py4j'
> >> >> PYTHONPATH was:
> >> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >> >
> >> >
> >> > And here's how I started Pyspark session in Jupyter.
> >> >>
> >> >>
> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> >> import findspark
> >> >> findspark.init()
> >> >> import pyspark
> >> >> sparkConf = pyspark.SparkConf()
> >> >> sparkConf.setAll([
> >> >> ('spark.cores.max', '96')
> >> >> ,('spark.driver.memory', '2g')
> >> >> ,('spark.executor.cores', '4')
> >> >> ,('spark.executor.instances', '2')
> >> >> ,('spark.executor.memory', '4g')
> >> >> ,('spark.network.timeout', '800')
> >> >> ,('spark.scheduler.mode', 'FAIR')
> >> >> ,('spark.shuffle.service.enabled', 'true')
> >> >> ,('spark.dynamicAllocation.enabled', 'true')
> >> >> ])
> >> >> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
> >> >>
> >> >
> >> >
> >> > Thanks,
> >> > --
> >> > Jianshi Huang
> >> >
> >>
> >>
> 

Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Thanks Marcelo,

But I don't want to install 2.3.2 on the worker nodes. I just want Spark to
use the paths of the files uploaded to YARN instead of the pre-installed
SPARK_HOME.

On Fri, Oct 5, 2018 at 1:25 AM Marcelo Vanzin  wrote:

> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
> expanded by the shell).
>
> But it's really weird to be setting SPARK_HOME in the environment of
> your node managers. YARN shouldn't need to know about that.
> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
> wrote:
> >
> >
> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
> >
> > The code shows Spark will try to find the path if SPARK_HOME is
> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
> the pre-installed 2.2.1 path.
> >
> > I don't want to make any changes to worker node configuration, so any
> way to override the order?
> >
> > Jianshi
> >
> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
> wrote:
> >>
> >> Normally the version of Spark installed on the cluster does not
> >> matter, since Spark is uploaded from your gateway machine to YARN by
> >> default.
> >>
> >> You probably have some configuration (in spark-defaults.conf) that
> >> tells YARN to use a cached copy. Get rid of that configuration, and
> >> you can use whatever version you like.
> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a problem using multiple versions of Pyspark on YARN, the
> driver and worker nodes are all preinstalled with Spark 2.2.1, for
> production tasks. And I want to use 2.3.2 for my personal EDA.
> >> >
> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >> >
> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >> >
> >> > Here's the error message,
> >> >>
> >> >>
> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> >> Error from python worker:
> >> >> Traceback (most recent call last):
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> >> __import__(pkg_name)
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> >> ModuleNotFoundError: No module named 'py4j'
> >> >> PYTHONPATH was:
> >> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >> >
> >> >
> >> > And here's how I started Pyspark session in Jupyter.
> >> >>
> >> >>
> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> >> import findspark
> >> >> findspark.init()
> >> >> import pyspark
> >> >> sparkConf = pyspark.SparkConf()
> >> >> sparkConf.setAll([
> >> >> ('spark.cores.max', '96')
> >> >> ,('spark.driver.memory', '2g')
> >> >> ,('spark.executor.cores', '4')
> >> >> ,('spark.executor.instances', '2')
> >> >> ,('spark.executor.memory', '4g')
> >> >> ,('spark.network.timeout', '800')
> >> >> ,('spark.scheduler.mode', 'FAIR')
> >> >> ,('spark.shuffle.service.enabled', 'true')
> >> >> ,('spark.dynamicAllocation.enabled', 'true')
> >> >> ])
> >> >> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
> >> >>
> >> >
> >> >
> >> > Thanks,
> >> > --
> >> > Jianshi Huang
> >> >
> >>
> >>
> >> --
> >> Marcelo
> >
> >
> >
> > --
> > Jianshi Huang
> >
> > LinkedIn: jianshi
> > Twitter: @jshuang
> > Github & Blog: http://huangjs.github.com/
>
>
>
> --
> Marcelo
>


-- 
Jianshi Huang


Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Yes, that's right.

On Fri, Oct 5, 2018 at 3:35 AM Gourav Sengupta 
wrote:

> Hi Marcelo,
> it will be great if you illustrate what you mean, I will be interested to
> know.
>
> Hi Jianshi,
> so just to be sure you want to work on SPARK 2.3 while having SPARK 2.1
> installed in your cluster?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Oct 4, 2018 at 6:26 PM Marcelo Vanzin 
> wrote:
>
>> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
>> expanded by the shell).
>>
>> But it's really weird to be setting SPARK_HOME in the environment of
>> your node managers. YARN shouldn't need to know about that.
>> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
>> wrote:
>> >
>> >
>> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
>> >
>> > The code shows Spark will try to find the path if SPARK_HOME is
>> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
>> the pre-installed 2.2.1 path.
>> >
>> > I don't want to make any changes to worker node configuration, so any
>> way to override the order?
>> >
>> > Jianshi
>> >
>> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
>> wrote:
>> >>
>> >> Normally the version of Spark installed on the cluster does not
>> >> matter, since Spark is uploaded from your gateway machine to YARN by
>> >> default.
>> >>
>> >> You probably have some configuration (in spark-defaults.conf) that
>> >> tells YARN to use a cached copy. Get rid of that configuration, and
>> >> you can use whatever version you like.
>> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
>> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have a problem using multiple versions of Pyspark on YARN, the
>> driver and worker nodes are all preinstalled with Spark 2.2.1, for
>> production tasks. And I want to use 2.3.2 for my personal EDA.
>> >> >
>> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
>> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
>> >> >
>> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
>> >> >
>> >> > Here's the error message,
>> >> >>
>> >> >>
>> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
>> >> >> : org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task
>> 0.3 in stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
>> org.apache.spark.SparkException:
>> >> >> Error from python worker:
>> >> >> Traceback (most recent call last):
>> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
>> _run_module_as_main
>> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
>> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
>> _get_module_details
>> >> >> __import__(pkg_name)
>> >> >> File
>> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
>> 46, in 
>> >> >> File
>> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
>> 29, in 
>> >> >> ModuleNotFoundError: No module named 'py4j'
>> >> >> PYTHONPATH was:
>> >> >>
>> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
>> >> >
>> >> >
>> >> > And here's how I started Pyspark session in Jupyter.
>> >> >>
>> >> >>
>> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
>> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
>> >> >> import findspark
>> >> >> findspark.init()
>> >> >> import pyspark
>> >> >> sparkConf = pyspark.SparkConf()
>> >> >> sparkConf.setAll([
>> >> >> ('spark.cores.max', '96')
>> >> >> ,('spark.driver.memory', '2g')
>> >> >> ,('spark.executor.cores', '4')
>> >> >> ,('spark.executor.instances', '2')
>> >> >> ,('spark.executor.memory', '4g')
>> >> >> ,('spark.network.timeout', '800')
>> >> >> ,('spark.scheduler.mode', 'FAIR')
>> >> >> ,('spark.shuffle.service.enabled', 'true')
>> >> >> ,('spark.dynamicAllocation.enabled', 'true')
>> >> >> ])
>> >> >> py_files =
>> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
>> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
>> conf=sparkConf, pyFiles=py_files)
>> >> >>
>> >> >
>> >> >
>> >> > Thanks,
>> >> > --
>> >> > Jianshi Huang
>> >> >
>> >>
>> >>
>> >> --
>> >> Marcelo
>> >
>> >
>> >
>> > --
>> > Jianshi Huang
>> >
>> > LinkedIn: jianshi
>> > Twitter: @jshuang
>> > Github & Blog: http://huangjs.github.com/
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 
Jianshi Huang


Where is the DAG stored before catalyst gets it?

2018-10-04 Thread Jean Georges Perrin
Hi, 

I am assuming it is still in the master, and when Catalyst is finished it sends
the tasks to the workers.

Correct?

tia

jg
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: PySpark structured streaming job throws socket exception

2018-10-04 Thread mmuru
Thanks Ryan.

I've attached the whole stack trace. Let me know if you need more information.

pyspark-driver-pod-exception.txt




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: PySpark structured streaming job throws socket exception

2018-10-04 Thread Shixiong(Ryan) Zhu
As far as I know, the error log in updateAccumulators will not fail a Spark
task. Did you see other error messages?

Best Regards,
Ryan


On Thu, Oct 4, 2018 at 2:14 PM mmuru  wrote:

> Hi,
>
> Running a PySpark structured streaming job on K8S with 2 executor pods. The
> driver pod failed with the following exception. It fails consistently
> after 3 to 6 hours of running.
>
> Any idea how to fix this exception? I really appreciate your help.
>
>
> 2018-10-04 18:48:27 ERROR DAGScheduler:91 - Failed to update accumulators
> for task 21
> java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:210)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.net.SocketInputStream.read(SocketInputStream.java:224)
> at
> org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:659)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1249)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
>
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1249)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1331)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2052)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2041)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 2018-10-04 18:48:27 ERROR DAGScheduler:91 - Failed to update accumulators
> for task 22
> java.net.SocketException: Broken pipe (Write failed)
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at java.io.DataOutputStream.flush(DataOutputStream.java:123)
> at
> org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:657)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1249)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
>
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1249)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1331)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2052)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2041)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


PySpark structured streaming job throws socket exception

2018-10-04 Thread mmuru
Hi,

Running a PySpark structured streaming job on K8S with 2 executor pods. The
driver pod failed with the following exception. It fails consistently
after 3 to 6 hours of running.

Any idea how to fix this exception? I really appreciate your help.


2018-10-04 18:48:27 ERROR DAGScheduler:91 - Failed to update accumulators
for task 21
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:224)
at
org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:659)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1249)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1249)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2052)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2041)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
2018-10-04 18:48:27 ERROR DAGScheduler:91 - Failed to update accumulators
for task 22
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at
org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:657)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1249)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1249)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2052)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2041)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Pyspark Partitioning

2018-10-04 Thread Vitaliy Pisarev
groupBy is an operator you would use if you wanted to *aggregate* the
values that are grouped by the specified key.

In your case you want to retain access to the values.

You need to do df.partitionBy and then you can map the partitions. Of
course you need to be careful of potential skews in the resulting
partitions.
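
A minimal sketch of that pattern, assuming the data_df from your question below
(Group_Id values 1..7) and using a placeholder custom_func:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-per-group").getOrCreate()

# toy stand-in for data_df; in the real data Group_Id runs from 1 to 7
data_df = spark.createDataFrame(
    [(1, "obj1", "Traj1"), (2, "obj2", "Traj2"), (1, "obj3", "Traj3")],
    ["Group_Id", "Object_Id", "Trajectory"])

def custom_func(rows):
    # placeholder per-partition logic: here it only counts the rows it receives
    yield sum(1 for _ in rows)

keyed = data_df.rdd.map(lambda row: (row["Group_Id"], row))   # key every row by Group_Id
partitioned = keyed.partitionBy(7, lambda k: k - 1)           # one partition per group 1..7
print(partitioned.mapPartitions(custom_func).collect())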

On Thu, Oct 4, 2018, 23:27 dimitris plakas  wrote:

> Hello everyone,
>
> Here is an issue that I am facing in partitioning a dataframe.
>
> I have a dataframe called data_df. It looks like:
>
> Group_Id | Object_Id | Trajectory
>        1 |      obj1 | Traj1
>        2 |      obj2 | Traj2
>        1 |      obj3 | Traj3
>        3 |      obj4 | Traj4
>        2 |      obj5 | Traj5
>
> This dataframe has 5045 rows, where each row has a Group_Id value from 1
> to 7, and the number of rows per group_id is arbitrary.
> I want to split the RDD produced from this dataframe into 7
> partitions, one for each group_id, and then apply mapPartitions(), where I
> call the function custom_func(). How can I create these partitions from this
> dataframe? Should I first apply a group by (creating grouped_df) in order
> to create a dataframe with 7 rows and then call
> partitioned_rdd=grouped_df.rdd.mapPartitions()?
> Which is the optimal way to do it?
>
> Thank you in advance
>


Pyspark Partitioning

2018-10-04 Thread dimitris plakas
Hello everyone,

Here is an issue that I am facing in partitioning a dataframe.

I have a dataframe called data_df. It looks like:

Group_Id | Object_Id | Trajectory
       1 |      obj1 | Traj1
       2 |      obj2 | Traj2
       1 |      obj3 | Traj3
       3 |      obj4 | Traj4
       2 |      obj5 | Traj5

This dataframe has 5045 rows, where each row has a Group_Id value from 1 to
7, and the number of rows per group_id is arbitrary.
I want to split the RDD produced from this dataframe into 7
partitions, one for each group_id, and then apply mapPartitions(), where I
call the function custom_func(). How can I create these partitions from this
dataframe? Should I first apply a group by (creating grouped_df) in order
to create a dataframe with 7 rows and then call
partitioned_rdd=grouped_df.rdd.mapPartitions()?
Which is the optimal way to do it?

Thank you in advance


Spark 2.3.1 leaves _temporary dir back on s3 even after write to s3 is done.

2018-10-04 Thread sushil.chaudhary
Folks,
We recently upgraded to 2.3.1 and started seeing that Spark jobs
leave a _temporary directory in S3 even though the write to S3 has already
finished. It does not clean up the temporary directory.
The Hadoop version is 2.8. Is there a way to control this?
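
For reference, below is a sketch of the kind of write path in question, plus the
committer setting most often discussed around leftover _temporary directories.
Whether that setting applies to this case is an assumption, not a confirmed fix;
the bucket and path are illustrative.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3-write")
         # Hadoop FileOutputCommitter algorithm version; v2 changes how task output
         # is promoted and is the knob most often mentioned for _temporary leftovers
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .getOrCreate())

df = spark.range(100)
df.write.mode("overwrite").parquet("s3a://my-bucket/output/")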



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Gourav Sengupta
Hi Marcelo,
it will be great if you illustrate what you mean, I will be interested to
know.

Hi Jianshi,
so just to be sure you want to work on SPARK 2.3 while having SPARK 2.1
installed in your cluster?

Regards,
Gourav Sengupta

On Thu, Oct 4, 2018 at 6:26 PM Marcelo Vanzin 
wrote:

> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
> expanded by the shell).
>
> But it's really weird to be setting SPARK_HOME in the environment of
> your node managers. YARN shouldn't need to know about that.
> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
> wrote:
> >
> >
> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
> >
> > The code shows Spark will try to find the path if SPARK_HOME is
> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
> the pre-installed 2.2.1 path.
> >
> > I don't want to make any changes to worker node configuration, so any
> way to override the order?
> >
> > Jianshi
> >
> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
> wrote:
> >>
> >> Normally the version of Spark installed on the cluster does not
> >> matter, since Spark is uploaded from your gateway machine to YARN by
> >> default.
> >>
> >> You probably have some configuration (in spark-defaults.conf) that
> >> tells YARN to use a cached copy. Get rid of that configuration, and
> >> you can use whatever version you like.
> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a problem using multiple versions of Pyspark on YARN, the
> driver and worker nodes are all preinstalled with Spark 2.2.1, for
> production tasks. And I want to use 2.3.2 for my personal EDA.
> >> >
> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >> >
> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >> >
> >> > Here's the error message,
> >> >>
> >> >>
> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> >> Error from python worker:
> >> >> Traceback (most recent call last):
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> >> __import__(pkg_name)
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> >> ModuleNotFoundError: No module named 'py4j'
> >> >> PYTHONPATH was:
> >> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >> >
> >> >
> >> > And here's how I started Pyspark session in Jupyter.
> >> >>
> >> >>
> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> >> import findspark
> >> >> findspark.init()
> >> >> import pyspark
> >> >> sparkConf = pyspark.SparkConf()
> >> >> sparkConf.setAll([
> >> >> ('spark.cores.max', '96')
> >> >> ,('spark.driver.memory', '2g')
> >> >> ,('spark.executor.cores', '4')
> >> >> ,('spark.executor.instances', '2')
> >> >> ,('spark.executor.memory', '4g')
> >> >> ,('spark.network.timeout', '800')
> >> >> ,('spark.scheduler.mode', 'FAIR')
> >> >> ,('spark.shuffle.service.enabled', 'true')
> >> >> ,('spark.dynamicAllocation.enabled', 'true')
> >> >> ])
> >> >> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
> >> >>
> >> >
> >> >
> >> > Thanks,
> >> > --
> >> > Jianshi Huang
> >> >
> >>
> >>
> >> --
> >> Marcelo
> >
> >
> >
> > --
> > Jianshi Huang
> >
> > LinkedIn: jianshi
> > Twitter: @jshuang
> > Github & Blog: http://huangjs.github.com/
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


subscribe

2018-10-04 Thread Sushil Chaudhary
-- 
Regards,
Sushil




Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Marcelo Vanzin
Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
expanded by the shell).

But it's really weird to be setting SPARK_HOME in the environment of
your node managers. YARN shouldn't need to know about that.
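
For illustration, one way to pass that setting from the PySpark side; this is
only a sketch, and the '$PWD' must stay a literal string so that it is expanded
inside the YARN container rather than locally:

import pyspark

conf = pyspark.SparkConf()
conf.set('spark.executorEnv.SPARK_HOME', '$PWD')   # kept literal; expanded on the executor side
# spark-submit equivalent (single quotes stop the local shell from expanding it):
#   --conf 'spark.executorEnv.SPARK_HOME=$PWD'
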
On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang  wrote:
>
> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
>
> The code shows Spark will try to find the path if SPARK_HOME is specified. 
> And on my worker node, SPARK_HOME is specified in .bashrc , for the 
> pre-installed 2.2.1 path.
>
> I don't want to make any changes to worker node configuration, so any way to 
> override the order?
>
> Jianshi
>
> On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin  wrote:
>>
>> Normally the version of Spark installed on the cluster does not
>> matter, since Spark is uploaded from your gateway machine to YARN by
>> default.
>>
>> You probably have some configuration (in spark-defaults.conf) that
>> tells YARN to use a cached copy. Get rid of that configuration, and
>> you can use whatever version you like.
>> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang  wrote:
>> >
>> > Hi,
>> >
>> > I have a problem using multiple versions of Pyspark on YARN, the driver 
>> > and worker nodes are all preinstalled with Spark 2.2.1, for production 
>> > tasks. And I want to use 2.3.2 for my personal EDA.
>> >
>> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), however 
>> > on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
>> >
>> > Anyone knows how to override the PYTHONPATH on worker nodes?
>> >
>> > Here's the error message,
>> >>
>> >>
>> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
>> >> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
>> >> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
>> >> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): 
>> >> org.apache.spark.SparkException:
>> >> Error from python worker:
>> >> Traceback (most recent call last):
>> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in 
>> >> _run_module_as_main
>> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
>> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in 
>> >> _get_module_details
>> >> __import__(pkg_name)
>> >> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", 
>> >> line 46, in 
>> >> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", 
>> >> line 29, in 
>> >> ModuleNotFoundError: No module named 'py4j'
>> >> PYTHONPATH was:
>> >> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
>> >
>> >
>> > And here's how I started Pyspark session in Jupyter.
>> >>
>> >>
>> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
>> >> %env PYSPARK_PYTHON=/usr/bin/python3
>> >> import findspark
>> >> findspark.init()
>> >> import pyspark
>> >> sparkConf = pyspark.SparkConf()
>> >> sparkConf.setAll([
>> >> ('spark.cores.max', '96')
>> >> ,('spark.driver.memory', '2g')
>> >> ,('spark.executor.cores', '4')
>> >> ,('spark.executor.instances', '2')
>> >> ,('spark.executor.memory', '4g')
>> >> ,('spark.network.timeout', '800')
>> >> ,('spark.scheduler.mode', 'FAIR')
>> >> ,('spark.shuffle.service.enabled', 'true')
>> >> ,('spark.dynamicAllocation.enabled', 'true')
>> >> ])
>> >> py_files = 
>> >> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
>> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", 
>> >> conf=sparkConf, pyFiles=py_files)
>> >>
>> >
>> >
>> > Thanks,
>> > --
>> > Jianshi Huang
>> >
>>
>>
>> --
>> Marcelo
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31

The code shows that Spark will try to find the path if SPARK_HOME is specified.
And on my worker node, SPARK_HOME is specified in .bashrc, pointing to the
pre-installed 2.2.1 path.

I don't want to make any changes to the worker node configuration, so is there
any way to override the order?

Jianshi

On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin  wrote:

> Normally the version of Spark installed on the cluster does not
> matter, since Spark is uploaded from your gateway machine to YARN by
> default.
>
> You probably have some configuration (in spark-defaults.conf) that
> tells YARN to use a cached copy. Get rid of that configuration, and
> you can use whatever version you like.
> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >
> > Hi,
> >
> > I have a problem using multiple versions of Pyspark on YARN, the driver
> and worker nodes are all preinstalled with Spark 2.2.1, for production
> tasks. And I want to use 2.3.2 for my personal EDA.
> >
> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), however
> on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >
> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >
> > Here's the error message,
> >>
> >>
> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> Error from python worker:
> >> Traceback (most recent call last):
> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> __import__(pkg_name)
> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> ModuleNotFoundError: No module named 'py4j'
> >> PYTHONPATH was:
> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >
> >
> > And here's how I started Pyspark session in Jupyter.
> >>
> >>
> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> import findspark
> >> findspark.init()
> >> import pyspark
> >> sparkConf = pyspark.SparkConf()
> >> sparkConf.setAll([
> >> ('spark.cores.max', '96')
> >> ,('spark.driver.memory', '2g')
> >> ,('spark.executor.cores', '4')
> >> ,('spark.executor.instances', '2')
> >> ,('spark.executor.memory', '4g')
> >> ,('spark.network.timeout', '800')
> >> ,('spark.scheduler.mode', 'FAIR')
> >> ,('spark.shuffle.service.enabled', 'true')
> >> ,('spark.dynamicAllocation.enabled', 'true')
> >> ])
> >> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
> >>
> >
> >
> > Thanks,
> > --
> > Jianshi Huang
> >
>
>
> --
> Marcelo
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Marcelo Vanzin
Normally the version of Spark installed on the cluster does not
matter, since Spark is uploaded from your gateway machine to YARN by
default.

You probably have some configuration (in spark-defaults.conf) that
tells YARN to use a cached copy. Get rid of that configuration, and
you can use whatever version you like.
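
For reference, the documented YARN settings that typically pin a pre-staged copy
of the Spark libraries are shown below; whether either of them is actually set
on this gateway, and the HDFS paths, are assumptions.

# conf/spark-defaults.conf on the gateway machine (illustrative paths)
spark.yarn.archive   hdfs:///apps/spark/spark-libs.zip
spark.yarn.jars      hdfs:///apps/spark/jars/*
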
On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang  wrote:
>
> Hi,
>
> I have a problem using multiple versions of Pyspark on YARN, the driver and 
> worker nodes are all preinstalled with Spark 2.2.1, for production tasks. And 
> I want to use 2.3.2 for my personal EDA.
>
> I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), however on 
> the worker node, the PYTHONPATH still uses the system SPARK_HOME.
>
> Anyone knows how to override the PYTHONPATH on worker nodes?
>
> Here's the error message,
>>
>>
>> Py4JJavaError: An error occurred while calling o75.collectToPython.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
>> in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 
>> (TID 3, emr-worker-8.cluster-68492, executor 2): 
>> org.apache.spark.SparkException:
>> Error from python worker:
>> Traceback (most recent call last):
>> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in 
>> _run_module_as_main
>> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
>> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in 
>> _get_module_details
>> __import__(pkg_name)
>> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", 
>> line 46, in 
>> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", 
>> line 29, in 
>> ModuleNotFoundError: No module named 'py4j'
>> PYTHONPATH was:
>> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
>
>
> And here's how I started Pyspark session in Jupyter.
>>
>>
>> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
>> %env PYSPARK_PYTHON=/usr/bin/python3
>> import findspark
>> findspark.init()
>> import pyspark
>> sparkConf = pyspark.SparkConf()
>> sparkConf.setAll([
>> ('spark.cores.max', '96')
>> ,('spark.driver.memory', '2g')
>> ,('spark.executor.cores', '4')
>> ,('spark.executor.instances', '2')
>> ,('spark.executor.memory', '4g')
>> ,('spark.network.timeout', '800')
>> ,('spark.scheduler.mode', 'FAIR')
>> ,('spark.shuffle.service.enabled', 'true')
>> ,('spark.dynamicAllocation.enabled', 'true')
>> ])
>> py_files = ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
>> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", 
>> conf=sparkConf, pyFiles=py_files)
>>
>
>
> Thanks,
> --
> Jianshi Huang
>


-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: How to do sliding window operation on RDDs in Pyspark?

2018-10-04 Thread zakhavan
Thank you. It helps.

Zeinab



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes

2018-10-04 Thread hager
Please, I have the same problem.
Have you found any solution?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Apostolos N. Papadopoulos

Maybe this can help.

https://stackoverflow.com/questions/32959723/set-python-path-for-spark-worker
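
A sketch along those lines, using the documented spark.executorEnv.* mechanism;
the 2.3.2 paths are taken from the message below and are assumed to be reachable
on the executors (e.g. present on every node or shipped with the job):

import pyspark

spark_232 = '/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7'
conf = pyspark.SparkConf()
conf.set('spark.executorEnv.PYTHONPATH',
         spark_232 + '/python/lib/pyspark.zip:' +
         spark_232 + '/python/lib/py4j-0.10.7-src.zip')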



On 04/10/2018 12:19 μμ, Jianshi Huang wrote:

Hi,

I have a problem using multiple versions of Pyspark on YARN, the 
driver and worker nodes are all preinstalled with Spark 2.2.1, for 
production tasks. And I want to use 2.3.2 for my personal EDA.


I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), 
however on the worker node, the PYTHONPATH still uses the system 
SPARK_HOME.


Anyone knows how to override the PYTHONPATH on worker nodes?

Here's the error message,


Py4JJavaError: An error occurred while calling o75.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 0.0 (TID 3, emr-worker-8.cluster-68492,
executor 2): org.apache.spark.SparkException:
Error from python worker:
Traceback (most recent call last):
File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183,
in _run_module_as_main
mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109,
in _get_module_details
__import__(pkg_name)
File
"/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py",
line 46, in 
File
"/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py",
line 29, in 
ModuleNotFoundError: No module named 'py4j'
PYTHONPATH was:

/usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar


And here's how I started Pyspark session in Jupyter.


%env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
%env PYSPARK_PYTHON=/usr/bin/python3
import findspark
findspark.init()
import pyspark
sparkConf = pyspark.SparkConf()
sparkConf.setAll([
('spark.cores.max', '96')
,('spark.driver.memory', '2g')
,('spark.executor.cores', '4')
,('spark.executor.instances', '2')
,('spark.executor.memory', '4g')
,('spark.network.timeout', '800')
,('spark.scheduler.mode', 'FAIR')
,('spark.shuffle.service.enabled', 'true')
,('spark.dynamicAllocation.enabled', 'true')
])
py_files =
['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
conf=sparkConf, pyFiles=py_files)



Thanks,
--
Jianshi Huang



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol



Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Hi,

I have a problem using multiple versions of PySpark on YARN: the driver and
worker nodes are all preinstalled with Spark 2.2.1 for production tasks,
and I want to use 2.3.2 for my personal EDA.

I've tried both the 'pyFiles=' option and sparkContext.addPyFiles(); however, on
the worker node the PYTHONPATH still uses the system SPARK_HOME.

Does anyone know how to override the PYTHONPATH on worker nodes?

Here's the error message,

>
> Py4JJavaError: An error occurred while calling o75.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> Error from python worker:
> Traceback (most recent call last):
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> __import__(pkg_name)
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py",
> line 46, in 
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py",
> line 29, in 
> ModuleNotFoundError: No module named 'py4j'
> PYTHONPATH was:
>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar


And here's how I started Pyspark session in Jupyter.

>
> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> %env PYSPARK_PYTHON=/usr/bin/python3
> import findspark
> findspark.init()
> import pyspark
> sparkConf = pyspark.SparkConf()
> sparkConf.setAll([
> ('spark.cores.max', '96')
> ,('spark.driver.memory', '2g')
> ,('spark.executor.cores', '4')
> ,('spark.executor.instances', '2')
> ,('spark.executor.memory', '4g')
> ,('spark.network.timeout', '800')
> ,('spark.scheduler.mode', 'FAIR')
> ,('spark.shuffle.service.enabled', 'true')
> ,('spark.dynamicAllocation.enabled', 'true')
> ])
> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
>
>

Thanks,
-- 
Jianshi Huang


Re: Use SparkContext in Web Application

2018-10-04 Thread Girish Vasmatkar
Thank you Vincent and Jörn for your inputs, much appreciated.

Our web app already has a scheduler mechanism, and other jobs are already
running in the system. Would you still prefer to decouple model training into
a separate scheduling tool outside of our web-app JVM?
We are using test data for now, as we are very new to Spark. It's 256 MB of
data, and the model size is coming out to be around 240 KB; it is being saved
in Parquet format by default.
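
For context, this is roughly the fit/save/load split we are prototyping; it is
only a sketch, with illustrative data, columns and paths:

# nightly scheduler side: fit the model and persist it
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth, FPGrowthModel

spark = SparkSession.builder.appName("reco-train").getOrCreate()
orders = spark.createDataFrame(
    [(0, ["a", "b"]), (1, ["a", "c"]), (2, ["a", "b", "c"])], ["id", "items"])

model = FPGrowth(itemsCol="items", minSupport=0.3, minConfidence=0.5).fit(orders)
model.write().overwrite().save("s3a://my-bucket/models/fpgrowth")   # illustrative path

# web-app side: load the saved model and derive recommendations
loaded = FPGrowthModel.load("s3a://my-bucket/models/fpgrowth")
loaded.transform(orders).show()   # adds a 'prediction' column per basket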

Thanks again for your help!

On Thu, Oct 4, 2018 at 12:30 PM vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Decoupling the web app from Spark backend is recommended. Training the
> model can be launched in the background via a scheduling tool. Inferring
> the model with Spark in interactive mode is not a good option as it will do
> it for unitary data and Spark is better in using large dataset. The
> original purpose of inferring with Spark was to do it offline for large
> datasets and store the results in a KV store for instance, then any
> consumer like your web app would just read the KV store. I would personally
> store the trained model in PFA or PMML and serve it via another tool.
> There are lots of tools to serve the models via API from managed solution
> like Amazon Sagemaker to open source solution like Prediction.io
> If you still want to call Spark backend from your web app, what I don't
> recommend, I would do it using Spark Jobserver or Livy to interact via rest
> API.
>
> On Thu, Oct 4, 2018 at 08:25, Jörn Franke  wrote:
>
>> Depending on your model size you can store it as PFA or PMML and run the
>> prediction in Java. For larger models you will need a custom solution ,
>> potentially using a spark thrift Server/spark job server/Livy and a cache
>> to store predictions that have been already calculated (eg based on
>> previous requests to predict). Then you run also into thoughts on caching
>> prediction results on the model version that has been used, evicting
>> non-relevant predictions etc
>> Making the model available as a service is currently a topic where a lot
>> of custom „plumbing“ is required , especially if models are a little bit
>> larger.
>>
>> On 04.10.2018 at 06:55, Girish Vasmatkar <
>> girish.vasmat...@hotwaxsystems.com>:
>>
>>
>>
>> On Mon, Oct 1, 2018 at 12:18 PM Girish Vasmatkar <
>> girish.vasmat...@hotwaxsystems.com> wrote:
>>
>>> Hi All
>>>
>>> We are very early into our Spark days so the following may sound like a
>>> novice question :) I will try to keep this as short as possible.
>>>
>>> We are trying to use Spark to introduce a recommendation engine that can
>>> be used to provide product recommendations and need help on some design
>>> decisions before moving forward. Ours is a web application running on
>>> Tomcat. So far, I have created a simple POC (standalone java program) that
>>> reads in a CSV file and feeds to FPGrowth and then fits the data and runs
>>> transformations. I would like to be able to do the following -
>>>
>>>
>>>- Scheduler runs nightly in Tomcat (which it does currently) and
>>>reads everything from the DB to train/fit the system. This can grow into
>>>really some large data and everyday we will have new data. Should I just
>>>use SparkContext here, within my scheduler, to FIT the system? Is this
>>>correct way to go about this? I am also planning to save the model on S3
>>>which should be okay. We also thought on using HDFS. The scheduler's job
>>>will be just to create model and save the same and be done with it.
>>>- On the product page, we can then use the saved model to display
>>>the product recommendations for a particular product.
>>>- My understanding is that I should be able to use SparkContext here
>>>in my web application to just load the saved model and use it to derive 
>>> the
>>>recommendations. Is this a good design? The problem I see using this
>>>approach is that the SparkContext does take time to initialize and this 
>>> may
>>>cost dearly. Or should we keep SparkContext per web application to use a
>>>single instance of the same? We can initialize a SparkContext during
>>>application context initialization phase.
>>>
>>>
>>> Since I am fairly new to using Spark properly, please help me take
>>> decision on whether the way I plan to use Spark is the recommended way? I
>>> have also seen use cases involving Kafka that does communication with Spark,
>>> but can we not do it directly using Spark Context? I am sure a lot of my
>>> understanding is wrong, so please feel free to correct me.
>>>
>>> Thanks and Regards,
>>> Girish Vasmatkar
>>> HotWax Systems
>>>
>>>
>>>
>>>


Re: Use SparkContext in Web Application

2018-10-04 Thread vincent gromakowski
Decoupling the web app from the Spark backend is recommended. Training the
model can be launched in the background via a scheduling tool. Inferring
the model with Spark in interactive mode is not a good option, as it will do
it for unitary data, and Spark is better at working on large datasets. The
original purpose of inferring with Spark was to do it offline for large
datasets and store the results in a KV store, for instance; then any
consumer like your web app would just read the KV store. I would personally
store the trained model in PFA or PMML and serve it via another tool.
There are lots of tools to serve the models via an API, from managed solutions
like Amazon SageMaker to open source solutions like Prediction.io.
If you still want to call the Spark backend from your web app, which I don't
recommend, I would do it using Spark Jobserver or Livy to interact via a REST
API.
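
As a sketch of the offline-inference path described here (the model type, input
columns and storage paths are placeholders, not a prescription):

from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowthModel

spark = SparkSession.builder.appName("batch-inference").getOrCreate()

model = FPGrowthModel.load("s3a://my-bucket/models/fpgrowth")    # placeholder path
baskets = spark.read.parquet("s3a://my-bucket/data/baskets")     # expects an 'items' column
scored = model.transform(baskets)                                # adds a 'prediction' column

# persist the precomputed recommendations where the web app (or a KV-store loader) reads them
scored.write.mode("overwrite").parquet("s3a://my-bucket/serving/recommendations")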

On Thu, Oct 4, 2018 at 08:25, Jörn Franke  wrote:

> Depending on your model size you can store it as PFA or PMML and run the
> prediction in Java. For larger models you will need a custom solution ,
> potentially using a spark thrift Server/spark job server/Livy and a cache
> to store predictions that have been already calculated (eg based on
> previous requests to predict). Then you run also into thoughts on caching
> prediction results on the model version that has been used, evicting
> non-relevant predictions etc
> Making the model available as a service is currently a topic where a lot
> of custom „plumbing“ is required , especially if models are a little bit
> larger.
>
> On 04.10.2018 at 06:55, Girish Vasmatkar <
> girish.vasmat...@hotwaxsystems.com>:
>
>
>
> On Mon, Oct 1, 2018 at 12:18 PM Girish Vasmatkar <
> girish.vasmat...@hotwaxsystems.com> wrote:
>
>> Hi All
>>
>> We are very early into our Spark days so the following may sound like a
>> novice question :) I will try to keep this as short as possible.
>>
>> We are trying to use Spark to introduce a recommendation engine that can
>> be used to provide product recommendations and need help on some design
>> decisions before moving forward. Ours is a web application running on
>> Tomcat. So far, I have created a simple POC (standalone java program) that
>> reads in a CSV file and feeds to FPGrowth and then fits the data and runs
>> transformations. I would like to be able to do the following -
>>
>>
>>- Scheduler runs nightly in Tomcat (which it does currently) and
>>reads everything from the DB to train/fit the system. This can grow into
>>really some large data and everyday we will have new data. Should I just
>>use SparkContext here, within my scheduler, to FIT the system? Is this
>>correct way to go about this? I am also planning to save the model on S3
>>which should be okay. We also thought on using HDFS. The scheduler's job
>>will be just to create model and save the same and be done with it.
>>- On the product page, we can then use the saved model to display the
>>product recommendations for a particular product.
>>- My understanding is that I should be able to use SparkContext here
>>in my web application to just load the saved model and use it to derive 
>> the
>>recommendations. Is this a good design? The problem I see using this
>>approach is that the SparkContext does take time to initialize and this 
>> may
>>cost dearly. Or should we keep SparkContext per web application to use a
>>single instance of the same? We can initialize a SparkContext during
>>application context initialization phase.
>>
>>
>> Since I am fairly new to using Spark properly, please help me take
>> decision on whether the way I plan to use Spark is the recommended way? I
>> have also seen use cases involving Kafka that does communication with Spark,
>> but can we not do it directly using Spark Context? I am sure a lot of my
>> understanding is wrong, so please feel free to correct me.
>>
>> Thanks and Regards,
>> Girish Vasmatkar
>> HotWax Systems
>>
>>
>>
>>


Re: Use SparkContext in Web Application

2018-10-04 Thread Jörn Franke
Depending on your model size, you can store it as PFA or PMML and run the
prediction in Java. For larger models you will need a custom solution,
potentially using a Spark Thrift Server / Spark Job Server / Livy and a cache to
store predictions that have already been calculated (e.g. based on previous
requests to predict). You then also run into questions about caching prediction
results keyed on the model version that was used, evicting no-longer-relevant
predictions, etc.
Making the model available as a service is currently a topic where a lot of
custom „plumbing“ is required, especially if models are a little bit larger.
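
The prediction-cache idea as a toy in-process sketch (shown in Python for
brevity; the scorer and the model version are placeholders for whatever serving
backend is used):

from functools import lru_cache

MODEL_VERSION = "2018-10-04"   # bump whenever a new model is published

def score(model_version, items):
    # placeholder for the real scorer (PMML/PFA evaluator, REST call, ...)
    return tuple("reco-for-" + i for i in items)

@lru_cache(maxsize=100000)     # LRU eviction keeps the set of cached predictions bounded
def cached_predict(model_version, items):
    # keyed on (model version, input); items must be hashable, e.g. a tuple of ids
    return score(model_version, items)

print(cached_predict(MODEL_VERSION, ("sku-1", "sku-2")))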

> On 04.10.2018 at 06:55, Girish Vasmatkar 
> :
> 
> 
> 
>> On Mon, Oct 1, 2018 at 12:18 PM Girish Vasmatkar 
>>  wrote:
>> Hi All
>> 
>> We are very early into our Spark days so the following may sound like a 
>> novice question :) I will try to keep this as short as possible.
>> 
>> We are trying to use Spark to introduce a recommendation engine that can be 
>> used to provide product recommendations and need help on some design 
>> decisions before moving forward. Ours is a web application running on 
>> Tomcat. So far, I have created a simple POC (standalone java program) that 
>> reads in a CSV file and feeds to FPGrowth and then fits the data and runs 
>> transformations. I would like to be able to do the following -
>> 
>> Scheduler runs nightly in Tomcat (which it does currently) and reads 
>> everything from the DB to train/fit the system. This can grow into really 
>> some large data and everyday we will have new data. Should I just use 
>> SparkContext here, within my scheduler, to FIT the system? Is this correct 
>> way to go about this? I am also planning to save the model on S3 which 
>> should be okay. We also thought on using HDFS. The scheduler's job will be 
>> just to create model and save the same and be done with it.
>> On the product page, we can then use the saved model to display the product 
>> recommendations for a particular product.
>> My understanding is that I should be able to use SparkContext here in my web 
>> application to just load the saved model and use it to derive the 
>> recommendations. Is this a good design? The problem I see using this 
>> approach is that the SparkContext does take time to initialize and this may 
>> cost dearly. Or should we keep SparkContext per web application to use a 
>> single instance of the same? We can initialize a SparkContext during 
>> application context initialization phase.
>> 
>> Since I am fairly new to using Spark properly, please help me take decision 
>> on whether the way I plan to use Spark is the recommended way? I have also 
>> seen use cases involving Kafka that does communication with Spark, but can we
>> not do it directly using Spark Context? I am sure a lot of my understanding 
>> is wrong, so please feel free to correct me.
>> 
>> Thanks and Regards,
>> Girish Vasmatkar
>> HotWax Systems
>> 
>> 
>>