Memory allocation

2020-04-17 Thread Pat Ferrel
I have used Spark for several years and realize from recent chatter on this 
list that I don’t really understand how it uses memory.

Specifically, are spark.executor.memory and spark.driver.memory taken from the 
JVM heap? When does Spark take memory from the JVM heap, and when does it take 
it from off the JVM heap?

Since spark.executor.memory and spark.driver.memory are job params, I have 
always assumed that the required memory was off-JVM-heap.  Or am I on the wrong 
track altogether?

Can someone point me to a discussion of this?

thanks
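
For reference, a rough sketch of how the settings asked about here relate to
one another. It assumes the defaults documented for Spark 2.x/3.x: 
spark.executor.memory sets the executor JVM heap size, the unified 
execution/storage region is carved out of that heap via spark.memory.fraction, 
off-heap memory is a separate opt-in pool (spark.memory.offHeap.size), and 
spark.executor.memoryOverhead is extra non-heap memory requested from YARN or 
Kubernetes. The function name and dictionary keys are made up for this 
illustration, and the numbers are approximations, not what a running JVM will 
report.

```python
# Approximate breakdown of one executor's memory, in MiB.
# Constants are the documented Spark defaults; the helper itself is hypothetical.

RESERVED_MIB = 300        # fixed reservation inside the heap
MIN_OVERHEAD_MIB = 384    # floor for spark.executor.memoryOverhead


def executor_memory_layout(executor_memory_mib,
                           memory_fraction=0.6,    # spark.memory.fraction
                           storage_fraction=0.5,   # spark.memory.storageFraction
                           overhead_factor=0.10,   # default overhead share
                           off_heap_mib=0):        # spark.memory.offHeap.size
    """Return an approximate breakdown of one executor's memory."""
    heap = executor_memory_mib                     # passed to the JVM as -Xmx
    unified = (heap - RESERVED_MIB) * memory_fraction
    overhead = max(MIN_OVERHEAD_MIB, heap * overhead_factor)
    return {
        "jvm_heap": heap,
        "unified_execution_and_storage": unified,  # on-heap, managed by Spark
        "storage_share_of_unified": unified * storage_fraction,
        "user_memory": heap - RESERVED_MIB - unified,
        "off_heap": off_heap_mib,                  # outside the JVM heap
        "container_overhead": overhead,            # outside the JVM heap
    }


for name, mib in executor_memory_layout(4096).items():
    print("%-32s %8.1f MiB" % (name, mib))
```

The same split applies to spark.driver.memory for the driver JVM. Note that in 
client mode the driver heap has to be set before the JVM starts (for example 
on the command line), not on a SparkConf created inside the application.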

Re: IDE suitable for Spark

2020-04-07 Thread Pat Ferrel
IntelliJ Scala works well when debugging master=local. Has anyone used it for 
remote/cluster debugging? I’ve heard it is possible...


From: Luiz Camargo 
Reply: Luiz Camargo 
Date: April 7, 2020 at 10:26:35 AM
To: Dennis Suhari 
Cc: yeikel valdes , zahidr1...@gmail.com 
, user@spark.apache.org 
Subject:  Re: IDE suitable for Spark  

I have used IntelliJ Spark/Scala with the sbt tool

On Tue, Apr 7, 2020 at 1:18 PM Dennis Suhari  
wrote:
We are using PyCharm and RStudio, respectively, with Spark libraries to submit Spark jobs. 

Sent from my iPhone

On 07.04.2020 at 18:10, yeikel valdes wrote:



Zeppelin is not an IDE but a notebook.  It is helpful to experiment but it is 
missing a lot of the features that we expect from an IDE.

Thanks for sharing though. 

 On Tue, 07 Apr 2020 04:45:33 -0400 zahidr1...@gmail.com wrote 

When I first logged on I asked if there was a suitable IDE for Spark.
I did get a couple of responses.  
Thanks.  

I did actually find one that is a suitable IDE for Spark.  
That is Apache Zeppelin.

One of many reasons it is suitable for Apache Spark is how quickly it is up
and running: type bin/zeppelin-daemon.sh start,
then go to a browser and open http://localhost:8080  
That's it!

Then you can hit the ground running.
There are also ready-to-go Apache Spark examples
showing off the type of functionality one will be using in real-life production.

Zeppelin comes with embedded Apache Spark and Scala as the default interpreter, 
with 20+ interpreters.
I have gone on to discover a number of other advantages for a real-time 
production environment that Zeppelin gains from other Apache products.

Backbutton.co.uk
¯\_(ツ)_/¯  
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



--  


Prof. Luiz Camargo
Educator - Computing



Re: k8s orchestrating Spark service

2019-07-03 Thread Pat Ferrel
Thanks for the in depth explanation.

These methods would require us to architect our Server around Spark and it
is actually designed to be independent of the ML implementation. SparkML is
an important algo source, to be sure, but so is TensorFlow, and Python
non-Spark libs among others. So Spark stays at arm's length in a
microservices pattern. Doing this with access to Job status and management
is why Livy and the (Spark) Job Server exist. To us the ideal is treating
Spark like a compute server that will respond to a service API for job
submittal and control.

None of the above is solved by k8s Spark. Further we find that the Spark
Programmatic API does not support deploy mode = “cluster”. This means we
have to take a simple part of our code and partition it into new Jars only
to get spark-submit to work. To help with Job tracking and management when
you are not using the Programmatic API we look to Livy. I guess if you ask
our opinion of spark-submit, we’d (selfishly) say it hides architectural
issues that should be solved in the Spark Programmatic API but the
popularity of spark-submit is causing the community to avoid these or just
not see or care about them. I guess we’ll see if Spark behind Livy gives us
what we want.

Maybe this is unusual but we see Spark as a service, not an integral
platform. We also see Kubernetes as very important but optional for HA or
when you want to scale horizontally, basically when vertical is not
sufficient. Vertical scaling is more cost effective so Docker Compose is a
nice solution for simpler, Kubernetes-less deployments.

So if we are agnostic about the job master, and communicate through Livy,
we are back to orchestrating services with Docker and Kubernetes. If k8s
becomes a super duper job master, great! But it doesn’t solve today’s
question.


From: Matt Cheah  
Reply: Matt Cheah  
Date: July 1, 2019 at 5:14:05 PM
To: Pat Ferrel  ,
user@spark.apache.org  
Subject:  Re: k8s orchestrating Spark service

> We’d like to deploy Spark Workers/Executors and Master (whatever master
is easiest to talk about since we really don’t care) in pods as we do with
the other services we use. Replace Spark Master with k8s if you insist. How
do the executors get deployed?



When running Spark against Kubernetes natively, the Spark library handles
requesting executors from the API server. So presumably one would only need
to know how to start the driver in the cluster – maybe spark-operator,
spark-submit, or just starting the pod and making a Spark context in client
mode with the right parameters. From there, the Spark scheduler code knows
how to interface with the API server and request executor pods according to
the resource requests configured in the app.
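
As a rough illustration of "making a Spark context in client mode with the
right parameters", the settings involved can be sketched as a plain
dictionary. The configuration keys are real Spark properties; the API server
address, image name, and helper function are hypothetical placeholders, and a
real deployment will usually need more (credentials, service account, driver
networking).

```python
# Hypothetical values; replace them with the ones for your cluster.
API_SERVER = "https://kubernetes.example.com:6443"
EXECUTOR_IMAGE = "registry.example.com/spark:2.4.3"


def k8s_client_mode_conf(api_server, image, executors=2, namespace="default"):
    """Settings a client-mode driver would put on its SparkConf."""
    return {
        # The k8s:// prefix selects Spark's native Kubernetes scheduler.
        "spark.master": "k8s://" + api_server,
        "spark.submit.deployMode": "client",
        "spark.kubernetes.namespace": namespace,
        # Image used for the executor pods that Spark requests.
        "spark.kubernetes.container.image": image,
        "spark.executor.instances": str(executors),
    }


conf = k8s_client_mode_conf(API_SERVER, EXECUTOR_IMAGE)
for key in sorted(conf):
    print(key + "=" + conf[key])
```

In client mode the executor pods also have to be able to connect back to the
driver, so a driver running inside a pod typically needs a routable host name
(the Spark documentation suggests a headless service for this).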



> We have a machine Learning Server. It submits various jobs through the
Spark Scala API. The Server is run in a pod deployed from a chart by k8s.
It later uses the Spark API to submit jobs. I guess we find spark-submit to
be a roadblock to our use of Spark and the k8s support is fine but how do
you run our Driver and Executors considering that the Driver is part of the
Server process?



It depends on how the server runs the jobs:

   - If each job is meant to be a separate forked driver pod / process: The
   ML server code can use the SparkLauncher API
   
<https://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/SparkLauncher.html>
   and configure the Spark driver through that API. Set the master to point to
   the Kubernetes API server and set the parameters for credentials according
   to your setup. SparkLauncher is a thin layer on top of spark-submit; a
   Spark distribution has to be packaged with the ML server image and
   SparkLauncher would point to the spark-submit script in said distribution.
   - If all jobs run inside the same driver, that being the ML server: One
   has to start the ML server with the right parameters to point to the
   Kubernetes master. Since the ML server is a driver, one has the option to
   use spark-submit or SparkLauncher to deploy the ML server itself.
   Alternatively one can use a custom script to start the ML server, then the
   ML server process has to create a SparkContext object parameterized against
   the Kubernetes server in question.
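
Since SparkLauncher is described above as a thin layer on top of
spark-submit, the first option boils down to assembling a spark-submit
command from inside the server. The sketch below only builds that command
with the Python standard library; it is not SparkLauncher itself. The install
path, image, class name, and jar location are hypothetical, while the
spark-submit flags and configuration keys are the standard ones.

```python
import shlex


def spark_submit_command(spark_home, api_server, image, main_class, app_jar,
                         app_args=(), extra_conf=None):
    """Build (but do not run) a cluster-mode spark-submit command line."""
    conf = {"spark.kubernetes.container.image": image}
    conf.update(extra_conf or {})
    cmd = [
        spark_home + "/bin/spark-submit",
        "--master", "k8s://" + api_server,
        "--deploy-mode", "cluster",      # driver runs in its own pod
        "--class", main_class,
    ]
    for key in sorted(conf):
        cmd += ["--conf", key + "=" + conf[key]]
    cmd.append(app_jar)
    cmd.extend(app_args)
    return cmd


cmd = spark_submit_command(
    spark_home="/opt/spark",                      # hypothetical install path
    api_server="https://kubernetes.example.com:6443",
    image="registry.example.com/spark:2.4.3",
    main_class="com.example.TrainJob",            # hypothetical job class
    app_jar="local:///opt/app/train-job.jar",     # jar baked into the image
    extra_conf={"spark.executor.instances": "4"},
)
print(" ".join(shlex.quote(part) for part in cmd))
```

The resulting list can be handed to subprocess.run when the server decides to
fork a job, which keeps each driver in its own process and pod.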



I hope this helps!



-Matt Cheah

*From: *Pat Ferrel 
*Date: *Monday, July 1, 2019 at 5:05 PM
*To: *"user@spark.apache.org" , Matt Cheah <
mch...@palantir.com>
*Subject: *Re: k8s orchestrating Spark service



We have a machine Learning Server. It submits various jobs through the
Spark Scala API. The Server is run in a pod deployed from a chart by k8s.
It later uses the Spark API to submit jobs. I guess we find spark-submit to
be a roadblock to our use of Spark and the k8s support is fine but how do
you run our Driver and Executors considering that the Driver is part of the
Server process?



Maybe we are talking past each other with some mistaken assumptions (on my
part perhaps).







From: Pat

Re: k8s orchestrating Spark service

2019-07-01 Thread Pat Ferrel
We have a machine Learning Server. It submits various jobs through the
Spark Scala API. The Server is run in a pod deployed from a chart by k8s.
It later uses the Spark API to submit jobs. I guess we find spark-submit to
be a roadblock to our use of Spark and the k8s support is fine but how do
you run our Driver and Executors considering that the Driver is part of the
Server process?

Maybe we are talking past each other with some mistaken assumptions (on my
part perhaps).



From: Pat Ferrel  
Reply: Pat Ferrel  
Date: July 1, 2019 at 4:57:20 PM
To: user@spark.apache.org  , Matt
Cheah  
Subject:  Re: k8s orchestrating Spark service

k8s as master would be nice but doesn’t solve the problem of running the
full cluster and is an orthogonal issue.

We’d like to deploy Spark Workers/Executors and Master (whatever master is
easiest to talk about since we really don’t care) in pods as we do with the
other services we use. Replace Spark Master with k8s if you insist. How do
the executors get deployed?

We have our own containers that almost work for 2.3.3. We have used this
before with older Spark so we are reasonably sure it makes sense. We just
wonder if our own image builds and charts are the best starting point.

Does anyone have something they like?


From: Matt Cheah  
Reply: Matt Cheah  
Date: July 1, 2019 at 4:45:55 PM
To: Pat Ferrel  ,
user@spark.apache.org  
Subject:  Re: k8s orchestrating Spark service

Sorry, I don’t quite follow – why use the Spark standalone cluster as an
in-between layer when one can just deploy the Spark application directly
inside the Helm chart? I’m curious as to what the use case is, since I’m
wondering if there’s something we can improve with respect to the native
integration with Kubernetes here. Deploying on Spark standalone mode in
Kubernetes is, to my understanding, meant to be superseded by the native
integration introduced in Spark 2.4.



*From: *Pat Ferrel 
*Date: *Monday, July 1, 2019 at 4:40 PM
*To: *"user@spark.apache.org" , Matt Cheah <
mch...@palantir.com>
*Subject: *Re: k8s orchestrating Spark service



Thanks Matt,



Actually I can’t use spark-submit. We submit the Driver programmatically
through the API. But this is not the issue and using k8s as the master is
also not the issue though you may be right about it being easier, it
doesn’t quite get to the heart.



We want to orchestrate a bunch of services including Spark. The rest work,
we are asking if anyone has seen a good starting point for adding Spark as
a k8s managed service.




From: Matt Cheah  
Reply: Matt Cheah  
Date: July 1, 2019 at 3:26:20 PM
To: Pat Ferrel  ,
user@spark.apache.org  
Subject:  Re: k8s orchestrating Spark service



I would recommend looking into Spark’s native support for running on
Kubernetes. One can just start the application against Kubernetes directly
using spark-submit in cluster mode or starting the Spark context with the
right parameters in client mode. See
https://spark.apache.org/docs/latest/running-on-kubernetes.html



I would think that building Helm around this architecture of running Spark
applications would be easier than running a Spark standalone cluster. But
admittedly I’m not very familiar with the Helm technology – we just use
spark-submit.



-Matt Cheah

*From: *Pat Ferrel 
*Date: *Sunday, June 30, 2019 at 12:55 PM
*To: *"user@spark.apache.org" 
*Subject: *k8s orchestrating Spark service



We're trying to set up a system that includes Spark. The rest of the
services have good Docker containers and Helm charts to start from.



Spark on the other hand is proving difficult. We forked a container and
have tried to create our own chart but are having several problems with
this.



So back to the community… Can anyone recommend a Docker Container + Helm
Chart for use with Kubernetes to orchestrate:

   - Spark standalone Master
   - several Spark Workers/Executors

This is not a request to use k8s to orchestrate Spark Jobs, but the service
cluster itself.



Thanks


Re: k8s orchestrating Spark service

2019-07-01 Thread Pat Ferrel
k8s as master would be nice but doesn’t solve the problem of running the
full cluster and is an orthogonal issue.

We’d like to deploy Spark Workers/Executors and Master (whatever master is
easiest to talk about since we really don’t care) in pods as we do with the
other services we use. Replace Spark Master with k8s if you insist. How do
the executors get deployed?

We have our own containers that almost work for 2.3.3. We have used this
before with older Spark so we are reasonably sure it makes sense. We just
wonder if our own image builds and charts are the best starting point.

Does anyone have something they like?


From: Matt Cheah  
Reply: Matt Cheah  
Date: July 1, 2019 at 4:45:55 PM
To: Pat Ferrel  ,
user@spark.apache.org  
Subject:  Re: k8s orchestrating Spark service

Sorry, I don’t quite follow – why use the Spark standalone cluster as an
in-between layer when one can just deploy the Spark application directly
inside the Helm chart? I’m curious as to what the use case is, since I’m
wondering if there’s something we can improve with respect to the native
integration with Kubernetes here. Deploying on Spark standalone mode in
Kubernetes is, to my understanding, meant to be superseded by the native
integration introduced in Spark 2.4.



*From: *Pat Ferrel 
*Date: *Monday, July 1, 2019 at 4:40 PM
*To: *"user@spark.apache.org" , Matt Cheah <
mch...@palantir.com>
*Subject: *Re: k8s orchestrating Spark service



Thanks Matt,



Actually I can’t use spark-submit. We submit the Driver programmatically
through the API. But this is not the issue and using k8s as the master is
also not the issue though you may be right about it being easier, it
doesn’t quite get to the heart.



We want to orchestrate a bunch of services including Spark. The rest work,
we are asking if anyone has seen a good starting point for adding Spark as
a k8s managed service.




From: Matt Cheah  
Reply: Matt Cheah  
Date: July 1, 2019 at 3:26:20 PM
To: Pat Ferrel  ,
user@spark.apache.org  
Subject:  Re: k8s orchestrating Spark service



I would recommend looking into Spark’s native support for running on
Kubernetes. One can just start the application against Kubernetes directly
using spark-submit in cluster mode or starting the Spark context with the
right parameters in client mode. See
https://spark.apache.org/docs/latest/running-on-kubernetes.html



I would think that building Helm around this architecture of running Spark
applications would be easier than running a Spark standalone cluster. But
admittedly I’m not very familiar with the Helm technology – we just use
spark-submit.



-Matt Cheah

*From: *Pat Ferrel 
*Date: *Sunday, June 30, 2019 at 12:55 PM
*To: *"user@spark.apache.org" 
*Subject: *k8s orchestrating Spark service



We're trying to set up a system that includes Spark. The rest of the
services have good Docker containers and Helm charts to start from.



Spark on the other hand is proving difficult. We forked a container and
have tried to create our own chart but are having several problems with
this.



So back to the community… Can anyone recommend a Docker Container + Helm
Chart for use with Kubernetes to orchestrate:

   - Spark standalone Master
   - several Spark Workers/Executors

This is not a request to use k8s to orchestrate Spark Jobs, but the service
cluster itself.



Thanks


Re: k8s orchestrating Spark service

2019-07-01 Thread Pat Ferrel
Thanks Matt,

Actually I can’t use spark-submit. We submit the Driver programmatically
through the API. But this is not the issue and using k8s as the master is
also not the issue though you may be right about it being easier, it
doesn’t quite get to the heart.

We want to orchestrate a bunch of services including Spark. The rest work,
we are asking if anyone has seen a good starting point for adding Spark as
a k8s managed service.


From: Matt Cheah  
Reply: Matt Cheah  
Date: July 1, 2019 at 3:26:20 PM
To: Pat Ferrel  ,
user@spark.apache.org  
Subject:  Re: k8s orchestrating Spark service

I would recommend looking into Spark’s native support for running on
Kubernetes. One can just start the application against Kubernetes directly
using spark-submit in cluster mode or starting the Spark context with the
right parameters in client mode. See
https://spark.apache.org/docs/latest/running-on-kubernetes.html



I would think that building Helm around this architecture of running Spark
applications would be easier than running a Spark standalone cluster. But
admittedly I’m not very familiar with the Helm technology – we just use
spark-submit.



-Matt Cheah

*From: *Pat Ferrel 
*Date: *Sunday, June 30, 2019 at 12:55 PM
*To: *"user@spark.apache.org" 
*Subject: *k8s orchestrating Spark service



We're trying to set up a system that includes Spark. The rest of the
services have good Docker containers and Helm charts to start from.



Spark on the other hand is proving difficult. We forked a container and
have tried to create our own chart but are having several problems with
this.



So back to the community… Can anyone recommend a Docker Container + Helm
Chart for use with Kubernetes to orchestrate:

   - Spark standalone Master
   - several Spark Workers/Executors

This is not a request to use k8s to orchestrate Spark Jobs, but the service
cluster itself.



Thanks


k8s orchestrating Spark service

2019-06-30 Thread Pat Ferrel
We're trying to set up a system that includes Spark. The rest of the
services have good Docker containers and Helm charts to start from.

Spark on the other hand is proving difficult. We forked a container and
have tried to create our own chart but are having several problems with
this.

So back to the community… Can anyone recommend a Docker Container + Helm
Chart for use with Kubernetes to orchestrate:

   - Spark standalone Master
   - several Spark Workers/Executors

This is not a request to use k8s to orchestrate Spark Jobs, but the service
cluster itself.

Thanks


Re: run new spark version on old spark cluster ?

2019-05-20 Thread Pat Ferrel
It is always dangerous to run a NEWER version of code on an OLDER cluster.
The danger increases with the semver change and this one is not just a
build #. In other words, 2.4 is considered to be a fairly major change from
2.3. Not much else can be said.
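
To make the "semver change" point concrete, here is a small sketch that
classifies how far apart two version strings are. The function name is made
up for this illustration, and it only handles purely numeric dotted versions.

```python
def version_gap(built_against, cluster):
    """Name the most significant component in which two versions differ."""
    a = [int(part) for part in built_against.split(".")]
    b = [int(part) for part in cluster.split(".")]
    for name, x, y in zip(("major", "minor", "patch"), a, b):
        if x != y:
            return name
    return "none"


print(version_gap("2.4", "2.3"))      # differs in the minor component
print(version_gap("2.3.3", "2.3.1"))  # differs only in the patch component
```

A gap at the minor level, as with 2.4 on 2.3, is the riskier case described
above; a patch-level gap is closer to "just a build #".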


From: Nicolas Paris  
Reply: user@spark.apache.org  
Date: May 20, 2019 at 11:02:49 AM
To: user@spark.apache.org  
Subject:  Re: run new spark version on old spark cluster ?

> you will need the spark version you intend to launch with on the machine you
> launch from and point to the correct spark-submit

does this mean to install a second spark version (2.4) on the cluster ?

thanks

On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> yarn can happily run multiple spark versions side-by-side
> you will need the spark version you intend to launch with on the machine you
> launch from and point to the correct spark-submit
>
> On Mon, May 20, 2019 at 1:50 PM Nicolas Paris wrote:
>
> Hi
>
> I am wondering whether it's feasible to:
> - build a spark application (with sbt/maven) based on spark2.4
> - deploy that jar on yarn on a spark2.3 based installation
>
> thanks in advance,
>
>
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
nicolas



Fwd: Spark Architecture, Drivers, & Executors

2019-05-17 Thread Pat Ferrel
In order to create an application that executes code on Spark we have a
long lived process. It periodically runs jobs programmatically on a Spark
cluster, meaning it does not use spark-submit. The Jobs it executes have
varying requirements for memory so we want to have the Spark Driver run in
the cluster.

This kind of architecture does not work very well with Spark as we
understand it. The issue is that there is no way to run in
deployMode=cluster. This setting is ignored when launching jobs
programmatically (why is it not an exception?). This in turn means that our
launching application needs to be run on a machine that is big enough to
run the worst case Spark Driver. This is completely impractical due to our
use case (a generic always on Machine Learning Server).

What we would rather do is have the Scala closure that has access to the
Spark Context be treated as the Spark Driver and run in the cluster. There
seems to be no way to do this with off-the-shelf Spark.

This seems like a very common use case but maybe we are too close to it. We
are aware of the Job Server and Apache Livy, which seem to give us what we
need.

Are these the best solutions? Is there a way to do what we want without
spark-submit? Have others here solved this in some other way?


Re: Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Pat Ferrel
Streams have no end until watermarked or closed. Joins need bounded
datasets, et voila. Something tells me you should consider the streaming
nature of your data and whether your joins need to use increments/snippets
of infinite streams or to re-join the entire contents of the streams
accumulated at checkpoints.
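
A toy model may help show why a watermark turns an unbounded stream into
something a join can work with. This is not Spark's implementation (Spark
advances the watermark between micro-batches, not per row); the class and its
names are invented for this sketch, and event times are plain integers.

```python
class WatermarkedBuffer:
    """Toy model of per-stream join state bounded by an event-time watermark."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.max_event_time = None
        self.rows = []                      # (event_time, key, value)

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.allowed_lateness

    def add(self, event_time, key, value):
        """Buffer a row unless it is older than the watermark."""
        if self.watermark is not None and event_time < self.watermark:
            return False                    # too late: dropped
        self.rows.append((event_time, key, value))
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        # Evict state that is now behind the watermark and can never match.
        self.rows = [r for r in self.rows if r[0] >= self.watermark]
        return True


buf = WatermarkedBuffer(allowed_lateness=10)
buf.add(100, "a", 1)          # kept; watermark becomes 90
buf.add(95, "b", 2)           # kept; still within the allowed lateness
buf.add(120, "c", 3)          # kept; watermark jumps to 110, older rows evicted
print(buf.add(105, "d", 4))   # False: behind the watermark
print(buf.rows)               # [(120, 'c', 3)]
```

Without the eviction step the buffer would grow forever, which is the
"streams have no end" problem; with it, each side of a join only holds a
bounded window of rows.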


From: Joe Ammann  
Reply: Joe Ammann  
Date: May 6, 2019 at 6:45:13 AM
To: user@spark.apache.org  
Subject:  Spark structured streaming watermarks on nested attributes

Hi all

I'm pretty new to Spark and implementing my first non-trivial structured
streaming job with outer joins. My environment is a Hortonworks HDP 3.1
cluster with Spark 2.3.2, working with Python.

I understood that I need to provide watermarks and join conditions for left
outer joins to work. All my incoming Kafka streams have an attribute
"LAST_MODIFICATION" which is well suited to indicate the event time, so I
chose that for watermarking. Since I'm joining from multiple topics where
the incoming messages have common attributes, I thought I'd prefix/nest all
incoming messages. Something like

entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")

entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")


Now when I try to join two such streams, it fails and tells me that I
need to use watermarks.

When I leave the watermarking attribute "at the top level", everything
works as expected, e.g.

entity1DF.select(struct("*").alias("entity1"),
col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")


Before I hunt this down any further, is this kind of a known limitation? Or
am I doing something fundamentally wrong?

-- 
CU, Joe



Re: Deep Learning with Spark, what is your experience?

2019-05-04 Thread Pat Ferrel
@Riccardo

Spark does not do the DL learning part of the pipeline (afaik) so it is
limited to data ingestion and transforms (ETL). It therefore is optional
and other ETL options might be better for you.

Most of the technologies @Gourav mentions have their own scaling based on
their own compute engines specialized for their DL implementations, so be
aware that Spark scaling has nothing to do with scaling most of the DL
engines; they have their own solutions.

From: Gourav Sengupta 

Reply: Gourav Sengupta 

Date: May 4, 2019 at 10:24:29 AM
To: Riccardo Ferrari  
Cc: User  
Subject:  Re: Deep Learning with Spark, what is your experience?

Try using MXNet and Horovod directly as well (I think that MXNet is worth a
try as well):
1.
https://medium.com/apache-mxnet/distributed-training-using-apache-mxnet-with-horovod-44f98bf0e7b7
2.
https://docs.nvidia.com/deeplearning/dgx/mxnet-release-notes/rel_19-01.html
3. https://aws.amazon.com/mxnet/
4.
https://aws.amazon.com/blogs/machine-learning/aws-deep-learning-amis-now-include-horovod-for-faster-multi-gpu-tensorflow-training-on-amazon-ec2-p3-instances/


Of course, TensorFlow is backed by Google's advertisement team as well
https://aws.amazon.com/blogs/machine-learning/scalable-multi-node-training-with-tensorflow/


Regards,




On Sat, May 4, 2019 at 10:59 AM Riccardo Ferrari  wrote:

> Hi list,
>
> I am trying to understand if it makes sense to leverage Spark as an enabling
> platform for Deep Learning.
>
> My open questions to you are:
>
>- Do you use Apache Spark in your DL pipelines?
>- How do you use Spark for DL? Is it just a stand-alone stage in the
>workflow (i.e. a data preparation script) or is it more integrated?
>
> I see a major advantage in leveraging Spark as a unified entrypoint,
> for example you can easily abstract data sources and leverage existing
> team skills for data pre-processing and training. On the flip side you may
> hit some limitations including supported versions and so on.
> What is your experience?
>
> Thanks!
>


Re: spark.submit.deployMode: cluster

2019-03-28 Thread Pat Ferrel
Thanks, are you referring to
https://github.com/spark-jobserver/spark-jobserver or the undocumented REST
job server included in Spark?


From: Jason Nerothin  
Reply: Jason Nerothin  
Date: March 28, 2019 at 2:53:05 PM
To: Pat Ferrel  
Cc: Felix Cheung 
, Marcelo
Vanzin  , user
 
Subject:  Re: spark.submit.deployMode: cluster

Check out the Spark Jobs API... it sits behind a REST service...


On Thu, Mar 28, 2019 at 12:29 Pat Ferrel  wrote:

> ;-)
>
> Great idea. Can you suggest a project?
>
> Apache PredictionIO uses spark-submit (very ugly) and Apache Mahout only
> launches trivially in test apps since most uses are as a lib.
>
>
> From: Felix Cheung  
> Reply: Felix Cheung 
> 
> Date: March 28, 2019 at 9:42:31 AM
> To: Pat Ferrel  , Marcelo
> Vanzin  
> Cc: user  
> Subject:  Re: spark.submit.deployMode: cluster
>
> If anyone wants to improve docs please create a PR.
>
> lol
>
>
> But seriously you might want to explore other projects that manage job
> submission on top of spark instead of rolling your own with spark-submit.
>
>
> --
> *From:* Pat Ferrel 
> *Sent:* Tuesday, March 26, 2019 2:38 PM
> *To:* Marcelo Vanzin
> *Cc:* user
> *Subject:* Re: spark.submit.deployMode: cluster
>
> Ahh, thank you indeed!
>
> It would have saved us a lot of time if this had been documented. I know,
> OSS so contributions are welcome… I can also imagine your next comment; “If
> anyone wants to improve docs see the Apache contribution rules and create a
> PR.” or something like that.
>
> BTW the code where the context is known and can be used is what I’d call a
> Driver and since all code is copied to nodes and is known in jars, it was
> not obvious to us that this rule existed but it does make sense.
>
> We will need to refactor our code to use spark-submit it appears.
>
> Thanks again.
>
>
> From: Marcelo Vanzin  
> Reply: Marcelo Vanzin  
> Date: March 26, 2019 at 1:59:36 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: spark.submit.deployMode: cluster
>
> If you're not using spark-submit, then that option does nothing.
>
> If by "context creation API" you mean "new SparkContext()" or an
> equivalent, then you're explicitly creating the driver inside your
> application.
>
> On Tue, Mar 26, 2019 at 1:56 PM Pat Ferrel  wrote:
> >
> > I have a server that starts a Spark job using the context creation API.
> It DOES NOT use spark-submit.
> >
> > I set spark.submit.deployMode = “cluster”
> >
> > In the GUI I see 2 workers with 2 executors. The link for running
> application “name” goes back to my server, the machine that launched the
> job.
> >
> > This is spark.submit.deployMode = “client” according to the docs. I set
> the Driver to run on the cluster but it runs on the client, ignoring the
> spark.submit.deployMode.
> >
> > Is this as expected? It is documented nowhere I can find.
> >
>
>
> --
> Marcelo
>
> --
Thanks,
Jason


Re: spark.submit.deployMode: cluster

2019-03-28 Thread Pat Ferrel
;-)

Great idea. Can you suggest a project?

Apache PredictionIO uses spark-submit (very ugly) and Apache Mahout only
launches trivially in test apps since most uses are as a lib.


From: Felix Cheung  
Reply: Felix Cheung  
Date: March 28, 2019 at 9:42:31 AM
To: Pat Ferrel  , Marcelo
Vanzin  
Cc: user  
Subject:  Re: spark.submit.deployMode: cluster

If anyone wants to improve docs please create a PR.

lol


But seriously you might want to explore other projects that manage job
submission on top of spark instead of rolling your own with spark-submit.


--
*From:* Pat Ferrel 
*Sent:* Tuesday, March 26, 2019 2:38 PM
*To:* Marcelo Vanzin
*Cc:* user
*Subject:* Re: spark.submit.deployMode: cluster

Ahh, thank you indeed!

It would have saved us a lot of time if this had been documented. I know,
OSS so contributions are welcome… I can also imagine your next comment; “If
anyone wants to improve docs see the Apache contribution rules and create a
PR.” or something like that.

BTW the code where the context is known and can be used is what I’d call a
Driver and since all code is copied to nodes and is known in jars, it was
not obvious to us that this rule existed but it does make sense.

We will need to refactor our code to use spark-submit it appears.

Thanks again.


From: Marcelo Vanzin  
Reply: Marcelo Vanzin  
Date: March 26, 2019 at 1:59:36 PM
To: Pat Ferrel  
Cc: user  
Subject:  Re: spark.submit.deployMode: cluster

If you're not using spark-submit, then that option does nothing.

If by "context creation API" you mean "new SparkContext()" or an
equivalent, then you're explicitly creating the driver inside your
application.

On Tue, Mar 26, 2019 at 1:56 PM Pat Ferrel  wrote:
>
> I have a server that starts a Spark job using the context creation API.
It DOES NOT use spark-submit.
>
> I set spark.submit.deployMode = “cluster”
>
> In the GUI I see 2 workers with 2 executors. The link for running
application “name” goes back to my server, the machine that launched the
job.
>
> This is spark.submit.deployMode = “client” according to the docs. I set
the Driver to run on the cluster but it runs on the client, ignoring the
spark.submit.deployMode.
>
> Is this as expected? It is documented nowhere I can find.
>


--
Marcelo


Re: Where does the Driver run?

2019-03-28 Thread Pat Ferrel
Thanks for the pointers. We’ll investigate.

We have been told that the “Driver” is run in the launching JVM because
deployMode = cluster is ignored if spark-submit is not used to launch.

You are saying that there is a loophole and if you use one of these client
classes there is a way to run part of the app on the cluster, and you have
seen this for Yarn?

To explain more, we create a SparkConf, and then a SparkContext, which we
pass around implicitly to functions that I would define as the Spark
Driver. It seems that if you do not use spark-submit, the entire launching
app/JVM process is considered the Driver AND is always run in client mode.

I hope your loophole pays off or we will have to do a major refactoring.


From: Jianneng Li  
Reply: Jianneng Li  
Date: March 28, 2019 at 2:03:47 AM
To: p...@occamsmachete.com  
Cc: andrew.m...@gmail.com  ,
user@spark.apache.org  ,
ak...@hacked.work  
Subject:  Re: Where does the Driver run?

Hi Pat,

The driver runs in the same JVM as SparkContext. You didn't go into detail
about how you "launch" the job (i.e. how the SparkContext is created), so
it's hard for me to guess where the driver is.

For reference, we've had success launching Spark programmatically to YARN
in cluster mode by creating a SparkConf like you did and using it to call
this class:
https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

I haven't tried this myself, but for standalone mode you might be able to
use this:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/Client.scala

Lastly, you can always check where Spark processes run by executing ps on
the machine, i.e. `ps aux | grep java`.

Best,

Jianneng



*From:* Pat Ferrel 
*Date:* Monday, March 25, 2019 at 12:58 PM
*To:* Andrew Melo 
*Cc:* user , Akhil Das 
*Subject:* Re: Where does the Driver run?



I’m beginning to agree with you and find it rather surprising that this is
mentioned nowhere explicitly (maybe I missed?). It is possible to serialize
code to be executed in executors to various nodes. It also seems possible
to serialize the “driver” bits of code although I’m not sure how the
boundary would be defined. All code is in the jars we pass to Spark so
until now I did not question the docs.



I see no mention of a distinction between running a driver in spark-submit
vs being programmatically launched for any of the Spark Master types:
Standalone, Yarn, Mesos, k8s.



We are building a Machine Learning Server in OSS. It has pluggable Engines
for different algorithms. Some of these use Spark so it is highly desirable
to offload driver code to the cluster since we don’t want the driver
embedded in the Server process. The Driver portion of our training workflow
could be very large indeed and so could force the scaling of the server to
worst case.



I hope someone knows how to run “Driver” code on the cluster when our
server is launching the code. So deployMode = cluster, deploy method =
programmatic launch.




From: Andrew Melo  
Reply: Andrew Melo  
Date: March 25, 2019 at 11:40:07 AM
To: Pat Ferrel  
Cc: Akhil Das  , user
 
Subject:  Re: Where does the Driver run?



Hi Pat,



Indeed, I don't think that it's possible to use cluster mode w/o
spark-submit. All the docs I see appear to always describe needing to use
spark-submit for cluster mode -- it's not even compatible with spark-shell.
But it makes sense to me -- if you want Spark to run your application's
driver, you need to package it up and send it to the cluster manager. You
can't start spark one place and then later migrate it to the cluster. It's
also why you can't use spark-shell in cluster mode either, I think.



Cheers

Andrew



On Mon, Mar 25, 2019 at 11:22 AM Pat Ferrel  wrote:

In the GUI while the job is running the app-id link brings up logs to both
executors, The “name” link goes to 4040 of the machine that launched the
job but is not resolvable right now so the page is not shown. I’ll try the
netstat but the use of port 4040 was a good clue.



By what you say below this indicates the Driver is running on the launching
machine, the client to the Spark Cluster. This should be the case in
deployMode = client.



Can someone explain what is going on? The evidence seems to say that
deployMode = cluster *does not work* as described unless you use
spark-submit (and I’m only guessing at that).



Further; if we don’t use spark-submit we can’t use deployMode = cluster ???




From: Akhil Das  
Reply: Akhil Das  
Date: March 24, 2019 at 7:45:07 PM
To: Pat Ferrel  
Cc: user  
Subject:  Re: Where does the Driver run?



There's also a driver ui (usually available on port 4040), after running
your code, I assume you are running it on your machine, visit
localhost:4040 and you will get the driver UI.



If you think the driver is running on your master/executor nodes, login to
those machines and do a



   netstat -napt | grep -i listen



You will see 

Re: spark.submit.deployMode: cluster

2019-03-26 Thread Pat Ferrel
Ahh, thank you indeed!

It would have saved us a lot of time if this had been documented. I know,
OSS so contributions are welcome… I can also imagine your next comment; “If
anyone wants to improve docs see the Apache contribution rules and create a
PR.” or something like that.

BTW the code where the context is known and can be used is what I’d call a
Driver and since all code is copied to nodes and is known in jars, it was
not obvious to us that this rule existed but it does make sense.

It appears we will need to refactor our code to use spark-submit.

Thanks again.


From: Marcelo Vanzin  
Reply: Marcelo Vanzin  
Date: March 26, 2019 at 1:59:36 PM
To: Pat Ferrel  
Cc: user  
Subject:  Re: spark.submit.deployMode: cluster

If you're not using spark-submit, then that option does nothing.

If by "context creation API" you mean "new SparkContext()" or an
equivalent, then you're explicitly creating the driver inside your
application.
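
If the server has to keep launching jobs itself, one option is to have it shell out to spark-submit rather than create the context in-process. The sketch below only assembles the command line. `SubmitCommand`, the paths, and the class name in the usage note are hypothetical, while `--master`, `--deploy-mode`, `--class` and `--conf` are standard spark-submit options.

```scala
object SubmitCommand {
  /** Builds the argument list for a cluster-mode spark-submit call. */
  def build(sparkHome: String,
            master: String,
            mainClass: String,
            appJar: String,
            conf: Map[String, String]): Seq[String] = {
    // One "--conf key=value" pair per setting, sorted for a stable order.
    val confArgs = conf.toSeq.sortBy(_._1).flatMap {
      case (key, value) => Seq("--conf", s"$key=$value")
    }
    val head = Seq(
      s"$sparkHome/bin/spark-submit",
      "--master", master,
      "--deploy-mode", "cluster", // ask the cluster to host the driver
      "--class", mainClass
    )
    // The application jar goes last, after all options.
    (head ++ confArgs) :+ appJar
  }
}
```

With `import scala.sys.process._` in scope, something like `SubmitCommand.build("/opt/spark", "spark://master-address:7077", "com.example.TrainJob", "/path/to/app.jar", Map("spark.driver.memory" -> "4g")).!` would run the command and return its exit code. In cluster mode the jar has to be at a location the cluster nodes can read, since the driver no longer starts on the launching machine.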

On Tue, Mar 26, 2019 at 1:56 PM Pat Ferrel  wrote:
>
> I have a server that starts a Spark job using the context creation API.
> It DOES NOT use spark-submit.
>
> I set spark.submit.deployMode = “cluster”
>
> In the GUI I see 2 workers with 2 executors. The link for running
> application “name” goes back to my server, the machine that launched the
> job.
>
> This is spark.submit.deployMode = “client” according to the docs. I set
> the Driver to run on the cluster but it runs on the client, ignoring the
> spark.submit.deployMode.
>
> Is this as expected? It is documented nowhere I can find.
>


-- 
Marcelo


spark.submit.deployMode: cluster

2019-03-26 Thread Pat Ferrel
I have a server that starts a Spark job using the context creation API. It
DOES NOT use spark-submit.

I set spark.submit.deployMode = “cluster”

In the GUI I see 2 workers with 2 executors. The link for running
application “name” goes back to my server, the machine that launched the
job.

This is spark.submit.deployMode = “client” according to the docs. I set the
Driver to run on the cluster but it runs on the client, *ignoring
the spark.submit.deployMode*.

Is this as expected? It is documented nowhere I can find.


Re: Where does the Driver run?

2019-03-25 Thread Pat Ferrel
I’m beginning to agree with you and find it rather surprising that this is
mentioned nowhere explicitly (maybe I missed?). It is possible to serialize
code to be executed in executors to various nodes. It also seems possible
to serialize the “driver” bits of code although I’m not sure how the
boundary would be defined. All code is in the jars we pass to Spark so
until now I did not question the docs.

I see no mention of a distinction between running a driver in spark-submit
vs being programmatically launched for any of the Spark Master types:
Standalone, Yarn, Mesos, k8s.

We are building a Machine Learning Server in OSS. It has pluggable Engines
for different algorithms. Some of these use Spark so it is highly desirable
to offload driver code to the cluster since we don’t want the driver
embedded in the Server process. The Driver portion of our training workflow
could be very large indeed and so could force the scaling of the server to
worst case.

I hope someone knows how to run “Driver” code on the cluster when our
server is launching the code. So deployMode = cluster, deploy method =
programmatic launch.


From: Andrew Melo  
Reply: Andrew Melo  
Date: March 25, 2019 at 11:40:07 AM
To: Pat Ferrel  
Cc: Akhil Das  , user
 
Subject:  Re: Where does the Driver run?

Hi Pat,

Indeed, I don't think that it's possible to use cluster mode w/o
spark-submit. All the docs I see appear to always describe needing to use
spark-submit for cluster mode -- it's not even compatible with spark-shell.
But it makes sense to me -- if you want Spark to run your application's
driver, you need to package it up and send it to the cluster manager. You
can't start spark one place and then later migrate it to the cluster. It's
also why you can't use spark-shell in cluster mode either, I think.

Cheers
Andrew

On Mon, Mar 25, 2019 at 11:22 AM Pat Ferrel  wrote:

> In the GUI while the job is running the app-id link brings up logs to both
> executors, The “name” link goes to 4040 of the machine that launched the
> job but is not resolvable right now so the page is not shown. I’ll try the
> netstat but the use of port 4040 was a good clue.
>
> By what you say below this indicates the Driver is running on the
> launching machine, the client to the Spark Cluster. This should be the case
> in deployMode = client.
>
> Can someone explain what is going on? The evidence seems to say that
> deployMode = cluster *does not work* as described unless you use
> spark-submit (and I’m only guessing at that).
>
> Further; if we don’t use spark-submit we can’t use deployMode = cluster ???
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 24, 2019 at 7:45:07 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> There's also a driver ui (usually available on port 4040), after running
> your code, I assume you are running it on your machine, visit
> localhost:4040 and you will get the driver UI.
>
> If you think the driver is running on your master/executor nodes, login to
> those machines and do a
>
>   netstat -napt | grep -i listen
>
> You will see the driver listening on 404x there, this won't be the case
> mostly as you are not doing Spark-submit or using the deployMode=cluster.
>
> On Mon, 25 Mar 2019, 01:03 Pat Ferrel,  wrote:
>
>> Thanks, I have seen this many times in my research. Paraphrasing docs:
>> “in deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>>
>> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
>> with addresses that match slaves). When I look at memory usage while the
>> job runs I see virtually identical usage on the 2 Workers. This would
>> support your claim and contradict Spark docs for deployMode = cluster.
>>
>> The evidence seems to contradict the docs. I am now beginning to wonder
>> if the Driver only runs in the cluster if we use spark-submit
>>
>>
>>
>> From: Akhil Das  
>> Reply: Akhil Das  
>> Date: March 23, 2019 at 9:26:50 PM
>> To: Pat Ferrel  
>> Cc: user  
>> Subject:  Re: Where does the Driver run?
>>
>> If you are starting your "my-app" on your local machine, that's where the
>> driver is running.
>>
>> [image: image.png]
>>
>> Hope this helps.
>> <https://spark.apache.org/docs/latest/cluster-overview.html>
>>
>> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>>
>>> I have researched this for a significant amount of time and find answers
>>> that seem to be for a slightly different question than mine.
>>>
>>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>>
>>> I have a S

Re: Where does the Driver run?

2019-03-25 Thread Pat Ferrel
In the GUI while the job is running the app-id link brings up logs to both
executors, The “name” link goes to 4040 of the machine that launched the
job but is not resolvable right now so the page is not shown. I’ll try the
netstat but the use of port 4040 was a good clue.

By what you say below this indicates the Driver is running on the launching
machine, the client to the Spark Cluster. This should be the case in
deployMode = client.

Can someone explain what is going on? The evidence seems to say that
deployMode = cluster *does not work* as described unless you use
spark-submit (and I’m only guessing at that).

Further; if we don’t use spark-submit we can’t use deployMode = cluster ???


From: Akhil Das  
Reply: Akhil Das  
Date: March 24, 2019 at 7:45:07 PM
To: Pat Ferrel  
Cc: user  
Subject:  Re: Where does the Driver run?

There's also a driver ui (usually available on port 4040), after running
your code, I assume you are running it on your machine, visit
localhost:4040 and you will get the driver UI.

If you think the driver is running on your master/executor nodes, login to
those machines and do a

   netstat -napt | grep -i listen

You will see the driver listening on 404x there, this won't be the case
mostly as you are not doing Spark-submit or using the deployMode=cluster.
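
The same check can be done from code instead of netstat. A rough sketch that tests whether anything accepts TCP connections on a given host and port (4040 is the usual default for the driver UI; `PortProbe` is a made-up name, not a Spark class):

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PortProbe {
  /** True if a TCP connection to host:port succeeds within the timeout. */
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    finally socket.close() // always release the socket, connected or not
  }
}
```

Calling `PortProbe.isListening("localhost", 4040)` on the launching machine and on each worker while a job is running gives a hint about which host the driver UI, and so most likely the driver, lives on.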

On Mon, 25 Mar 2019, 01:03 Pat Ferrel,  wrote:

> Thanks, I have seen this many times in my research. Paraphrasing docs: “in
> deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>
> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
> with addresses that match slaves). When I look at memory usage while the
> job runs I see virtually identical usage on the 2 Workers. This would
> support your claim and contradict Spark docs for deployMode = cluster.
>
> The evidence seems to contradict the docs. I am now beginning to wonder if
> the Driver only runs in the cluster if we use spark-submit
>
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 23, 2019 at 9:26:50 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> If you are starting your "my-app" on your local machine, that's where the
> driver is running.
>
> [image: image.png]
>
> Hope this helps.
> <https://spark.apache.org/docs/latest/cluster-overview.html>
>
> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>
>> I have researched this for a significant amount of time and find answers
>> that seem to be for a slightly different question than mine.
>>
>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>
>> I have a Scala application that creates a context and starts execution of
>> a Job. I *do not use spark-submit*, I start the Job programmatically and
>> this is where many explanations fork from my question.
>>
>> In "my-app" I create a new SparkConf, with the following code (slightly
>> abbreviated):
>>
>>   conf.setAppName("my-job")
>>   conf.setMaster("spark://master-address:7077")
>>   conf.set("deployMode", "cluster")
>>   // other settings like driver and executor memory requests
>>   // the driver and executor memory requests are for all mem on the slaves,
>>   // more than mem available on the launching machine with "my-app"
>>   val jars = listJars("/path/to/lib")
>>   conf.setJars(jars)
>>   …
>>
>> When I launch the job I see 2 executors running on the 2 workers/slaves.
>> Everything seems to run fine and sometimes completes successfully. Frequent
>> failures are the reason for this question.
>>
>> Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
>> taking all cluster resources. With a Yarn cluster I would expect the
>> “Driver” to run on/in the Yarn Master but I am using the Spark Standalone
>> Master, where is the Driver part of the Job running?
>>
>> If it is running in the Master, we are in trouble because I start the
>> Master on one of my 2 Workers sharing resources with one of the Executors.
>> Executor mem + driver mem is > available mem on a Worker. I can change this
>> but need to understand where the Driver part of the Spark Job runs. Is it
>> in the Spark Master, or inside an Executor, or ???
>>
>> The “Driver” creates and broadcasts some large data structures so the
>> need for an answer is more critical than with more typical tiny Drivers.
>>
>> Thanks for your help!
>>
>
>
> --
> Cheers!
>
>


Re: Where does the Driver run?

2019-03-24 Thread Pat Ferrel
2 Slaves, one of which is also Master.

Node 1 & 2 are slaves. Node 1 is where I run start-all.sh.

The machines both have 60g of free memory (leaving about 4g for the master
process on Node 1). The only constraint to the Driver and Executors is
spark.driver.memory = spark.executor.memory = 60g

BTW I would expect this to create one Executor, one Driver, and the Master
on 2 Workers.
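
The arithmetic behind that expectation, as a small sketch. It counts only the configured JVM heaps and ignores any overhead outside the heap, and `MemoryBudget` is a hypothetical helper, not part of Spark:

```scala
object MemoryBudget {
  /** Free memory in GB left on a node after placing the given JVM heaps on it. */
  def remainingGb(freeGb: Int, heapsGb: Seq[Int]): Int = freeGb - heapsGb.sum
}
```

`MemoryBudget.remainingGb(60, Seq(60))` is 0, so a single 60g executor fills a node with 60g free. `MemoryBudget.remainingGb(60, Seq(60, 60))` is -60, so an executor and a 60g driver cannot share one node. With an executor on each of the 2 workers, neither node has room left for a 60g driver.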




From: Andrew Melo  
Reply: Andrew Melo  
Date: March 24, 2019 at 12:46:35 PM
To: Pat Ferrel  
Cc: Akhil Das  , user
 
Subject:  Re: Where does the Driver run?

Hi Pat,

On Sun, Mar 24, 2019 at 1:03 PM Pat Ferrel  wrote:

> Thanks, I have seen this many times in my research. Paraphrasing docs: “in
> deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>
> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
> with addresses that match slaves). When I look at memory usage while the
> job runs I see virtually identical usage on the 2 Workers. This would
> support your claim and contradict Spark docs for deployMode = cluster.
>
> The evidence seems to contradict the docs. I am now beginning to wonder if
> the Driver only runs in the cluster if we use spark-submit
>

Where/how are you starting "./sbin/start-master.sh"?

Cheers
Andrew


>
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 23, 2019 at 9:26:50 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> If you are starting your "my-app" on your local machine, that's where the
> driver is running.
>
> [image: image.png]
>
> Hope this helps.
> <https://spark.apache.org/docs/latest/cluster-overview.html>
>
> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>
>> I have researched this for a significant amount of time and find answers
>> that seem to be for a slightly different question than mine.
>>
>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>
>> I have a Scala application that creates a context and starts execution of
>> a Job. I *do not use spark-submit*, I start the Job programmatically and
>> this is where many explanations fork from my question.
>>
>> In "my-app" I create a new SparkConf, with the following code (slightly
>> abbreviated):
>>
>>   conf.setAppName("my-job")
>>   conf.setMaster("spark://master-address:7077")
>>   conf.set("deployMode", "cluster")
>>   // other settings like driver and executor memory requests
>>   // the driver and executor memory requests are for all mem on the slaves,
>>   // more than mem available on the launching machine with "my-app"
>>   val jars = listJars("/path/to/lib")
>>   conf.setJars(jars)
>>   …
>>
>> When I launch the job I see 2 executors running on the 2 workers/slaves.
>> Everything seems to run fine and sometimes completes successfully. Frequent
>> failures are the reason for this question.
>>
>> Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
>> taking all cluster resources. With a Yarn cluster I would expect the
>> “Driver” to run on/in the Yarn Master but I am using the Spark Standalone
>> Master, where is the Driver part of the Job running?
>>
>> If it is running in the Master, we are in trouble because I start the
>> Master on one of my 2 Workers sharing resources with one of the Executors.
>> Executor mem + driver mem is > available mem on a Worker. I can change this
>> but need to understand where the Driver part of the Spark Job runs. Is it
>> in the Spark Master, or inside an Executor, or ???
>>
>> The “Driver” creates and broadcasts some large data structures so the
>> need for an answer is more critical than with more typical tiny Drivers.
>>
>> Thanks for your help!
>>
>
>
> --
> Cheers!
>
>


Re: Where does the Driver run?

2019-03-24 Thread Pat Ferrel
Thanks, I have seen this many times in my research. Paraphrasing docs: “in
deployMode ‘cluster' the Driver runs on a Worker in the cluster”

When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
with addresses that match slaves). When I look at memory usage while the
job runs I see virtually identical usage on the 2 Workers. This would
support your claim and contradict Spark docs for deployMode = cluster.

The evidence seems to contradict the docs. I am now beginning to wonder if
the Driver only runs in the cluster if we use spark-submit



From: Akhil Das  
Reply: Akhil Das  
Date: March 23, 2019 at 9:26:50 PM
To: Pat Ferrel  
Cc: user  
Subject:  Re: Where does the Driver run?

If you are starting your "my-app" on your local machine, that's where the
driver is running.

[image: image.png]

Hope this helps.
<https://spark.apache.org/docs/latest/cluster-overview.html>

On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:

> I have researched this for a significant amount of time and find answers
> that seem to be for a slightly different question than mine.
>
> The Spark 2.3.3 cluster is running fine. I see the GUI on
> “http://master-address:8080”, there are 2 idle workers, as configured.
>
> I have a Scala application that creates a context and starts execution of
> a Job. I *do not use spark-submit*, I start the Job programmatically and
> this is where many explanations fork from my question.
>
> In "my-app" I create a new SparkConf, with the following code (slightly
> abbreviated):
>
>   conf.setAppName("my-job")
>   conf.setMaster("spark://master-address:7077")
>   conf.set("deployMode", "cluster")
>   // other settings like driver and executor memory requests
>   // the driver and executor memory requests are for all mem on the slaves,
>   // more than mem available on the launching machine with "my-app"
>   val jars = listJars("/path/to/lib")
>   conf.setJars(jars)
>   …
>
> When I launch the job I see 2 executors running on the 2 workers/slaves.
> Everything seems to run fine and sometimes completes successfully. Frequent
> failures are the reason for this question.
>
> Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
> taking all cluster resources. With a Yarn cluster I would expect the
> “Driver” to run on/in the Yarn Master but I am using the Spark Standalone
> Master, where is the Driver part of the Job running?
>
> If it is running in the Master, we are in trouble because I start the
> Master on one of my 2 Workers sharing resources with one of the Executors.
> Executor mem + driver mem is > available mem on a Worker. I can change this
> but need to understand where the Driver part of the Spark Job runs. Is it
> in the Spark Master, or inside an Executor, or ???
>
> The “Driver” creates and broadcasts some large data structures so the need
> for an answer is more critical than with more typical tiny Drivers.
>
> Thanks for your help!
>


--
Cheers!


Where does the Driver run?

2019-03-23 Thread Pat Ferrel
I have researched this for a significant amount of time and find answers
that seem to be for a slightly different question than mine.

The Spark 2.3.3 cluster is running fine. I see the GUI on
“http://master-address:8080”, there are 2 idle workers, as configured.

I have a Scala application that creates a context and starts execution of a
Job. I *do not use spark-submit*, I start the Job programmatically and this
is where many explanations fork from my question.

In "my-app" I create a new SparkConf, with the following code (slightly
abbreviated):

  conf.setAppName("my-job")
  conf.setMaster("spark://master-address:7077")
  conf.set("deployMode", "cluster")
  // other settings like driver and executor memory requests
  // the driver and executor memory requests are for all mem on the slaves,
  // more than mem available on the launching machine with "my-app"
  val jars = listJars("/path/to/lib")
  conf.setJars(jars)
  …

When I launch the job I see 2 executors running on the 2 workers/slaves.
Everything seems to run fine and sometimes completes successfully. Frequent
failures are the reason for this question.

Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
taking all cluster resources. With a Yarn cluster I would expect the
“Driver” to run on/in the Yarn Master but I am using the Spark Standalone
Master, where is the Driver part of the Job running?

If it is running in the Master, we are in trouble because I start the
Master on one of my 2 Workers sharing resources with one of the Executors.
Executor mem + driver mem is > available mem on a Worker. I can change this
but need to understand where the Driver part of the Spark Job runs. Is it
in the Spark Master, or inside an Executor, or ???

The “Driver” creates and broadcasts some large data structures so the need
for an answer is more critical than with more typical tiny Drivers.

Thanks for your help!


Re: Spark with Kubernetes connecting to pod ID, not address

2019-02-13 Thread Pat Ferrel
solve(SimpleNameResolver.java:55)
at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:978)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:512)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:423)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:482)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
... 1 more



From: Erik Erlandson 
Date: February 13, 2019 at 4:57:30 AM
To: Pat Ferrel 
Subject:  Re: Spark with Kubernetes connecting to pod id, not address  

Hi Pat,

I'd suggest visiting the big data slack channel, it's a more spark oriented 
forum than kube-dev:
https://kubernetes.slack.com/messages/C0ELB338T/

Tentatively, I think you may want to submit in client mode (unless you are 
initiating your application from outside the kube cluster). When in client 
mode, you need to set up a headless service for the application driver pod that 
the executors can use to talk back to the driver.
https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode
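
A possible explanation for the pod ID: unless told otherwise, the driver advertises its own hostname to the executors, and inside a pod the hostname is the pod name. In client mode the usual remedy is to advertise the service name instead. A sketch of the relevant settings as plain key/value pairs; `spark.driver.host`, `spark.driver.bindAddress` and `spark.driver.port` are real Spark properties, while `DriverAddressConf`, the service name and the port are illustrative assumptions:

```scala
object DriverAddressConf {
  /** Settings that make the driver advertise a resolvable service name. */
  def settings(driverServiceDns: String, driverPort: Int): Map[String, String] = Map(
    // Address the executors use to call back to the driver.
    "spark.driver.host" -> driverServiceDns,
    // Local interface the driver binds to inside its own pod.
    "spark.driver.bindAddress" -> "0.0.0.0",
    // Fixed port so the service can expose it.
    "spark.driver.port" -> driverPort.toString
  )
}
```

Each pair would be applied with `conf.set(key, value)` before the context is created, for example with the service name `harness-api` and whatever port that service exposes for the driver.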

Cheers,
Erik


On Wed, Feb 13, 2019 at 1:55 AM Pat Ferrel  wrote:
We have a k8s deployment of several services including Apache Spark. All 
services seem to be operational. Our application connects to the Spark master 
to submit a job using the k8s DNS service for the cluster where the master is 
called spark-api so we use master=spark://spark-api:7077 and we use 
spark.submit.deployMode=cluster. We submit the job through the API not by the 
spark-submit script. 

This will run the "driver" and all "executors" on the cluster and this part 
seems to work but there is a callback to the launching code in our app from 
some Spark process. For some reason it is trying to connect to 
harness-64d97d6d6-4r4d8, which is the pod ID, not the k8s cluster IP or DNS.

How could this pod ID be getting into the system? Spark somehow seems to think 
it is the address of the service that called it. Needless to say any connection 
to the k8s pod ID fails and so does the job.

Any idea how Spark could think the pod ID is an IP address or DNS name? 

BTW if we run a small sample job with `master=local` all is well, but the same 
job executed with the above config tries to connect to the spurious pod ID.


Spark with Kubernetes connecting to pod id, not address

2019-02-12 Thread Pat Ferrel


From: Pat Ferrel 
Reply: Pat Ferrel 
Date: February 12, 2019 at 5:40:41 PM
To: user@spark.apache.org 
Subject:  Spark with Kubernetes connecting to pod id, not address  

We have a k8s deployment of several services including Apache Spark. All 
services seem to be operational. Our application connects to the Spark master 
to submit a job using the k8s DNS service for the cluster where the master is 
called `spark-api` so we use `master=spark://spark-api:7077` and we use 
`spark.submit.deployMode=cluster`. We submit the job through the API not by the 
spark-submit script. 

This will run the "driver" and all "executors" on the cluster and this part 
seems to work but there is a callback to the launching code in our app from 
some Spark process. For some reason it is trying to connect to 
`harness-64d97d6d6-4r4d8`, which is the **pod ID**, not the k8s cluster IP or 
DNS.

How could this **pod ID** be getting into the system? Spark somehow seems to 
think it is the address of the service that called it. Needless to say any 
connection to the k8s pod ID fails and so does the job.

Any idea how Spark could think the **pod ID** is an IP address or DNS name? 

BTW if we run a small sample job with `master=local` all is well, but the same 
job executed with the above config tries to connect to the spurious pod ID.

BTW2 the pod launching the Spark job has the k8s DNS name `harness-api`; not 
sure if this matters.

Thanks in advance
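
One setting that may be worth checking here, offered as a guess rather than a diagnosis: by default the Spark driver advertises its local hostname (`spark.driver.host`), and inside a k8s pod the hostname is normally the pod name. The sketch below only collects the relevant properties in a plain Scala map. Using `harness-api` as the advertised host is an assumption based on the service name mentioned above, and `sparkSettings` and `DriverAddressSketch` are hypothetical names.

```scala
object DriverAddressSketch {
  // Hypothetical helper: gathers the Spark properties discussed in the message.
  // `driverHost` is assumed to be a name other pods can resolve (e.g. a k8s service name).
  def sparkSettings(driverHost: String): Map[String, String] = Map(
    "spark.master"             -> "spark://spark-api:7077", // master address from the message
    "spark.submit.deployMode"  -> "cluster",                // as stated in the message
    "spark.driver.host"        -> driverHost,               // address the driver advertises to other processes
    "spark.driver.bindAddress" -> "0.0.0.0"                 // assumption: listen on all interfaces inside the pod
  )

  def main(args: Array[String]): Unit =
    sparkSettings("harness-api").foreach { case (k, v) => println(s"$k=$v") }
}
```

If the pod name still shows up after setting an explicit host, the address is probably coming from somewhere else and this sketch does not apply.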


Give a task more resources

2017-01-11 Thread Pat Ferrel
I have a task that would benefit from more cores but the standalone scheduler 
launches it when only a subset are available. I’d rather use all cluster cores 
on this task.

Is there a way to tell the scheduler to finish everything before allocating 
resources to a task? Like “finish everything else then launch this”?

Put another way the DAG would be better for this job if it ended all paths 
before executing a task or waited until more cores were available. Perhaps a 
way to hint that a task is fat.

Any ideas?
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Memory allocation for Broadcast values

2015-12-20 Thread Pat Ferrel
I have a large Map that is assembled in the driver and broadcast to each node.

My question is how best to allocate memory for this.  The Driver has to have 
enough memory for the Maps, but only one copy is serialized to each node. What 
type of memory should I size to match the Maps? Is the broadcast Map taking a 
little from each executor, all from every executor, or is there something other 
than driver and executor memory I can size? 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RDD.isEmpty

2015-12-09 Thread Pat Ferrel
I’m getting *huge* execution times on a moderate sized dataset during the 
RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty 
calculation. I’m using Spark 1.5.1 and from researching I would expect this 
calculation to be linearly proportional to the number of partitions as a worst 
case, which should be a trivial amount of time but it is taking many minutes to 
hours to complete this single phase.

I know there has been a small amount of discussion about using this so would 
love to hear what the current thinking on the subject is. Is there a better way 
to find if an RDD has data? Can someone explain why this is happening?

reference PR
https://github.com/apache/spark/pull/4534 


Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
Err, compiled for Spark 1.3.1, running on 1.5.1 if that makes any difference. 
The Spark impl is “provided” so should be using 1.5.1 code afaik.

The code is as you see below for isEmpty, so not sure what else it could 
be measuring since it’s the only Spark thing on the line. I can regen the 
timeline but here is the .take(1) timeline. It is an order of magnitude faster 
(from my recollection) but even the take(1) still seems incredibly slow for an 
empty test. I was surprised that isEmpty is a distributed calc. When run from 
the driver this value could have already been calculated as a byproduct of 
creating the RDD, no?

I could use an accumulator to count members as the RDD is created and get a 
negligible .isEmpty calc time, right? The RDD creation might be slightly slower 
due to using an accumulator.





On Dec 9, 2015, at 9:29 AM, Sean Owen <so...@cloudera.com> wrote:

Are you sure it's isEmpty? and not an upstream stage? isEmpty is
definitely the action here.  It doesn't make sense that take(1) is so
much faster since it's the "same thing".

On Wed, Dec 9, 2015 at 5:11 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
> Sure, I thought this might be a known issue.
> 
> I have a 122M dataset, which is the trust and rating data from epinions. The 
> data is split into two RDDs and there is an item properties RDD. The code is 
> just trying to remove any empty RDD from the list.
> 
> val esRDDs: List[RDD[(String, Map[String, Any])]] =
>  (correlators ::: properties).filterNot( c => c.isEmpty())
> 
> On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over a 
> hundred minutes (going from memory, I can supply the timeline given a few 
> hours to recalc it).
> 
> Running a different version of the code that does a .count for debug and 
> .take(1) instead of the .isEmpty, the count of one epinions RDD takes 8 minutes 
> and the .take(1) uses 3 minutes.
> 
> Other users have seen total runtime on 13G dataset of 700 minutes with the 
> execution time mostly spent in isEmpty.
> 
> 
> On Dec 9, 2015, at 8:50 AM, Sean Owen <so...@cloudera.com> wrote:
> 
> It should at best collect 1 item to the driver. This means evaluating
> at least 1 element of 1 partition. I can imagine pathological cases
> where that's slow, but, do you have any more info? how slow is slow
> and what is slow?
> 
> On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> I’m getting *huge* execution times on a moderate sized dataset during the
>> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
>> calculation. I’m using Spark 1.5.1 and from researching I would expect this
>> calculation to be linearly proportional to the number of partitions as a
>> worst case, which should be a trivial amount of time but it is taking many
>> minutes to hours to complete this single phase.
>> 
>> I know there has been a small amount of discussion about using this so would
>> love to hear what the current thinking on the subject is. Is there a better
>> way to find if an RDD has data? Can someone explain why this is happening?
>> 
>> reference PR
>> https://github.com/apache/spark/pull/4534
> 
> 
> 





Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
I ran a 124M dataset on my laptop:
with isEmpty it took 32 minutes;
without isEmpty it took 18 minutes, of which all but 1.5 minutes were spent 
writing to Elasticsearch, which is on the same laptop.

So excluding the time writing to Elasticsearch, which was nearly the same in 
both cases, the core Spark code took 10x longer with isEmpty. There are other 
isEmpty calls that I’ll optimize away but they are much smaller gains. Also 
strike the comparison to take(1), pretend I never said that.
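
The 10x figure can be reproduced from the numbers in this message. The sketch below is only that arithmetic; it assumes the Elasticsearch write time is identical in both runs (the message says "nearly the same"), and the object and value names are made up for illustration.

```scala
object IsEmptyTimings {
  val totalWithIsEmpty    = 32.0 // minutes, from the message
  val totalWithoutIsEmpty = 18.0 // minutes, from the message
  val sparkWithoutIsEmpty = 1.5  // "all but 1.5 minutes" went to Elasticsearch

  // Time spent writing to Elasticsearch in the run without isEmpty.
  val elasticsearchWrite = totalWithoutIsEmpty - sparkWithoutIsEmpty // 16.5

  // Assumption: the same write time applies to the run with isEmpty.
  val sparkWithIsEmpty = totalWithIsEmpty - elasticsearchWrite       // 15.5

  // Ratio of core Spark time with and without isEmpty.
  val slowdown = sparkWithIsEmpty / sparkWithoutIsEmpty              // roughly 10.3
}
```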

I can avoid isEmpty but still a bit of a head scratcher.


> On Dec 9, 2015, at 11:53 AM, Sean Owen <so...@cloudera.com> wrote:
> 
> On Wed, Dec 9, 2015 at 7:49 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>> The “Any” is required by the code it is being passed to, which is the
>> Elasticsearch Spark index writing code. The values are actually RDD[(String,
>> Map[String, String])]
> 
> (Is it frequently a big big map by any chance?)

No, 5000 chars or so per Map.

> 
>> No shuffle that I know of. RDDs are created from the output of Mahout
>> SimilarityAnalysis.cooccurrence and are turned into RDD[(String, Map[String,
>> String])]. Since all actual values are simple there is no serialization
>> except for standard Java/Scala so no custom serializers or use of Kryo.
> 
> It's still worth looking at the stages in the job.
> 
> 
>> I understand that the driver can’t know, I was suggesting that isEmpty could
>> be backed by a boolean RDD member variable calculated for every RDD at
>> creation time in Spark. This is really the only way to solve generally since
>> sometimes you get an RDD from a lib, so wrapping it as I suggested is not
>> practical, it would have to be in Spark. BTW the same approach could be used
>> for count, an accumulator per RDD, then returned as a pre-calculated RDD
>> state value.
> 
> What would the boolean do? you can't cache the size in general even if
> you know it, but you don't know it at the time the RDD is created
> (before it's evaluated).

Sorry, maybe I misunderstand but either the accumulator being referenced causes 
the DAG to be executed up to the right spot or you have to checkpoint, either 
way we get the count from a fully executed closure.

> 
>> Are you suggesting that both take(1) and isEmpty are unusable for some
>> reason in my case? I can pass around this information if I have to, I just
>> thought the worst case was O(n) where n was number of partitions and
>> therefore always trivial to calculate.
> 
> No, just trying to guess at reasons you observe what you do. There's
> no difference between isEmpty and take(1) if there are > 0 partitions,
> so if they behave very differently it's something to do with what
> you're measuring and how.
> 




Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
The “Any” is required by the code it is being passed to, which is the 
Elasticsearch Spark index writing code. The values are actually RDD[(String, 
Map[String, String])]

No shuffle that I know of. RDDs are created from the output of Mahout 
SimilarityAnalysis.cooccurrence and are turned into RDD[(String, Map[String, 
String])]. Since all actual values are simple there is no serialization except 
for standard Java/Scala so no custom serializers or use of Kryo.

I understand that the driver can’t know, I was suggesting that isEmpty could be 
backed by a boolean RDD member variable calculated for every RDD at creation 
time in Spark. This is really the only way to solve generally since sometimes 
you get an RDD from a lib, so wrapping it as I suggested is not practical, it 
would have to be in Spark. BTW the same approach could be used for count, an 
accumulator per RDD, then returned as a pre-calculated RDD state value.

Are you suggesting that both take(1) and isEmpty are unusable for some reason 
in my case? I can pass around this information if I have to, I just thought the 
worst case was O(n) where n was number of partitions and therefore always 
trivial to calculate.

This would be 0 time on the timeline if I explicitly keep track of RDD size 
with accumulators (just slightly slower .map), so is this my best path?


On Dec 9, 2015, at 10:06 AM, Sean Owen <so...@cloudera.com> wrote:

Yes but what is the code that generates the RDD? is it a shuffle of something? 
that could cause checking for any element to be expensive since computing the 
RDD at all is expensive. Look at the stages in these long-running jobs.

How could isEmpty not be distributed? the driver can't know whether the RDD's 
partitions are empty without evaluating at least one of them a little bit 
(unless there are 0 partitions). Caching the size doesn't help unless, well, 
you know the size already because the RDD was fully computed. And it might get 
weird anyway since RDDs are only as deterministic as their source -- counting 
lines of a text file will return a different number if the text file is 
appended to.

The only thing that sticks out is the time to serialize one value back to the 
driver. I don't know what your "Any" is there but could it be big or hard to 
serialize?

Really there's a little gotcha in this implementation: you can only check 
isEmpty on an RDD of serializable objects! which is a pretty good assumption; 
you won't get far with an RDD of something unserializable but it's not 
impossible for it to come up.

The serialization could be avoided by mapping everything to "1" or something 
and take-ing *that*. Returning a 1 to the driver is trivial. Or maybe adapt 
some version of the implementation of take() to be an optimized, smarter 
isEmpty(). Neither seemed worth the overhead at the time, but this could be a 
case against that, if it turns out somehow to be serialization time.
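
The point above, that the driver has to evaluate at least part of one partition and that the cost lies in computing that first element, can be illustrated with a small model in plain Scala. This is a simplified sketch and not Spark's implementation: partitions are modelled as lazy functions and scanned one at a time, and `Partition` and `isEmptyModel` are hypothetical names.

```scala
object IsEmptySketch {
  // A "partition" is modelled as a function that produces its elements lazily,
  // the way an RDD partition is only computed when an action asks for it.
  type Partition[T] = () => Iterator[T]

  // Returns (isEmpty, number of partitions evaluated).
  // Scanning stops at the first partition that yields an element.
  def isEmptyModel[T](partitions: Seq[Partition[T]]): (Boolean, Int) = {
    var evaluated = 0
    val found = partitions.iterator.exists { p =>
      evaluated += 1
      p().hasNext // producing even one element may trigger expensive upstream work
    }
    (!found, evaluated)
  }
}
```

In this model the worst case is linear in the number of partitions, but each step costs whatever it takes to produce a partition's first element, which is where an expensive upstream stage would show up.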



Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
Sure, I thought this might be a known issue.

I have a 122M dataset, which is the trust and rating data from epinions. The 
data is split into two RDDs and there is an item properties RDD. The code is 
just trying to remove any empty RDD from the list.

val esRDDs: List[RDD[(String, Map[String, Any])]] =
  (correlators ::: properties).filterNot( c => c.isEmpty())

On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over a 
hundred minutes (going from memory, I can supply the timeline given a few hours 
to recalc it). 

Running a different version of the code that does a .count for debug and 
.take(1) instead of the .isEmpty, the count of one epinions RDD takes 8 minutes 
and the .take(1) uses 3 minutes.

Other users have seen total runtime on 13G dataset of 700 minutes with the 
execution time mostly spent in isEmpty.


On Dec 9, 2015, at 8:50 AM, Sean Owen <so...@cloudera.com> wrote:

It should at best collect 1 item to the driver. This means evaluating
at least 1 element of 1 partition. I can imagine pathological cases
where that's slow, but, do you have any more info? how slow is slow
and what is slow?

On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
> I’m getting *huge* execution times on a moderate sized dataset during the
> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
> calculation. I’m using Spark 1.5.1 and from researching I would expect this
> calculation to be linearly proportional to the number of partitions as a
> worst case, which should be a trivial amount of time but it is taking many
> minutes to hours to complete this single phase.
> 
> I know there has been a small amount of discussion about using this so would
> love to hear what the current thinking on the subject is. Is there a better
> way to find if an RDD has data? Can someone explain why this is happening?
> 
> reference PR
> https://github.com/apache/spark/pull/4534




rdd.saveAsSequenceFile(path)

2015-06-27 Thread Pat Ferrel
Our project is having a hard time following what we are supposed to do to 
migrate this function from Spark 1.2 to 1.3.

  /**
   * Dump matrix as computed Mahout's DRM into specified (HD)FS path
   * @param path
   */
  def dfsWrite(path: String) = {
    val ktag = implicitly[ClassTag[K]]
    //val vtag = implicitly[ClassTag[Vector]]

    implicit val k2wFunc: (K) => Writable =
      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
      else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))

    // the problem is here =>
    // this worked in Spark 1.2 and as we understand things should in 1.3 if we have the right implicits
    //  rdd.saveAsSequenceFile(path)

    // this works in Spark 1.3 but uses a deprecated method
    SparkContext.rddToSequenceFileRDDFunctions(rdd.asInstanceOf[RDD[(K, Vector)]]).saveAsSequenceFile(path)
  }

As we understand it, we need to supply implicit writable factories now instead 
of writables? The rdd is a sequence of key = one of the classes above, value = 
a Mahout “Vector”. These are usually serialized through Kryo (not 
JavaSerializer) for closures so we have compatible classes for that. 

Any pointers would be helpful.



Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the 
driver?

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?



Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Sure

var columns = mc.textFile(source).map { line => line.split(delimiter) }

Here “source” is a comma delimited list of files or directories. Both the 
textFile and .map tasks happen only on the machine they were launched from.

Later other distributed operations happen but I suspect if I can figure out why 
the first line is run only on the client machine the rest will clear up too. 
Here are some subsequent lines.

if(filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}

val interactions = columns.map { tokens =>
  tokens(rowIDColumn) -> tokens(columnIDPosition)
}

interactions.cache()
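
To check what these transformations produce without a cluster, the same steps can be run on a plain Scala collection. This is a local sketch only: the parameter names follow the message, while the object name, the default values, and any sample data are made up for illustration.

```scala
object InteractionsSketch {
  // Mirrors the split / optional filter / pair steps from the message on a local Seq.
  def interactions(
      lines: Seq[String],
      delimiter: String,
      rowIDColumn: Int,
      columnIDPosition: Int,
      filterColumn: Int = -1,
      filterBy: String = ""): Seq[(String, String)] = {
    // Note: String.split treats the delimiter as a regular expression.
    var columns = lines.map { line => line.split(delimiter) }
    if (filterColumn != -1) {
      columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
    }
    // Each remaining line becomes a (row ID, column ID) pair.
    columns.map { tokens => tokens(rowIDColumn) -> tokens(columnIDPosition) }
  }
}
```

For example, `InteractionsSketch.interactions(Seq("u1,purchase,i1", "u2,view,i2"), ",", 0, 2, 1, "purchase")` keeps only the first line and pairs `u1` with `i1`.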

On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Will you be able to paste code here?

On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:
Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the driver?

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?







Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Argh, I looked and there really isn’t that much data yet. There will be 
thousands but starting small.

I bet this is just a total data size not requiring all workers thing—sorry, 
nevermind.


On Apr 23, 2015, at 10:30 AM, Pat Ferrel p...@occamsmachete.com wrote:

They are in HDFS so available on all workers

On Apr 23, 2015, at 10:29 AM, Pat Ferrel p...@occamsmachete.com wrote:

Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so is there something I can do to mix it 
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?




Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so is there something I can do to mix it 
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?




Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
They are in HDFS so available on all workers

On Apr 23, 2015, at 10:29 AM, Pat Ferrel p...@occamsmachete.com wrote:

Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so is there something I can do to mix it 
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?




TaskResultLost

2015-04-14 Thread Pat Ferrel
Running on Spark 1.1.1 Hadoop 2.4 with Yarn AWS dedicated cluster (non-EMR)

Is this in our code or config? I’ve never run into a TaskResultLost, not sure 
what can cause that.


TaskResultLost (result lost from block manager)

collect at AtA.scala:121    97/213 (25 failed)

org.apache.spark.rdd.RDD.collect(RDD.scala:774)
org.apache.mahout.sparkbindings.blas.AtA$.at_a_slim(AtA.scala:121)
org.apache.mahout.sparkbindings.blas.AtA$.at_a(AtA.scala:50)
org.apache.mahout.sparkbindings.SparkEngine$.tr2phys(SparkEngine.scala:231)
org.apache.mahout.sparkbindings.SparkEngine$.tr2phys(SparkEngine.scala:242)
org.apache.mahout.sparkbindings.SparkEngine$.toPhysical(SparkEngine.scala:108)
org.apache.mahout.math.drm.logical.CheckpointAction.checkpoint(CheckpointAction.scala:40)
org.apache.mahout.math.drm.package$.drm2Checkpointed(package.scala:90)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$3.apply(SimilarityAnalysis.scala:129)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$3.apply(SimilarityAnalysis.scala:127)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
scala.collection.AbstractIterator.toList(Iterator.scala:1157)





Re: Need Advice about reading lots of text files

2015-03-17 Thread Pat Ferrel
There are no doubt many things that feed into the right way to read a lot of 
files into Spark. But why force users to learn all of those factors instead of 
putting an optimizer layer into the read inside Spark?

BTW I realize your method is not one task per file, it’s chunked and done in 
parallel. Looks good for text and I may use it—but what about sequence files or 
json SchemaRDD/DataFrame reading? These will all have the same issue and are 
also likely to be in very many small files given the increasing popularity of 
Spark Streaming. It also seems like an optimizer would work in a very similar 
way for these.

+1 for read optimizer :-)


On Mar 17, 2015, at 10:31 AM, Michael Armbrust mich...@databricks.com wrote:

I agree that it would be better if Spark did a better job automatically here, 
though doing so is probably a non-trivial amount of work.  My code is certainly 
worse if you have only a few very large text files for example and thus I'd 
generally encourage people to try the built in options first.

However, one of the nice things about Spark I think is the flexibility that it 
gives you. So, when you are trying to read 100,000s of tiny files this works 
pretty well.  I'll also comment that this does not create a task per file and 
that is another reason it's faster for the many small files case.  Of course 
that comes at the expense of locality (which doesn't matter for my use case on 
S3 anyway)...

On Tue, Mar 17, 2015 at 8:16 AM, Imran Rashid iras...@cloudera.com wrote:
Interesting, on another thread, I was just arguing that the user should *not* 
open the files themselves and read them, b/c then they lose all the other 
goodies we have in HadoopRDD, eg. the metric tracking.

I think this encourages Pat's argument that we might actually need better 
support for this in spark context itself?

On Sat, Mar 14, 2015 at 1:11 PM, Michael Armbrust mich...@databricks.com wrote:

Here is how I have dealt with many small text files (on s3 though this should 
generalize) in the past:
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E

From: Michael Armbrust mich...@databricks.com
Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
Date: Thu, 27 Nov 2014 03:20:14 GMT
In the past I have worked around this problem by avoiding sc.textFile().
Instead I read the data directly inside of a Spark job.  Basically, you
start with an RDD where each entry is a file in S3 and then flatMap that
with something that reads the files and returns the lines.

Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe

Using this class you can do something like:

sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... ::
Nil).flatMap(new ReadLinesSafe(_))

You can also build up the list of files by running a Spark job:
https://gist.github.com/marmbrus/15e72f7bc22337cf6653

Michael
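
A minimal sketch of the approach described above, not the code from the linked gists: start from an RDD of file paths and flatMap it into lines, so the number of tasks is set by numSlices rather than by the number of files. The names ManySmallFiles, readLines and linesOf are hypothetical, and the sketch assumes spark-core and the Hadoop client are on the classpath and that each file is small enough to hold in memory.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.io.Source

object ManySmallFiles {
  // Runs on the executor: open one file through the Hadoop FileSystem API
  // and return all of its lines. The whole file is materialized, which is
  // only reasonable for small files.
  def readLines(path: String): List[String] = {
    val fs = FileSystem.get(URI.create(path), new Configuration())
    val in = fs.open(new Path(path))
    try Source.fromInputStream(in, "UTF-8").getLines().toList
    finally in.close()
  }

  // The RDD initially holds only path strings; the files are read inside
  // the job, spread over numSlices tasks instead of one task per file.
  def linesOf(sc: SparkContext, paths: Seq[String], numSlices: Int): RDD[String] =
    sc.parallelize(paths, numSlices).flatMap(p => readLines(p))
}
```

As noted in the thread, this gives up data locality and the bookkeeping that HadoopRDD provides, so it is a trade-off rather than a general replacement for sc.textFile.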

On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote:
It’s a long story but there are many dirs with smallish part- files in them 
so we create a list of the individual files as input to 
sparkContext.textFile(fileList). I suppose we could move them and rename them 
to be contiguous part- files in one dir. Would that be better than passing 
in a long list of individual filenames? We could also make the part files much 
larger by collecting the smaller ones. But would any of this make a difference 
in IO speed?

I ask because using the long file list seems to read what amounts to a not 
very large data set rather slowly. If it were all in large part files in one 
dir I’d expect it to go much faster, but this is just intuition.


On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

why can you not put them in a directory and read them as one input? you will 
get a task per file, but spark is very fast at executing many tasks (it's not a 
jvm per task).

On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote:
Any advice on dealing with a large number of separate input files?


On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

We have many text files that we need to read in parallel. We can create a comma 
delimited list of files to pass in to sparkContext.textFile(fileList). The list 
can get very large (maybe 1) and is all on hdfs.

The question is: what is the most performant way to read them? Should they be 
broken up and read in groups

Re: Need Advice about reading lots of text files

2015-03-15 Thread Pat Ferrel
Ah most interesting—thanks.

So it seems sc.textFile(longFileList) has to read all the metadata before 
starting the read, for partitioning purposes, so what you do is not use it?

You create a task per file that reads one file (in parallel) per task without 
scanning for _all_ metadata. Can’t argue with the logic, but perhaps Spark 
should incorporate something like this in sc.textFile? My case can’t be that 
unusual, especially since I am periodically processing micro-batches from Spark 
Streaming. In fact I have to scan HDFS to create the longFileList to begin 
with, so I get the file status and therefore probably all the metadata needed 
by sc.textFile. Your method would save one scan, which is good.

Might a better sc.textFile take a beginning URI, a file pattern regex, and a 
recursive flag? Then one scan could create all metadata automatically for a 
large subset of people using the function, something like 

sc.textFile(beginDir: String, filePattern: String = "^part.*", recursive: 
Boolean = false)

In fact it should be easy to create a BetterSC that overrides the textFile 
method with a re-implementation that only requires one scan to get metadata. 

Just thinking on email…

On Mar 14, 2015, at 11:11 AM, Michael Armbrust mich...@databricks.com wrote:


Here is how I have dealt with many small text files (on s3 though this should 
generalize) in the past:
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E

From: Michael Armbrust mich...@databricks.com
Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
Date: Thu, 27 Nov 2014 03:20:14 GMT
In the past I have worked around this problem by avoiding sc.textFile().
Instead I read the data directly inside of a Spark job.  Basically, you
start with an RDD where each entry is a file in S3 and then flatMap that
with something that reads the files and returns the lines.

Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe

Using this class you can do something like:

sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... ::
Nil).flatMap(new ReadLinesSafe(_))

You can also build up the list of files by running a Spark job:
https://gist.github.com/marmbrus/15e72f7bc22337cf6653

Michael

On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote:
It’s a long story but there are many dirs with smallish part- files in them 
so we create a list of the individual files as input to 
sparkContext.textFile(fileList). I suppose we could move them and rename them 
to be contiguous part- files in one dir. Would that be better than passing 
in a long list of individual filenames? We could also make the part files much 
larger by collecting the smaller ones. But would any of this make a difference 
in IO speed?

I ask because using the long file list seems to read what amounts to a not 
very large data set rather slowly. If it were all in large part files in one 
dir I’d expect it to go much faster, but this is just intuition.


On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

why can you not put them in a directory and read them as one input? you will 
get a task per file, but spark is very fast at executing many tasks (it's not a 
jvm per task).

On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote:
Any advice on dealing with a large number of separate input files?


On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

We have many text files that we need to read in parallel. We can create a comma 
delimited list of files to pass in to sparkContext.textFile(fileList). The list 
can get very large (maybe 1) and is all on hdfs.

The question is: what is the most performant way to read them? Should they be 
broken up and read in groups appending the resulting RDDs or should we just 
pass in the entire list at once? In effect I’m asking if Spark does some 
optimization or whether we should do it explicitly. If the latter, what rule 
might we use depending on our cluster setup?

Re: Need Advice about reading lots of text files

2015-03-14 Thread Pat Ferrel
It’s a long story but there are many dirs with smallish part- files in them 
so we create a list of the individual files as input to 
sparkContext.textFile(fileList). I suppose we could move them and rename them 
to be contiguous part- files in one dir. Would that be better than passing 
in a long list of individual filenames? We could also make the part files much 
larger by collecting the smaller ones. But would any of this make a difference 
in IO speed?

I ask because using the long file list seems to read what amounts to a not 
very large data set rather slowly. If it were all in large part files in one 
dir I’d expect it to go much faster, but this is just intuition.


On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

why can you not put them in a directory and read them as one input? you will 
get a task per file, but spark is very fast at executing many tasks (it's not a 
jvm per task).

On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote:
Any advice on dealing with a large number of separate input files?


On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

We have many text files that we need to read in parallel. We can create a comma 
delimited list of files to pass in to sparkContext.textFile(fileList). The list 
can get very large (maybe 1) and is all on hdfs.

The question is: what is the most performant way to read them? Should they be 
broken up and read in groups appending the resulting RDDs or should we just 
pass in the entire list at once? In effect I’m asking if Spark does some 
optimization or whether we should do it explicitly. If the latter, what rule 
might we use depending on our cluster setup?





Re: Need Advice about reading lots of text files

2015-03-14 Thread Pat Ferrel
Any advice on dealing with a large number of separate input files?


On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

We have many text files that we need to read in parallel. We can create a comma 
delimited list of files to pass in to sparkContext.textFile(fileList). The list 
can get very large (maybe 1) and is all on hdfs. 

The question is: what is the most performant way to read them? Should they be 
broken up and read in groups appending the resulting RDDs or should we just 
pass in the entire list at once? In effect I’m asking if Spark does some 
optimization or whether we should do it explicitly. If the latter, what rule 
might we use depending on our cluster setup?



Need Advice about reading lots of text files

2015-03-13 Thread Pat Ferrel
We have many text files that we need to read in parallel. We can create a comma 
delimited list of files to pass in to sparkContext.textFile(fileList). The list 
can get very large (maybe 1) and is all on hdfs. 

The question is: what is the most performant way to read them? Should they be 
broken up and read in groups appending the resulting RDDs or should we just 
pass in the entire list at once? In effect I’m asking if Spark does some 
optimization or whether we should do it explicitly. If the latter, what rule 
might we use depending on our cluster setup?
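
For concreteness, the two options in the question can be written down as follows. This is only a sketch assuming spark-core is on the classpath; GroupedRead, commaLists, readAll and readInGroups are hypothetical names, and nothing here says which option is faster.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object GroupedRead {
  // Pure helper: split the file list into comma-delimited strings,
  // each holding at most groupSize paths.
  def commaLists(paths: Seq[String], groupSize: Int): List[String] =
    paths.grouped(groupSize).map(_.mkString(",")).toList

  // Option 1: pass the entire comma-delimited list to textFile at once.
  def readAll(sc: SparkContext, paths: Seq[String]): RDD[String] =
    sc.textFile(paths.mkString(","))

  // Option 2: read the files in groups and union the resulting RDDs.
  def readInGroups(sc: SparkContext, paths: Seq[String], groupSize: Int): RDD[String] =
    sc.union(commaLists(paths, groupSize).map(list => sc.textFile(list)))
}
```

Both variants still go through sc.textFile, so both pay for whatever per-file metadata work it does; the grouping only changes how the list is handed over.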



Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-02 Thread Pat Ferrel
Sab, not sure what you require for the similarity metric or your use case but 
you can also look at spark-rowsimilarity or spark-itemsimilarity (column-wise) 
here: http://mahout.apache.org/users/recommender/intro-cooccurrence-spark.html
These are optimized for LLR based “similarity” which is very simple to 
calculate since you don’t use either the item weight or the entire row or 
column vector values. Downsampling is done by number of values per column (or 
row) and by LLR strength. This keeps it to O(n).

They run pretty fast and only use memory if you use the version that attaches 
application IDs to the rows and columns. Using SimilarityAnalysis.cooccurrence 
may help. It’s in the Spark/Scala part of Mahout.

On Mar 2, 2015, at 12:56 PM, Reza Zadeh r...@databricks.com wrote:

Hi Sab,
The current method is optimized for having many rows and few columns. In your 
case it is exactly the opposite. We are working on your case, tracked by this 
JIRA: https://issues.apache.org/jira/browse/SPARK-4823
Your case is very common, so I will put some time into building it.

In the meantime, if you're looking for groups of similar points, consider using 
K-means - it will get you clusters of similar rows with euclidean distance.

Best,
Reza


On Sun, Mar 1, 2015 at 9:36 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Hi Reza

I see that ((int, int), double) pairs are generated for any combination that 
meets the criteria controlled by the threshold. But assuming a simple 1x10K 
matrix that means I would need at least 12GB memory per executor for the flat 
map just for these pairs excluding any other overhead. Is that correct? How can 
we make this scale for even larger n (when m stays small) like 100 x 5 
million? One option is to use higher thresholds. The other is that I use a 
SparseVector to begin with. Are there any other optimizations I can take 
advantage of?

Thanks
Sab





Re: Upgrade to Spark 1.2.1 using Guava

2015-03-02 Thread Pat Ferrel
Marcelo’s work-around works. So if you are using the itemsimilarity stuff, the 
CLI has a way to solve the class not found and I can point out how to do the 
equivalent if you are using the library API. Ping me if you care.


On Feb 28, 2015, at 2:27 PM, Erlend Hamnaberg erl...@hamnaberg.net wrote:

Yes. I ran into this problem with a Mahout snapshot and Spark 1.2.0. I did not 
really try to figure out why that was a problem, since there were already too 
many moving parts in my app. Obviously there is a classpath issue somewhere.

/Erlend

On 27 Feb 2015 22:30, Pat Ferrel p...@occamsmachete.com wrote:
@Erlend hah, we were trying to merge your PR and ran into this—small world. You 
actually compile the JavaSerializer source in your project?

@Marcelo do you mean by modifying spark.executor.extraClassPath on all workers? 
That didn’t seem to work.

On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg erl...@hamnaberg.net wrote:

Hi.

I have had a similar issue. I had to pull the JavaSerializer source into my own 
project, just so I got the classloading of this class under control.

This must be a class loader issue with spark.

-E

On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel p...@occamsmachete.com wrote:
I understand that I need to supply Guava to Spark. The HashBiMap is created in 
the client and broadcast to the workers. So it is needed in both. To achieve 
this there is a deps.jar with Guava (and Scopt but that is only for the 
client). Scopt is found so I know the jar is fine for the client.

I pass in the deps.jar to the context creation code. I’ve checked the content 
of the jar and have verified that it is used at context creation time.

I register the serializer as follows:

class MyKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) = {

    val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
    //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
    log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName +
      "\n\n\n") // just to be sure this does indeed get logged
    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]],
      new JavaSerializer())
  }
}

The job proceeds up until the broadcast value, a HashBiMap, is deserialized, 
which is where I get the following error.

Have I missed a step for deserialization of broadcast values? Odd that 
serialization happened but deserialization failed. I’m running on a standalone 
localhost-only cluster.


15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 
(TID 9, 192.168.0.2): java.io.IOException: 
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216

Re: Upgrade to Spark 1.2.1 using Guava

2015-02-28 Thread Pat Ferrel
Maybe, but any time the workaround is to use “spark-submit --conf 
spark.executor.extraClassPath=/guava.jar blah” that means that standalone apps 
must have hard-coded paths that are honored on every worker. And as you know, a 
lib is pretty much blocked from use of this version of Spark, hence the blocker 
severity.

I could easily be wrong but userClassPathFirst doesn’t seem to be the issue. 
There is no class conflict.

On Feb 27, 2015, at 7:13 PM, Sean Owen so...@cloudera.com wrote:

This seems like a job for userClassPathFirst. Or could be. It's
definitely an issue of visibility between where the serializer is and
where the user class is.

At the top you said Pat that you didn't try this, but why not?

On Fri, Feb 27, 2015 at 10:11 PM, Pat Ferrel p...@occamsmachete.com wrote:
 I’ll try to find a Jira for it. I hope a fix is in 1.3
 
 
 On Feb 27, 2015, at 1:59 PM, Pat Ferrel p...@occamsmachete.com wrote:
 
 Thanks! that worked.
 
 On Feb 27, 2015, at 1:50 PM, Pat Ferrel p...@occamsmachete.com wrote:
 
 I don’t use spark-submit I have a standalone app.
 
 So I guess you want me to add that key/value to the conf in my code and make 
 sure it exists on workers.
 
 
 On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin van...@cloudera.com wrote:
 
 On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel p...@occamsmachete.com wrote:
 I changed in the spark master conf, which is also the only worker. I added a 
 path to the jar that has guava in it. Still can’t find the class.
 
 Sorry, I'm still confused about what config you're changing. I'm
 suggesting using:
 
 spark-submit --conf spark.executor.extraClassPath=/guava.jar blah
 
 
 --
 Marcelo
 



Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I understand that I need to supply Guava to Spark. The HashBiMap is created in 
the client and broadcast to the workers. So it is needed in both. To achieve 
this there is a deps.jar with Guava (and Scopt but that is only for the 
client). Scopt is found so I know the jar is fine for the client. 

I pass in the deps.jar to the context creation code. I’ve checked the content 
of the jar and have verified that it is used at context creation time. 

I register the serializer as follows:

class MyKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) = {

    val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
    //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
    log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName +
      "\n\n\n") // just to be sure this does indeed get logged
    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]],
      new JavaSerializer())
  }
}
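
For context, a registrator like the one above only takes effect if the Spark configuration points at it. The sketch below shows one common way to wire that up; it is an assumption about the setup, not the author's code. KryoConf and build are hypothetical names, and the registrator's fully qualified class name has to be supplied by the caller.

```scala
import org.apache.spark.SparkConf

object KryoConf {
  // Builds a SparkConf that switches serialization to Kryo and names the
  // registrator class to load. `registrator` is the fully qualified class
  // name of the KryoRegistrator implementation (hypothetical here).
  def build(registrator: String): SparkConf =
    new SparkConf(false) // false: do not load spark.* system properties
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", registrator)
}
```

Note that this only selects the serializer. The registrator and the Guava classes it references still have to be loadable on the executors, which is the problem discussed in the rest of the thread.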

The job proceeds up until the broadcast value, a HashBiMap, is deserialized, 
which is where I get the following error. 

Have I missed a step for deserialization of broadcast values? Odd that 
serialization happened but deserialization failed. I’m running on a standalone 
localhost-only cluster.


15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 
(TID 9, 192.168.0.2): java.io.IOException: 
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
... 19 more

== root error ==
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
...








On Feb 25, 2015, at 5:24 PM, Marcelo Vanzin van...@cloudera.com wrote:

Guava is not in Spark. (Well, long version: it's in Spark but it's
relocated to a different package except for some special classes
leaked through the public API.)

If your app needs Guava, it needs to package Guava with it (e.g. by
using maven-shade-plugin, or using --jars if only executors use
Guava).

On Wed, Feb 25, 2015 at 5:17 PM, Pat Ferrel p...@occamsmachete.com wrote:
 The root Spark pom has guava set at a certain version number. It’s very hard
 to read the shading xml. Someone suggested that I try using
 userClassPathFirst but that sounds too heavy handed since I don’t really
 care which version of guava I get, not picky.
 
 When I set my project to use the same version as Spark I get a missing
 classdef, which usually means a version conflict.
 
 At this point I am quite confused about what

Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I don’t use spark-submit; I have a standalone app.

So I guess you want me to add that key/value to the conf in my code and make 
sure it exists on workers.
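
A minimal sketch of what adding that key/value in code could look like for a standalone app, assuming spark-core is on the classpath. StandaloneAppConf and withExecutorClassPath are hypothetical names, and the app name, master URL and jar path are supplied by the caller.

```scala
import org.apache.spark.SparkConf

object StandaloneAppConf {
  // Sets the same property that `spark-submit --conf
  // spark.executor.extraClassPath=...` would set, but from application code.
  // jarPath must point at a file that exists at that path on every worker.
  def withExecutorClassPath(appName: String, master: String, jarPath: String): SparkConf =
    new SparkConf(false) // false: do not load spark.* system properties
      .setAppName(appName)
      .setMaster(master)
      .set("spark.executor.extraClassPath", jarPath)
}
```

The resulting conf would be passed to new SparkContext(conf). As pointed out elsewhere in the thread, this hard-codes a path that every worker must honor.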


On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin van...@cloudera.com wrote:

On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel p...@occamsmachete.com wrote:
 I changed in the spark master conf, which is also the only worker. I added a 
 path to the jar that has guava in it. Still can’t find the class.

Sorry, I'm still confused about what config you're changing. I'm
suggesting using:

spark-submit --conf spark.executor.extraClassPath=/guava.jar blah


-- 
Marcelo




Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
@Erlend hah, we were trying to merge your PR and ran into this—small world. You 
actually compile the JavaSerializer source in your project?

@Marcelo do you mean by modifying spark.executor.extraClassPath on all workers? 
That didn’t seem to work.

On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg erl...@hamnaberg.net wrote:

Hi.

I have had a similar issue. I had to pull the JavaSerializer source into my own 
project, just so I got the classloading of this class under control.

This must be a class loader issue with spark.

-E

On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel p...@occamsmachete.com wrote:
I understand that I need to supply Guava to Spark. The HashBiMap is created in 
the client and broadcast to the workers. So it is needed in both. To achieve 
this there is a deps.jar with Guava (and Scopt but that is only for the 
client). Scopt is found so I know the jar is fine for the client.

I pass in the deps.jar to the context creation code. I’ve checked the content 
of the jar and have verified that it is used at context creation time.

I register the serializer as follows:

class MyKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) = {

    val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
    //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
    log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName +
      "\n\n\n") // just to be sure this does indeed get logged
    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]],
      new JavaSerializer())
  }
}

The job proceeds up until the broadcast value, a HashBiMap, is deserialized, 
which is where I get the following error.

Have I missed a step for deserialization of broadcast values? Odd that 
serialization happened but deserialization failed. I’m running on a standalone 
localhost-only cluster.


15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 
(TID 9, 192.168.0.2): java.io.IOException: 
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
... 19 more

== root error ==
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
...








On Feb 25, 2015, at 5:24 PM, Marcelo Vanzin van...@cloudera.com wrote:

Guava is not in Spark. (Well, long version: it's in Spark but it's
relocated to a different package except for some special classes
leaked

Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
Thanks! That worked.

On Feb 27, 2015, at 1:50 PM, Pat Ferrel p...@occamsmachete.com wrote:

I don’t use spark-submit; I have a standalone app.

So I guess you want me to add that key/value to the conf in my code and make 
sure the jar exists at that path on the workers.
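
For a standalone app that builds its own context rather than going through spark-submit, the same key/value can be set on the SparkConf before the SparkContext is created. A minimal sketch, assuming spark-core is on the classpath; the object name, app name and master URL are placeholders, and /guava.jar is simply the path from Marcelo's example:

```scala
import org.apache.spark.SparkConf

object GuavaClasspathConf {
  // Hypothetical helper: builds the conf a standalone driver would pass to `new SparkContext(conf)`.
  def build(guavaJar: String): SparkConf =
    new SparkConf()
      .setAppName("guava-broadcast-app")       // placeholder app name
      .setMaster("spark://master-host:7077")   // placeholder master URL
      // Prepends an existing file on each worker to the executor classpath.
      .set("spark.executor.extraClassPath", guavaJar)
}
```

This setting does not ship the jar anywhere. The path is resolved on each executor's machine, so the file has to be present there already.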


On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin van...@cloudera.com wrote:

On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel p...@occamsmachete.com wrote:
 I changed it in the Spark master conf; the master is also the only worker. I added a 
 path to the jar that has Guava in it. It still can’t find the class.

Sorry, I'm still confused about what config you're changing. I'm
suggesting using:

spark-submit --conf spark.executor.extraClassPath=/guava.jar blah


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I’ll try to find a Jira for it. I hope a fix is in 1.3





Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I changed it in the Spark master conf; the master is also the only worker. I added a 
path to the jar that has Guava in it. It still can’t find the class.

Trying Erland’s idea next.

On Feb 27, 2015, at 1:35 PM, Marcelo Vanzin van...@cloudera.com wrote:

On Fri, Feb 27, 2015 at 1:30 PM, Pat Ferrel p...@occamsmachete.com wrote:
 @Marcelo do you mean modifying spark.executor.extraClassPath on all
 workers? That didn’t seem to work.

That's an app configuration, not a worker configuration, so if you're
trying to set it on the worker configuration it will definitely not
work.

-- 
Marcelo




upgrade to Spark 1.2.1

2015-02-25 Thread Pat Ferrel
I’m getting an error that confuses me. I’m running a largish app on a standalone 
cluster on my laptop. The app uses a Guava HashBiMap as a broadcast value. With 
Spark 1.1.0 I simply registered the class and its serializer with Kryo like 
this:

   kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer())

And all was well. I’ve also tried addSerializer instead of register. Now, after 
upgrading to 1.2.1, I get a ClassNotFoundException during deserialization. I checked 
the jar list used to create the context and found the jar that contains HashBiMap, 
but I still get this error. Any ideas?

15/02/25 14:46:33 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 8, 192.168.0.2): java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.mahout.drivers.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
at org.apache.mahout.drivers.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
... 19 more

== root error ==
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)
... 24 more
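
A ClassNotFoundException like the one above means the class loader doing the deserialization cannot see the class, even if the jar was listed when the context was created. One way to narrow this down is to probe for the class by name from code running in the same JVM as the failure. A minimal sketch in plain Scala; `ClassProbe` and `isLoadable` are hypothetical names, not part of Spark or Kryo:

```scala
import scala.util.Try

object ClassProbe {
  /** True if `className` can be resolved by `loader`, without running its static initializers. */
  def isLoadable(
      className: String,
      loader: ClassLoader = Thread.currentThread().getContextClassLoader): Boolean =
    Try(Class.forName(className, false, loader)).isSuccess
}
```

Calling `ClassProbe.isLoadable("com.google.common.collect.HashBiMap")` inside a task and collecting the result would show whether the executors, rather than the driver, can see Guava. The default here is the thread's context class loader, which may differ from the loader Java deserialization uses (the trace above goes through `AppClassLoader`), so it can be worth passing a specific loader to test that one.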






Re: Row similarities

2015-01-18 Thread Pat Ferrel
Right, done with matrix blocks. Seems like a lot of duplicate effort, but 
that’s the way of OSS sometimes. 

I didn’t see transpose in the Jira. Are there plans for transpose and 
rowSimilarity without transpose? The latter seems easier than columnSimilarity 
in the general/naive case. Thresholds could also be used there.

A threshold for downsampling is going to be extremely hard to use in practice, 
but I’m not sure what the threshold applies to, so I must have skimmed too fast. If the 
threshold were some number of sigmas it would be more usable and could extend 
to non-cosine similarities. It would add a non-trivial step to the calc, of 
course. But without it I’ve never seen a threshold actually used effectively, 
and I’ve tried. It’s so dependent on the dataset.
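
One possible reading of "some number of sigmas" is a cutoff placed k standard deviations above the mean of the observed similarity scores, so that it adapts to the dataset instead of being an absolute value. A minimal sketch in plain Scala under that assumption; `SigmaThreshold.keepAbove` is a hypothetical helper, not something MLlib or Mahout is known to provide:

```scala
object SigmaThreshold {
  /** Keeps the scores that lie more than `k` standard deviations above the mean. */
  def keepAbove(scores: Seq[Double], k: Double): Seq[Double] =
    if (scores.isEmpty) Seq.empty
    else {
      val mean = scores.sum / scores.size
      // Population variance of the observed scores.
      val variance = scores.map(s => (s - mean) * (s - mean)).sum / scores.size
      val cutoff = mean + k * math.sqrt(variance)
      scores.filter(_ > cutoff)
    }
}
```

The extra pass over the scores to get the mean and variance is the non-trivial step mentioned above.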

Observation as a question: One of the reasons I use the Mahout Spark R-like DSL 
is for row and column similarity (used in cooccurrence recommenders), and you 
can count on a robust full linear algebra implementation. Once you have full 
linear algebra on top of an optimizer, many of the factorization methods are 
dead simple. Seems like MLlib has chosen to do optimized higher-level 
algorithms first instead of full linear algebra first?

On Jan 17, 2015, at 6:27 PM, Reza Zadeh r...@databricks.com wrote:

We're focused on providing block matrices, which makes transposition simple: 
https://issues.apache.org/jira/browse/SPARK-3434


Re: Row similarities

2015-01-17 Thread Pat Ferrel
BTW it looks like row and column similarities (cosine based) are coming to 
MLlib through DIMSUM. Andrew said rowSimilarity doesn’t seem to be in the 
master yet. Does anyone know the status?

See: 
https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html

Also the method for computation reduction (make it less than O(n^2)) seems 
rooted in cosine. A different computation reduction method is used in the 
Mahout code tied to LLR. Seems like we should get these together.
 
On Jan 17, 2015, at 9:37 AM, Andrew Musselman andrew.mussel...@gmail.com 
wrote:

Excellent, thanks Pat.




Re: Row similarities

2015-01-17 Thread Pat Ferrel
In the Mahout Spark R-like DSL, [A’A] and [AA’] don’t actually do a 
transpose; it’s optimized out. Mahout has had a standalone row matrix transpose 
since day 1 and supports it in the Spark version. You can’t really do matrix 
algebra without it, even though it’s often possible to optimize it away. 

Row similarity with LLR is much simpler than cosine since you only need 
non-zero sums for column, row, and matrix elements, so rowSimilarity is 
implemented in Mahout for Spark. Full-blown row similarity including all the 
different similarity methods (long since implemented in Hadoop MapReduce) 
hasn’t been moved to Spark yet.
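
To illustrate why counts are enough, here is a sketch of the standard log-likelihood ratio (Dunning's G² statistic) computed from a 2x2 table of counts. It is written in plain Scala as an illustration and is not taken from Mahout's source; the object and parameter names are assumptions. For two rows, k11 would be the number of columns where both are non-zero, k12 and k21 the columns where only one of them is, and k22 the remaining columns:

```scala
object Llr {
  private def xLogX(x: Long): Double =
    if (x == 0L) 0.0 else x * math.log(x.toDouble)

  // Unnormalized Shannon entropy of a set of counts.
  private def entropy(counts: Long*): Double =
    xLogX(counts.sum) - counts.map(xLogX).sum

  /** Log-likelihood ratio for the 2x2 contingency table (k11, k12, k21, k22). */
  def logLikelihoodRatio(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
    val rowEntropy = entropy(k11 + k12, k21 + k22)
    val columnEntropy = entropy(k11 + k21, k12 + k22)
    val matrixEntropy = entropy(k11, k12, k21, k22)
    // Clamp at zero to absorb tiny negative values from floating-point round-off.
    math.max(0.0, 2.0 * (rowEntropy + columnEntropy - matrixEntropy))
  }
}
```

A table with no association, such as (10, 10, 10, 10), scores approximately zero, while a table where the two rows always occur together scores higher.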

Yep, rows are not covered in the blog, my mistake. Too bad, since it has a lot of uses 
and can at the very least be optimized for output matrix symmetry.

On Jan 17, 2015, at 11:44 AM, Andrew Musselman andrew.mussel...@gmail.com 
wrote:

Yeah okay, thanks.

On Jan 17, 2015, at 11:15 AM, Reza Zadeh r...@databricks.com wrote:

 Pat, columnSimilarities is what that blog post is about, and is already part 
 of Spark 1.2.
 
 rowSimilarities in a RowMatrix is a little more tricky because you can't 
 transpose a RowMatrix easily, and is being tracked by this JIRA: 
 https://issues.apache.org/jira/browse/SPARK-4823
 
 Andrew, sometimes (not always) it's OK to transpose a RowMatrix, if for 
 example the number of rows in your RowMatrix is less than 1m, you can 
 transpose it and use rowSimilarities.
 
 

Re: Row similarities

2015-01-17 Thread Pat Ferrel
Mahout’s Spark implementation of row similarity is in the Scala 
SimilarityAnalysis class. It actually does either row or column similarity but 
only supports LLR at present. It does [A’A] for columns or [AA’] for rows first, 
then calculates the distance (LLR) for non-zero elements. This is a major 
optimization for sparse matrices. As I recall, the old Hadoop code only did this 
for half the matrix since it’s symmetric, but that optimization isn’t in the 
current code because the downsampling is done as LLR is calculated, so the 
entire similarity matrix is never actually calculated unless you disable 
downsampling. 

The primary use is for recommenders, but I’ve used it (in the test suite) for 
row-wise text token similarity too.
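
The sparse-matrix point can be sketched in plain Scala: for binary rows, the non-zero entries of the row-by-row product are just the pairs of rows that share at least one column, so only those pairs ever need a similarity score. This is an illustration of the idea, not Mahout's implementation; `SparseCooccurrence` and `rowCooccurrence` are hypothetical names, and rows are assumed to be given as sets of non-zero column ids:

```scala
object SparseCooccurrence {
  /** Non-zero entries above the diagonal of the row-by-row product: (rowI, rowJ) -> shared column count. */
  def rowCooccurrence(rows: IndexedSeq[Set[Int]]): Map[(Int, Int), Int] = {
    // Inverted index: column id -> sorted ids of the rows that contain it.
    val rowsByColumn: Map[Int, Seq[Int]] =
      rows.zipWithIndex
        .flatMap { case (cols, r) => cols.toSeq.map(c => c -> r) }
        .groupBy(_._1)
        .map { case (c, pairs) => c -> pairs.map(_._2).sorted }

    // Each column contributes 1 to every pair of rows that share it.
    rowsByColumn.values
      .flatMap(rs => for (i <- rs; j <- rs if i < j) yield (i, j))
      .groupBy(identity)
      .map { case (pair, hits) => pair -> hits.size }
  }
}
```

Pairs of rows with no column in common never appear in the result, which is where the saving over a dense all-pairs computation comes from. Only the upper triangle is produced here, since the product is symmetric.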

On Jan 17, 2015, at 9:00 AM, Andrew Musselman andrew.mussel...@gmail.com 
wrote:

Yeah that's the kind of thing I'm looking for; was looking at SPARK-4259 and 
poking around to see how to do things.

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-4259

On Jan 17, 2015, at 8:35 AM, Suneel Marthi suneel_mar...@yahoo.com wrote:

 Andrew, u would be better off using Mahout's RowSimilarityJob for what u r 
 trying to accomplish.
 
  1.  It does give u pair-wise distances
  2.  U can specify the Distance measure u r looking to use
  3.  There's the old MapReduce impl and the Spark DSL impl per ur preference.
 
 From: Andrew Musselman andrew.mussel...@gmail.com
 To: Reza Zadeh r...@databricks.com
 Cc: user user@spark.apache.org
 Sent: Saturday, January 17, 2015 11:29 AM
 Subject: Re: Row similarities
 
 Thanks Reza, interesting approach.  I think what I actually want is to 
 calculate pair-wise distance, on second thought.  Is there a pattern for that?
 
 
 
 On Jan 16, 2015, at 9:53 PM, Reza Zadeh r...@databricks.com wrote:
 
 You can use K-means 
 https://spark.apache.org/docs/latest/mllib-clustering.html with a suitably 
 large k. Each cluster should correspond to rows that are similar to one 
 another.
 
 On Fri, Jan 16, 2015 at 5:18 PM, Andrew Musselman 
 andrew.mussel...@gmail.com wrote:
 What's a good way to calculate similarities between all vector-rows in a 
 matrix or RDD[Vector]?
 
 I'm seeing RowMatrix has a columnSimilarities method but I'm not sure I'm 
 going down a good path to transpose a matrix in order to run that.
 
 
 



Re: Is there any Spark implementation for Item-based Collaborative Filtering?

2014-11-30 Thread Pat Ferrel
Actually the spark-itemsimilarity job and related code in the Spark module of 
Mahout creates all-pairs similarity too. It’s designed to be used with a search 
engine, which provides the query part of the recommender. Integrate the two and 
you have a near-realtime, scalable item-based/cooccurrence collaborative 
filtering type recommender.


On Nov 30, 2014, at 12:09 PM, Sean Owen so...@cloudera.com wrote:

There is an implementation of all-pairs similarity. Have a look at the
DIMSUM implementation in RowMatrix. It is an element of what you would
need for such a recommender, but not the whole thing.

You can also do the model-building part of an ALS-based recommender
with ALS in MLlib.

So, no not directly, but there are related pieces.

On Sun, Nov 30, 2014 at 5:36 PM, shahab shahab.mok...@gmail.com wrote:
 Hi,
 
 I just wonder if there is any implementation for Item-based Collaborative
 Filtering in Spark?
 
 best,
 /Shahab




Lots of small input files

2014-11-21 Thread Pat Ferrel
I have a job that searches for input recursively and creates a string of 
pathnames to treat as one input. 

The files are part-x files and they are fairly small. The job seems to take 
a long time to complete considering the size of the total data (150m) and only 
runs on the master machine. The job only does rdd.map type operations.

1) Why doesn’t it use the other workers in the cluster?
2) Is there a downside to using a lot of small part files? Should I coalesce 
them into one input file?
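
The first step described above, searching recursively and joining the pathnames into one input string, might look like the following in plain Scala. This is a sketch under assumptions: it walks a local or mounted filesystem rather than HDFS, it treats any regular file whose name starts with "part-" as input, and `InputPaths.partFiles` is a hypothetical helper:

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object InputPaths {
  /** Recursively collects part files under `root` and joins their paths with commas. */
  def partFiles(root: Path): String = {
    val stream = Files.walk(root)
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p) && p.getFileName.toString.startsWith("part-"))
        .map(_.toString)
        .toVector   // materialize before the stream is closed
        .sorted
        .mkString(",")
    } finally stream.close()
  }
}
```

The resulting string has the comma-separated form that `SparkContext.textFile` accepts for multiple input paths.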



Cores on Master

2014-11-18 Thread Pat Ferrel
I see the default and max cores settings but these seem to control total cores 
per cluster.

My cobbled together home cluster needs the Master to not use all its cores or 
it may lock up (it does other things). Is there a way to control max cores used 
for a particular cluster machine in standalone mode?



Re: Cores on Master

2014-11-18 Thread Pat Ferrel
Looks like I can do this by not using start-all.sh but starting each worker 
separately, passing in a '--cores n' to the master? Is there no config/env way?




Re: Cores on Master

2014-11-18 Thread Pat Ferrel
This seems to work only on a ‘worker’ not the master? So I’m back to having no 
way to control cores on the master?
 



Re: Cores on Master

2014-11-18 Thread Pat Ferrel
OK, hacking start-slave.sh did it.




Re: Class not found

2014-10-21 Thread Pat Ferrel
The Maven cache is laid out differently, but it does work on Linux and BSD/Mac.

Still looks like a hack to me.

On Oct 21, 2014, at 1:28 PM, Pat Ferrel p...@occamsmachete.com wrote:

Doesn’t this seem like a dangerous, error-prone hack? It will build different 
bits on different machines. It doesn’t even work on my Linux box because the 
mvn install doesn’t cache the same as on the Mac.

If Spark is going to be supported on the Maven repos, shouldn’t it be addressed 
by different artifacts to support any option that changes the linkage 
info/class naming?

On Oct 21, 2014, at 12:16 PM, Pat Ferrel p...@occamsmachete.com wrote:

Not sure if this has been clearly explained here but since I took a day to 
track it down…

Several people have experienced a class not found error on Spark when the class 
referenced is supposed to be in the Spark jars.

One thing that can cause this is if you are building Spark for your cluster 
environment. The instructions say to do a “mvn package …”. Instead, some of these 
errors can be fixed using the following procedure:

1) Delete ~/.m2/repository/org/spark and your-project.
2) Build Spark for your version of Hadoop, *but do not use “mvn package …”*; 
use “mvn install …”. This will put a copy of the exact bits you need into the 
Maven cache for building your-project against. In my case, using Hadoop 1.2.1, it 
was “mvn -Dhadoop.version=1.2.1 -DskipTests clean install”. If you run tests on 
Spark, some failures can safely be ignored, so check before giving up. 
3) Build your-project with “mvn clean install”.





Re: Upgrade to Spark 1.1.0?

2014-10-19 Thread Pat Ferrel
Trying to upgrade from Spark 1.0.1 to 1.1.0. Can’t imagine the upgrade is the 
problem but anyway...

I get a NoClassDefFoundError for RandomGenerator when running a driver from the 
CLI, but only when using a named master, even a standalone master. If I run 
using master = local[4] the job executes correctly, but if I set the master to 
spark://Maclaurin.local:7077, even though they are the same machine, I get the 
NoClassDefFoundError. The classpath seems correct on the CLI and the jars do 
indeed contain the offending class (see below). There must be some difference 
in how classes are loaded between local[4] and spark://Maclaurin.local:7077?

Any ideas?

===

The driver is in mahout-spark_2.10-1.0-SNAPSHOT-job.jar, so the fact that it 
executes means the jar must be in the classpath. When I look at what’s in the 
jar I see RandomGenerator.

Maclaurin:target pat$ jar tf mahout-spark_2.10-1.0-SNAPSHOT-job.jar | grep RandomGenerator
cern/jet/random/engine/RandomGenerator.class
org/apache/commons/math3/random/GaussianRandomGenerator.class
org/apache/commons/math3/random/JDKRandomGenerator.class
org/apache/commons/math3/random/UniformRandomGenerator.class
org/apache/commons/math3/random/RandomGenerator.class  ==!
org/apache/commons/math3/random/NormalizedRandomGenerator.class
org/apache/commons/math3/random/AbstractRandomGenerator.class
org/apache/commons/math3/random/StableRandomGenerator.class

But get the following error executing the job:

14/10/19 15:39:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 6.9 (TID 84, 192.168.0.2): java.lang.NoClassDefFoundError: org/apache/commons/math3/random/RandomGenerator
org.apache.mahout.common.RandomUtils.getRandom(RandomUtils.java:65)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:272)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:267)
org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:33)
org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:32)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
java.lang.Thread.run(Thread.java:695)
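
The "jar tf ... | grep" check above can also be scripted. What follows is a minimal Python sketch using only the standard library; jar_contains_class is a hypothetical helper name, not part of Spark or Mahout. Because the error is raised inside a task on an executor, it may be worth running the same check against whichever jars the workers actually load, not only the job jar seen by the driver.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has a .class entry for class_name.

    class_name is given with dots, for example
    'org.apache.commons.math3.random.RandomGenerator'.
    """
    # A jar is a zip archive; classes are stored as slash-separated paths.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

A call such as jar_contains_class("mahout-spark_2.10-1.0-SNAPSHOT-job.jar", "org.apache.commons.math3.random.RandomGenerator") mirrors the grep shown earlier, but matches the exact class rather than every entry whose name contains the string.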



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



local class incompatible: stream classdesc serialVersionUID

2014-10-16 Thread Pat Ferrel
I’ve read several discussions of the error here and so have wiped all cluster 
machines and copied the master’s Spark build to the rest of the cluster. I’ve 
built my job on the master using the correct Spark version as a dependency and 
even built that version of Spark. I still get the incompatible serialVersionUID 
error.

If I run the job locally with master = local[8] it completes fine.

I thought I had incompatible builds but in the end I’m not quite sure what this 
error is telling me.

14/10/16 15:21:03 WARN scheduler.TaskSetManager: Loss was due to java.io.InvalidClassException
java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = 385418487991259089, local class serialVersionUID = -6766554341038829528
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:560)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1494)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1494)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1748)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1814)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)





Re: local class incompatible: stream classdesc serialVersionUID

2014-10-16 Thread Pat Ferrel
Yes, I removed my Spark dir and scp’ed the master’s build to all cluster 
machines suspecting that problem.

My app (Apache Mahout) had Spark 1.0.1 in the POM but changing it to 1.0.2 (the 
Spark version installed) gave another error. I guess I’ll have to install Spark 
1.0.1 or get Mahout to update their dependencies.


On Oct 16, 2014, at 4:03 PM, Paweł Szulc paul.sz...@gmail.com wrote:

This looks like a typical issue with serialization of the same class between 
different versions of an application.

I've run into similar (yet not the same) issues before. Are you 100% sure that 
you have the same version of Apache Spark on each node of the cluster? And I am 
not only asking about the current project version (1.0.0, 1.1.0 etc.) but also 
about package type (hadoop 1.x, hadoop 2.x).
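
One way to test the "same build on every node" question is to compare a checksum of the Spark jar on each machine. The Python sketch below is only an illustration under stated assumptions: you run file_sha256 on each node yourself (for example over ssh) and collect the results; the function names and the node-to-digest mapping are hypothetical. Byte-identical jars rule out a difference in both Spark version and Hadoop package type between nodes. They do not rule out a mismatch between the cluster's Spark and the Spark version the application itself was built against.

```python
import hashlib
from collections import Counter


def file_sha256(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of the file at path, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def nodes_with_odd_digest(digest_by_node):
    """Given {node_name: digest}, return the nodes that differ from the most common digest."""
    if not digest_by_node:
        return []
    majority, _count = Counter(digest_by_node.values()).most_common(1)[0]
    return sorted(node for node, d in digest_by_node.items() if d != majority)
```

An empty list from nodes_with_odd_digest means every node reported the same jar.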






Running new code on a Spark Cluster

2014-06-26 Thread Pat Ferrel
I’ve created a CLI driver for a Spark version of a Mahout job called item 
similarity with several tests that all work fine on local[4] Spark standalone. 
The code even reads and writes to clustered HDFS. But switching to clustered 
Spark has a problem that seems tied to a broadcast and/or serialization.

The code uses HashBiMap, which is a Guava Java thing. There are two of these 
created for every Mahout drm (a distributed matrix), for bi-directional row and 
column ID lookup. They are created once and then broadcast for access 
everywhere.

When I run this on clustered Spark I get the following error. At one point we 
were using HashMaps and they seemed to work on the cluster. So I suspect 
something about the HashBiMap is causing the problem. I’m also suspicious that 
it may have to do with serialization in the broadcast. Here is a snippet of 
code and the error. 

Any ideas?

  // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
  // broadcast them for access in distributed processes, so they are not recalculated in every task.
  // rowIDDictionary is a HashBiMap[String, Int]
  val rowIDDictionary = asOrderedDictionary(rowIDs) // this creates the HashBiMap in a non-distributed manner
  val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)

  val columnIDDictionary = asOrderedDictionary(columnIDs) // this creates the HashBiMap in a non-distributed manner
  val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)

  val indexedInteractions =
    interactions.map { case (rowID, columnID) => // this is the stage being submitted before the error
      val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
      val columnIndex = columnIDDictionary_bcast.value.get(columnID).get

      rowIndex -> columnIndex
    }

The error seems to happen in executing interactions.map when accessing the 
_bcast vals. Any idea where to start looking for this?
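
For readers who want to see the lookup logic in isolation, here is a small Python sketch that runs without Spark or Guava. It swaps the HashBiMap for a pair of plain dicts (forward and inverse), so it illustrates the idea and is not the Mahout implementation. as_ordered_dictionary is a stand-in modeled on the name in the snippet above, and index_interactions is hypothetical. In PySpark the two dictionaries would be wrapped with sc.broadcast(...) and read through .value inside the map.

```python
def as_ordered_dictionary(ids):
    """Assign each distinct ID a consecutive integer index in first-seen order.

    Returns (forward, inverse): forward maps external ID -> index,
    inverse maps index -> external ID.
    """
    forward = {}
    for external_id in ids:
        if external_id not in forward:
            forward[external_id] = len(forward)
    inverse = {index: external_id for external_id, index in forward.items()}
    return forward, inverse


def index_interactions(interactions, row_dict, column_dict):
    """Translate (rowID, columnID) pairs into (rowIndex, columnIndex) pairs."""
    return [(row_dict[row_id], column_dict[column_id])
            for row_id, column_id in interactions]
```

Plain maps like these correspond to the HashMap variant that reportedly worked on the cluster.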

14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83), which has no missing parents
14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83)
14/06/26 11:23:36 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with 2 tasks
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:0 as TID 16 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:0 as 2418 bytes in 0 ms
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:1 as TID 17 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:1 as 2440 bytes in 0 ms
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Lost TID 16 (task 9.0:0)
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:102)
at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
at 

Re: File list read into single RDD

2014-05-21 Thread Pat Ferrel
Thanks, this really helps.

As long as I stick to HDFS paths and files, I’m good. I do know that code a bit 
but have never used it to, say, take input from one cluster via 
“hdfs://server:port/path” and output to another via 
“hdfs://another-server:another-port/path”. This seems to be supported by Spark, 
so I’ll have to go back and look at how to do this in the HDFS API.

Specifically I’ll need to examine the directory/file structure on one cluster 
then check some things on what is potentially another cluster before output. I 
have usually assumed only one HDFS instance so it may just be a matter of me 
being more careful and preserving full URIs. In the past I may have made 
assumptions that output is to the same dir tree as the input. Maybe it’s a 
matter of being more scrupulous about that assumption.

It’s a bit hard to test this case since I have never really had access to two 
clusters so I’ll have to develop some new habits at least.
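
The "preserve full URIs" habit can be exercised without a second cluster by treating paths purely as strings. The Python sketch below is a rough illustration using the standard library's URL parser; same_filesystem and retarget are hypothetical helper names, and the host names and ports in any call are made up. It only manipulates URIs and makes no attempt to contact a file system.

```python
from urllib.parse import urlsplit, urlunsplit


def same_filesystem(uri_a, uri_b):
    """True if both URIs share the same scheme and authority (host:port)."""
    a, b = urlsplit(uri_a), urlsplit(uri_b)
    return (a.scheme, a.netloc) == (b.scheme, b.netloc)


def retarget(input_uri, output_root):
    """Place the input's path under output_root, keeping output_root's scheme and authority."""
    src, dst = urlsplit(input_uri), urlsplit(output_root)
    path = dst.path.rstrip("/") + "/" + src.path.lstrip("/")
    return urlunsplit((dst.scheme, dst.netloc, path, "", ""))
```

For example, retarget("hdfs://server:8020/data/in/part-0", "hdfs://another-server:9000/out") keeps the input's directory layout but yields a URI on the second cluster, so nothing downstream silently assumes the output lives in the same tree as the input.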

On May 18, 2014, at 11:13 AM, Andrew Ash and...@andrewash.com wrote:

Spark's sc.textFile() method delegates to sc.hadoopFile(), which uses Hadoop's 
FileInputFormat.setInputPaths() call.  There is no alternate storage system, 
Spark just delegates to Hadoop for the .textFile() call.

Hadoop can also support multiple URI schemes, not just hdfs:/// paths, so you 
can use Spark on data in S3 using s3:/// just the same as you would with HDFS.  
See Apache's documentation on S3 for more details.

As far as interacting with a FileSystem (HDFS or other) to list files, delete 
files, navigate paths, etc. from your driver program, you should be able to 
just instantiate a FileSystem object and use the normal Hadoop APIs from there. 
 The Apache getting started docs on reading/writing from Hadoop DFS should work 
the same for non-HDFS examples too.

I do think we could use a little recipe in our documentation to make 
interacting with HDFS a bit more straightforward.

Pat, if you get something that covers your case that you don't mind sharing, we 
can format it for including in future Spark docs.

Cheers!
Andrew


On Sun, May 18, 2014 at 9:13 AM, Pat Ferrel pat.fer...@gmail.com wrote:
Doesn’t using an HDFS path pattern then restrict the URI to an HDFS URI? Since 
Spark supports several FS schemes, I’m unclear about how much to assume about 
using the Hadoop file system APIs and conventions. Concretely, if I pass a 
pattern in with an HTTPS file system, will the pattern work? 

How does Spark implement its storage system? This seems to be an abstraction 
level beyond what is available in HDFS. In order to preserve that flexibility 
what APIs should I be using? It would be easy to say, HDFS only and use HDFS 
APIs but that would seem to limit things. Especially where you would like to 
read from one cluster and write to another. This is not so easy to do inside 
the HDFS APIs, or is advanced beyond my knowledge.

If I can stick to passing URIs to sc.textFile() I’m ok but if I need to examine 
the structure of the file system, I’m unclear how I should do it without 
sacrificing Spark’s flexibility.
 
On Apr 29, 2014, at 12:55 AM, Christophe Préaud christophe.pre...@kelkoo.com 
wrote:

Hi,

You can also use any path pattern as defined here: 
http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus%28org.apache.hadoop.fs.Path%29

e.g.:
sc.textFile('{/path/to/file1,/path/to/file2}')
Christophe.
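
Building such a pattern from a list of paths is a one-liner, but a small guard helps because commas and braces are special inside the glob. A minimal Python sketch, with brace_glob as a hypothetical helper name:

```python
def brace_glob(paths):
    """Combine paths into one glob alternation of the form {p1,p2,...}."""
    paths = list(paths)
    if not paths:
        raise ValueError("need at least one path")
    for p in paths:
        # These characters would change the meaning of the pattern.
        if any(ch in p for ch in ",{}"):
            raise ValueError("path contains a glob-special character: %r" % p)
    return "{" + ",".join(paths) + "}"
```

The resulting string has the same shape as the example above and can be passed as the single argument to sc.textFile().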

On 29/04/2014 05:07, Nicholas Chammas wrote:
 Not that I know of. We were discussing it on another thread and it came up. 
 
 I think if you look up the Hadoop FileInputFormat API (which Spark uses) 
 you'll see it mentioned there in the docs. 
 
 http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapred/FileInputFormat.html
 
 But that's not obvious.
 
 Nick
 
 On Monday, April 28, 2014, Pat Ferrel pat.fer...@gmail.com wrote:
 Perfect. 
 
 BTW just so I know where to look next time, was that in some docs?
 
 On Apr 28, 2014, at 7:04 PM, Nicholas Chammas nicholas.cham...@gmail.com 
 wrote:
 
 Yep, as I just found out, you can also provide 
 sc.textFile() with a comma-delimited string of all the files you want to load.
 
 For example:
 
 sc.textFile('/path/to/file1,/path/to/file2')
 So once you have your list of files, concatenate their paths like that and 
 pass the single string to 
 textFile().
 
 Nick
 
 
 
 On Mon, Apr 28, 2014 at 7:23 PM, Pat Ferrel pat.fer...@gmail.com wrote:
 sc.textFile(URI) supports reading multiple files in parallel but only with a 
 wildcard. I need to walk a dir tree, match a regex to create a list of files, 
 then I’d like to read them into a single RDD in parallel. I understand these 
 could go into separate RDDs then a union RDD can be created. Is there a way 
 to create a single RDD from a URI list?
 
 


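Kelkoo SAS
Simplified joint-stock company (Société par Actions Simplifiée)
Share capital: € 4.168.964,30
Registered office: 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended exclusively 
for their recipients. If you are not the intended recipient of this message, 
please destroy it and notify the sender.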



Read from list of files in parallel

2014-04-28 Thread Pat Ferrel
Warning noob question:

The sc.textFile(URI) method seems to support reading from files in parallel but 
you have to supply some wildcard URI, which greatly limits how the storage is 
structured. Is there a simple way to pass in a URI list or is it an exercise 
left for the student?
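
The replies to this question, archived above, report that sc.textFile() accepts a comma-delimited string of paths. Under that assumption, the "walk a tree, match a regex, build a list" step might look like the Python sketch below. It uses os.walk, so it only applies to a locally mounted file system; matching_files and as_textfile_arg are hypothetical helper names.

```python
import os
import re


def matching_files(root, pattern):
    """Walk root recursively; return sorted paths whose file name matches pattern."""
    regex = re.compile(pattern)
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if regex.search(name):
                found.append(os.path.join(dirpath, name))
    return sorted(found)


def as_textfile_arg(paths):
    """Join paths into the single comma-delimited string form."""
    return ",".join(paths)
```

The string from as_textfile_arg(matching_files(root, pattern)) would then be the one argument passed to sc.textFile(). Paths that themselves contain commas would break this form and need separate handling.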