Re: Issue with zip and partitions

2014-04-02 Thread Xiangrui Meng
From API docs: Zips this RDD with another one, returning key-value
pairs with the first element in each RDD, second element in each RDD,
etc. Assumes that the two RDDs have the *same number of partitions*
and the *same number of elements in each partition* (e.g. one was made
through a map on the other).

Basically, one RDD should be a mapped RDD of the other, or both RDDs
are mapped RDDs of the same RDD.
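
Editor's illustration (not part of the original message): a minimal Scala
sketch of that contract, where one RDD is a map of the other so both sides
share the same partition layout:

val base   = sc.parallelize(1 to 1000000, 8)   // 8 partitions
val mapped = base.map(_ * 2)                   // same layout: one was made through a map on the other
val ok     = base.zip(mapped)                  // safe: same number of partitions and of elements per partition

val other  = sc.parallelize(1 to 1000000, 4)   // 4 partitions: a different layout
// base.zip(other) would fail at runtime because the partition counts differ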

Btw, your message says Dell - Internal Use - Confidential...

Best,
Xiangrui

On Tue, Apr 1, 2014 at 7:27 PM,  patrick_nico...@dell.com wrote:
 Dell - Internal Use - Confidential

 I got an exception "Can't zip RDDs with unequal numbers of partitions" when
 I apply any action (reduce, collect) to a dataset created by zipping two
 datasets of 10 million entries each.  The problem occurs independently of the
 number of partitions, including when I let Spark create those partitions.



 Interestingly enough, I do not have a problem zipping datasets of 1 and 2.5
 million entries.

 A similar problem was reported on this board with 0.8, but I don't remember if
 the problem was fixed.



 Any idea? Any workaround?



 I'd appreciate any help.


Re: Using ProtoBuf 2.5 for messages with Spark Streaming

2014-04-02 Thread Patrick Wendell
It's this: mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean package


On Tue, Apr 1, 2014 at 5:15 PM, Vipul Pandey vipan...@gmail.com wrote:

 how do you recommend building that - it says
 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:assembly
 (default-cli) on project spark-0.9.0-incubating: Error reading assemblies:
 No assembly descriptors found. -> [Help 1]
 upon running
 mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean assembly:assembly


 On Apr 1, 2014, at 4:13 PM, Patrick Wendell pwend...@gmail.com wrote:

 Do you get the same problem if you build with maven?


 On Tue, Apr 1, 2014 at 12:23 PM, Vipul Pandey vipan...@gmail.com wrote:

 SPARK_HADOOP_VERSION=2.0.0-cdh4.2.1 sbt/sbt assembly

 That's all I do.

 On Apr 1, 2014, at 11:41 AM, Patrick Wendell pwend...@gmail.com wrote:

 Vidal - could you show exactly what flags/commands you are using when you
 build spark to produce this assembly?


 On Tue, Apr 1, 2014 at 12:53 AM, Vipul Pandey vipan...@gmail.com wrote:

 Spark now shades its own protobuf dependency so protobuf 2.4.1 shouldn't
 be getting pulled in unless you are directly using akka yourself. Are you?

 No, I'm not. Although I see that protobuf libraries are directly pulled
 into the 0.9.0 assembly jar - I do see the shaded version as well.
 e.g. below for Message.class

 -bash-4.1$ jar -ftv
 ./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.0-cdh4.2.1.jar
 | grep protobuf | grep /Message.class
478 Thu Jun 30 15:26:12 PDT 2011 com/google/protobuf/Message.class
508 Sat Dec 14 14:20:38 PST 2013
 com/google/protobuf_spark/Message.class


 Does your project have other dependencies that might be indirectly
 pulling in protobuf 2.4.1? It would be helpful if you could list all of
 your dependencies including the exact Spark version and other libraries.

 I did have another one, which I moved to the end of the classpath - I even ran
 partial code without that dependency, but it still failed whenever I used the
 jar with the ScalaBuff dependency.
 Spark version is 0.9.0
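
 [Editor's note, not from the original thread: one common way to rule out a
 conflicting transitive protobuf is an explicit exclusion plus a pinned version
 in the build. The sketch below assumes an sbt build; the ScalaBuff coordinates
 are placeholders, not the real artifact IDs.]

 // hypothetical build.sbt fragment
 libraryDependencies += "some.org" %% "scalabuff-runtime" % "x.y.z" exclude("com.google.protobuf", "protobuf-java")
 libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.5.0"  // pin the version you compiled against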


 ~Vipul

 On Mar 31, 2014, at 4:51 PM, Patrick Wendell pwend...@gmail.com wrote:

 Spark now shades its own protobuf dependency so protobuf 2.4.1 shouldn't
 be getting pulled in unless you are directly using akka yourself. Are you?

 Does your project have other dependencies that might be indirectly
 pulling in protobuf 2.4.1? It would be helpful if you could list all of
 your dependencies including the exact Spark version and other libraries.

 - Patrick


 On Sun, Mar 30, 2014 at 10:03 PM, Vipul Pandey vipan...@gmail.comwrote:

 I'm using ScalaBuff (which depends on protobuf 2.5) and facing the same
 issue. Any word on this one?
 On Mar 27, 2014, at 6:41 PM, Kanwaldeep kanwal...@gmail.com wrote:

  We are using Protocol Buffer 2.5 to send messages to Spark Streaming 0.9
  with a Kafka stream setup. I have Protocol Buffer 2.5 as part of the uber
  jar deployed on each of the Spark worker nodes.
  The message is compiled using 2.5, but at runtime it is being de-serialized
  by 2.4.1, as I'm getting the following exception:
 
  java.lang.VerifyError (java.lang.VerifyError: class
  com.snc.sinet.messages.XServerMessage$XServer overrides final method
  getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;)
  java.lang.ClassLoader.defineClass1(Native Method)
  java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
  java.lang.ClassLoader.defineClass(ClassLoader.java:615)
 
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
 
  Any suggestions on how I could still use ProtoBuf 2.5? Based on the
  article - https://spark-project.atlassian.net/browse/SPARK-995 - we should
  be able to use a different version of protobuf in the application.
 
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Using-ProtoBuf-2-5-for-messages-with-Spark-Streaming-tp3396.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com http://nabble.com/.










Re: How to index each map operation????

2014-04-02 Thread Shixiong Zhu
Hi Thierry,

Your code does not work if @yh18190 wants a global counter. An RDD may have
more than one partition, and for each partition, cnt will be reset to -1. You
can try the following code:

scala> val rdd = sc.parallelize( (1, 'a') :: (2, 'b') :: (3, 'c') :: (4, 'd') :: Nil)
rdd: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[3] at
parallelize at <console>:12

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val rdd2 = rdd.partitionBy(new HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ShuffledRDD[4] at partitionBy
at <console>:18

scala> var cnt = -1
cnt: Int = -1

scala> val rdd3 = rdd2.map(i => { cnt += 1; (cnt, i) })
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Char))] = MappedRDD[5] at map at
<console>:22

scala> rdd3.collect
res2: Array[(Int, (Int, Char))] = Array((0,(2,b)), (1,(4,d)), (0,(1,a)),
(1,(3,c)))

A proper solution is to use rdd.partitionBy(new HashPartitioner(1)) to
make sure there is only one partition, but that's not efficient for big
input.
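
As an editor's sketch (not from the original reply), one alternative for big
inputs is to first compute per-partition sizes and then offset a per-partition
counter with mapPartitionsWithIndex, which keeps all partitions:

// Pass 1: count elements per partition (runs one extra job over rdd2).
val sizes   = rdd2.mapPartitionsWithIndex((i, it) => Iterator((i, it.size))).collect().toMap
// Running offsets: offsets(i) = number of elements in partitions 0 .. i-1.
val offsets = sizes.keys.toSeq.sorted.scanLeft(0)((acc, i) => acc + sizes(i))
// Pass 2: assign a global, contiguous index to each element.
// (Assumes each partition is iterated in the same order in both passes.)
val indexed = rdd2.mapPartitionsWithIndex { (i, it) =>
  it.zipWithIndex.map { case (x, j) => (offsets(i) + j, x) }
}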

Best Regards,
Shixiong Zhu


2014-04-02 11:10 GMT+08:00 Thierry Herrmann thierry.herrm...@gmail.com:

 I'm new to Spark, but isn't this a pure Scala question?

 The following seems to work with the spark shell:

 $ spark-shell

 scala> val rdd = sc.makeRDD(List(10,20,30))
 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at
 <console>:12

 scala> var cnt = -1
 cnt: Int = -1

 scala> val rdd2 = rdd.map(i => { cnt += 1; (cnt, i) })
 rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[9] at map at
 <console>:16

 scala> rdd2.collect
 res8: Array[(Int, Int)] = Array((0,10), (1,20), (2,30))

 Thierry



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-index-each-map-operation-tp3471p3614.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: possible bug in Spark's ALS implementation...

2014-04-02 Thread Debasish Das
I think multiplying lambda by the number of ratings is a heuristic that worked
on rating-related problems like the Netflix dataset or other ratings datasets,
but the scope of NMF is much broader than that.

@Sean please correct me in case you don't agree...

It's definitely good to add all the rating-dataset-related optimizations, but
they should not go into the core algorithm - rather into, say, a RatingALS that
extends the core ALS...



On Wed, Apr 2, 2014 at 12:06 AM, Michael Allman m...@allman.ms wrote:

 Hi Nick,

 I don't have my spark clone in front of me, but OTOH the major differences
 are/were:

 1. Oryx multiplies lambda by alpha.

 2. Oryx uses a different matrix inverse algorithm. It maintains a certain
 symmetry which the Spark algo does not, however I don't think this
 difference has a real impact on the results.

 3. Oryx supports the specification of a convergence threshold for
 termination of the algorithm, based on delta rmse on a subset of the
 training set if I recall correctly. I've been using that as the
 termination criterion instead of a fixed number of iterations.

 4. Oryx uses the weighted regularization scheme you alluded to below,
 multiplying lambda by the number of ratings (see the sketch just below).
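
 [Editor's sketch, not Spark's or Oryx's actual code: the weighted ("ALS-WR")
 regularization in item 4 changes the per-user least-squares step to
 (Yu^T Yu + lambda * n_u * I) x_u = Yu^T r_u, where n_u is the number of
 ratings for that user. Breeze is used here purely for illustration.]

 import breeze.linalg.{DenseMatrix, DenseVector}

 // Yu stacks the factor vectors of the n_u items rated by this user; ru holds the ratings.
 def solveUser(Yu: DenseMatrix[Double], ru: DenseVector[Double], lambda: Double): DenseVector[Double] = {
   val nu = Yu.rows                                              // number of ratings for this user
   val k  = Yu.cols                                              // rank
   val A  = Yu.t * Yu + DenseMatrix.eye[Double](k) * (lambda * nu)
   A \ (Yu.t * ru)                                               // Breeze linear solve
 }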

 I've patched the spark impl to support (4) but haven't pushed it to my
 clone on github. I think it would be a valuable feature to support
 officially. I'd also like to work on (3) but don't have time now. I've
 only been using Oryx the past couple of weeks.

 Cheers,

 Michael

 On Tue, 1 Apr 2014, Nick Pentreath [via Apache Spark User List] wrote:

  Hi Michael
  Would you mind setting out exactly what differences you did find between
 the
  Spark and Oryx implementations? Would be good to be clear on them, and
 also
  see if there are further tricks/enhancements from the Oryx one that can
 be
  ported (such as the lambda * numRatings adjustment).
 
  N
 
 
  On Sat, Mar 15, 2014 at 2:52 AM, Michael Allman [hidden email] wrote:
I've been thoroughly investigating this issue over the past
couple of days
and have discovered quite a bit. For one thing, there is
definitely (at
least) one issue/bug in the Spark implementation that leads to
incorrect
   results for models generated with rank > 1 or a large number of
iterations.
I will post a bug report with a thorough explanation this
weekend or on
Monday.
 
I believe I've been able to track down every difference between
the Spark
   and Oryx implementations that lead to different results. I made
some
adjustments to the spark implementation so that, given the same
initial
product/item vectors, the resulting model is identical to the
one produced
by Oryx within a small numerical tolerance. I've verified this
for small
data sets and am working on verifying this with some large data
sets.
 
Aside from those already identified in this thread, another
significant
difference in the Spark implementation is that it begins the
factorization
process by computing the product matrix (Y) from the initial
user matrix
(X). Both of the papers on ALS referred to in this thread begin
the process
by computing the user matrix. I haven't done any testing
comparing the
models generated starting from Y or X, but they are very
different. Is there
a reason Spark begins the iteration by computing Y?
 
Initializing both X and Y as is done in the Spark implementation
seems
unnecessary unless I'm overlooking some desired side-effect.
Only the factor
matrix which generates the other in the first iteration needs to
be
initialized.
 
I also found that the product and user RDDs were being rebuilt
many times
over in my tests, even for tiny data sets. By persisting the RDD
returned
from updateFeatures() I was able to avoid a raft of duplicate
computations.
Is there a reason not to do this?
 
Thanks.
 
 
 
--
View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2704.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.
 
 
 
 

 
 

 --
 View this message in context: Re: possible bug in Spark's ALS
 

Re: Using ProtoBuf 2.5 for messages with Spark Streaming

2014-04-02 Thread Vipul Pandey
I downloaded 0.9.0 fresh and ran the mvn command - the assembly jar thus
generated also has both the shaded and the real versions of the protobuf classes.

Vipuls-MacBook-Pro-3:spark-0.9.0-incubating vipul$ jar -ftv 
./assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.0.0-cdh4.2.1.jar
 | grep proto | grep /Message
  1190 Wed Apr 02 00:19:56 PDT 2014 
com/google/protobuf_spark/MessageOrBuilder.class
  2913 Wed Apr 02 00:19:56 PDT 2014 
com/google/protobuf_spark/Message$Builder.class
   704 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/MessageLite.class
  1904 Wed Apr 02 00:19:56 PDT 2014 
com/google/protobuf_spark/MessageLite$Builder.class
   257 Wed Apr 02 00:19:56 PDT 2014 
com/google/protobuf_spark/MessageLiteOrBuilder.class
   508 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/Message.class
  2661 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/Message$Builder.class
   478 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/Message.class
  1748 Wed Apr 02 00:20:00 PDT 2014 
com/google/protobuf/MessageLite$Builder.class
   668 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/MessageLite.class
   245 Wed Apr 02 00:20:00 PDT 2014 
com/google/protobuf/MessageLiteOrBuilder.class
  1112 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/MessageOrBuilder.class





On Apr 1, 2014, at 11:44 PM, Patrick Wendell pwend...@gmail.com wrote:

 It's this: mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean package
 
 
 On Tue, Apr 1, 2014 at 5:15 PM, Vipul Pandey vipan...@gmail.com wrote:
 how do you recommend building that - it says
 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:assembly
 (default-cli) on project spark-0.9.0-incubating: Error reading assemblies: No
 assembly descriptors found. -> [Help 1]
 upon running
 mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean assembly:assembly
 
 
 On Apr 1, 2014, at 4:13 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 Do you get the same problem if you build with maven?
 
 
 On Tue, Apr 1, 2014 at 12:23 PM, Vipul Pandey vipan...@gmail.com wrote:
 SPARK_HADOOP_VERSION=2.0.0-cdh4.2.1 sbt/sbt assembly 
 
 That's all I do. 
 
 On Apr 1, 2014, at 11:41 AM, Patrick Wendell pwend...@gmail.com wrote:
 
 Vidal - could you show exactly what flags/commands you are using when you 
 build spark to produce this assembly?
 
 
 On Tue, Apr 1, 2014 at 12:53 AM, Vipul Pandey vipan...@gmail.com wrote:
 Spark now shades its own protobuf dependency so protobuf 2.4.1 shouldn't be
 getting pulled in unless you are directly using akka yourself. Are you?
 
 No, I'm not. Although I see that protobuf libraries are directly pulled into
 the 0.9.0 assembly jar - I do see the shaded version as well. 
 e.g. below for Message.class
 
 -bash-4.1$ jar -ftv 
 ./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.0-cdh4.2.1.jar
  | grep protobuf | grep /Message.class
478 Thu Jun 30 15:26:12 PDT 2011 com/google/protobuf/Message.class
508 Sat Dec 14 14:20:38 PST 2013 com/google/protobuf_spark/Message.class
 
 
 Does your project have other dependencies that might be indirectly pulling 
 in protobuf 2.4.1? It would be helpful if you could list all of your 
 dependencies including the exact Spark version and other libraries.
 
 I did have another one, which I moved to the end of the classpath - I even ran
 partial code without that dependency, but it still failed whenever I used the
 jar with the ScalaBuff dependency.
 Spark version is 0.9.0
 
 
 ~Vipul
 
 On Mar 31, 2014, at 4:51 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 Spark now shades its own protobuf dependency so protobuf 2.4.1 shouldn't be
 getting pulled in unless you are directly using akka yourself. Are you?
 
 Does your project have other dependencies that might be indirectly pulling 
 in protobuf 2.4.1? It would be helpful if you could list all of your 
 dependencies including the exact Spark version and other libraries.
 
 - Patrick
 
 
 On Sun, Mar 30, 2014 at 10:03 PM, Vipul Pandey vipan...@gmail.com wrote:
 I'm using ScalaBuff (which depends on protobuf 2.5) and facing the same
 issue. Any word on this one?
 On Mar 27, 2014, at 6:41 PM, Kanwaldeep kanwal...@gmail.com wrote:
 
  We are using Protocol Buffer 2.5 to send messages to Spark Streaming 0.9 
  with
  Kafka stream setup. I have protocol Buffer 2.5 part of the uber jar 
  deployed
  on each of the spark worker nodes.
  The message is compiled using 2.5, but at runtime it is being
  de-serialized by 2.4.1, as I'm getting the following exception:
 
  java.lang.VerifyError (java.lang.VerifyError: class
  com.snc.sinet.messages.XServerMessage$XServer overrides final method
  getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;)
  java.lang.ClassLoader.defineClass1(Native Method)
  java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
  java.lang.ClassLoader.defineClass(ClassLoader.java:615)
  java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
 
  Suggestions on 

Re: How to index each map operation????

2014-04-02 Thread yh18190
Hi Thierry,

Thanks for the above responses. I implemented it using RangePartitioner. We
need to use one of the custom partitioners in order to perform this task.
Normally you can't maintain a counter, because the count operations would be
performed on each partitioned block of data...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-index-each-map-operation-tp3471p3624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


CDH5 Spark on EC2

2014-04-02 Thread Denny Lee
I’ve been able to get CDH5 up and running on EC2 and according to Cloudera 
Manager, Spark is running healthy.

But when I try to run spark-shell, I eventually get the error:

14/04/02 07:18:18 INFO client.AppClient$ClientActor: Connecting to master 
spark://ip-172-xxx-xxx-xxx:7077...
14/04/02 07:18:38 ERROR client.AppClient$ClientActor: All masters are 
unresponsive! Giving up.
14/04/02 07:18:38 ERROR cluster.SparkDeploySchedulerBackend: Spark cluster 
looks dead, giving up.
14/04/02 07:18:38 ERROR scheduler.TaskSchedulerImpl: Exiting due to error from 
cluster scheduler: Spark cluster looks down

Wondering which configurations I would need to change to get this to work?

Thanks!
Denny

Re: Status of MLI?

2014-04-02 Thread Krakna H
Thanks for the update Evan! In terms of using MLI, I see that the Github
code is linked to Spark 0.8; will it not work with 0.9 (which is what I
have set up) or higher versions?


On Wed, Apr 2, 2014 at 1:44 AM, Evan R. Sparks [via Apache Spark User List]
ml-node+s1001560n3615...@n3.nabble.com wrote:

 Hi there,

 MLlib is the first component of MLbase - MLI and the higher levels of the
 stack are still being developed. Look for updates in terms of our progress
 on the hyperparameter tuning/model selection problem in the next month or
 so!

 - Evan


 On Tue, Apr 1, 2014 at 8:05 PM, Krakna H [hidden email] wrote:

 Hi Nan,

 I was actually referring to MLI/MLBase (http://www.mlbase.org); is this
 being actively developed?

 I'm familiar with mllib and have been looking at its documentation.

 Thanks!


 On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List] [hidden email] wrote:

  mllib has been part of the Spark distribution (under the mllib directory); also
 check http://spark.apache.org/docs/latest/mllib-guide.html

 and for JIRA, because of the recent migration to Apache JIRA, I think
 all mllib-related issues should now be under the Spark umbrella:
 https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel

 --
 Nan Zhu

 On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote:

 What is the current development status of MLI/MLBase? I see that the
 github repo is lying dormant (https://github.com/amplab/MLI) and JIRA
 has had no activity in the last 30 days (
 https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
 Is the plan to add a lot of this into mllib itself without needing a
 separate API?

 Thanks!

 --
  View this message in context: Status of MLI?
  http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.







 --
  View this message in context: Re: Status of MLI?
  http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3612.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: possible bug in Spark's ALS implementation...

2014-04-02 Thread Sean Owen
It should be kept in mind that different implementations are rarely
strictly better, and that what works well on one type of data might
not on another. It also bears keeping in mind that several of these
differences just amount to different amounts of regularization, which
need not amount to a real difference.

#1 is just a question of 'semantics' really and not an algorithmic difference.

#4 is more subtle, and I think it is a small positive to use weighted
regularization -- for all types of data. It is not specific to
explicit data since it is based on counts. That said, the effect of
this is mostly more regularization across the board, and it can be
simulated with a higher global lambda.

There might be something to #2. I suspect the QR decomposition is more
accurate and that can matter here. I have no evidence for that though.
(The symmetry wasn't actually an issue in the end no?)

#3 won't affect the result so much as when to stop. That is, stopping
in a principled way is good, but it can be simulated by just specifying
a fixed number of iterations X.

If I were to experiment next I would focus on #2 and maybe #4.
--
Sean Owen | Director, Data Science | London


On Wed, Apr 2, 2014 at 9:06 AM, Michael Allman m...@allman.ms wrote:
 Hi Nick,

 I don't have my spark clone in front of me, but OTOH the major differences
 are/were:

 1. Oryx multiplies lambda by alpha.

 2. Oryx uses a different matrix inverse algorithm. It maintains a certain
 symmetry which the Spark algo does not, however I don't think this
 difference has a real impact on the results.

 3. Oryx supports the specification of a convergence threshold for
 termination of the algorithm, based on delta rmse on a subset of the
 training set if I recall correctly. I've been using that as the
 termination criterion instead of a fixed number of iterations.

 4. Oryx uses the weighted regularization scheme you alluded to below,
 multiplying lambda by the number of ratings.

 I've patched the spark impl to support (4) but haven't pushed it to my
 clone on github. I think it would be a valuable feature to support
 officially. I'd also like to work on (3) but don't have time now. I've
 only been using Oryx the past couple of weeks.

 Cheers,

 Michael

 On Tue, 1 Apr 2014, Nick Pentreath [via Apache Spark User List] wrote:

 Hi Michael
 Would you mind setting out exactly what differences you did find between
 the
 Spark and Oryx implementations? Would be good to be clear on them, and
 also
 see if there are further tricks/enhancements from the Oryx one that can be
 ported (such as the lambda * numRatings adjustment).

 N


 On Sat, Mar 15, 2014 at 2:52 AM, Michael Allman [hidden email] wrote:
   I've been thoroughly investigating this issue over the past
   couple of days
   and have discovered quite a bit. For one thing, there is
   definitely (at
   least) one issue/bug in the Spark implementation that leads to
   incorrect
    results for models generated with rank > 1 or a large number of
   iterations.
   I will post a bug report with a thorough explanation this
   weekend or on
   Monday.

   I believe I've been able to track down every difference between
   the Spark
    and Oryx implementations that lead to different results. I made
   some
   adjustments to the spark implementation so that, given the same
   initial
   product/item vectors, the resulting model is identical to the
   one produced
   by Oryx within a small numerical tolerance. I've verified this
   for small
   data sets and am working on verifying this with some large data
   sets.

   Aside from those already identified in this thread, another
   significant
   difference in the Spark implementation is that it begins the
   factorization
   process by computing the product matrix (Y) from the initial
   user matrix
   (X). Both of the papers on ALS referred to in this thread begin
   the process
   by computing the user matrix. I haven't done any testing
   comparing the
   models generated starting from Y or X, but they are very
   different. Is there
   a reason Spark begins the iteration by computing Y?

   Initializing both X and Y as is done in the Spark implementation
   seems
   unnecessary unless I'm overlooking some desired side-effect.
   Only the factor
   matrix which generates the other in the first iteration needs to
   be
   initialized.

   I also found that the product and user RDDs were being rebuilt
   many times
   over in my tests, even for tiny data sets. By persisting the RDD
   returned
   from updateFeatures() I was able to avoid a raft of duplicate
   computations.
   Is there a reason not to do this?

   Thanks.



   --
    View this message in context:
    http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2704.html

ActorNotFound problem for mesos driver

2014-04-02 Thread Leon Zhang
Hi, Spark Devs:

I encountered a problem that shows the error message akka.actor.ActorNotFound
on our Mesos mini-cluster.

mesos : 0.17.0
spark : spark-0.9.0-incubating

spark-env.sh:
#!/usr/bin/env bash

export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=hdfs://
192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz
export MASTER=zk://192.168.1.20:2181/mesos
export SPARK_JAVA_OPTS=-Dspark.driver.port=17077

And the logs from each slave looks like:

14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j
profile: org/apache/spark/log4j-defaults.properties
14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as
executor ID 201403301937-335653056-5050-991-1
14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started
14/04/02 15:14:38 INFO Remoting: Starting remoting
14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@zetyun-cloud3:42218]
14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@zetyun-cloud3:42218]
14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://spark@localhost:17077/user/BlockManagerMaster
akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Actor[akka.tcp://spark@localhost
:17077/]/user/BlockManagerMaster]
at
akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
at
akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
at akka.actor.ActorRef.tell(ActorRef.scala:125)
at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44)
at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
at
akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread Thread-0

Any clue for this problem?

Thanks in advance.


Re: ActorNotFound problem for mesos driver

2014-04-02 Thread andy petrella
Heya,

Yep, this is a problem in the Mesos scheduler implementation that was
fixed after 0.9.0 (https://spark-project.atlassian.net/browse/SPARK-1052 =>
MesosSchedulerBackend).

So there are several options, like applying the patch or upgrading to 0.9.1 :-/

Cheers,
Andy


On Wed, Apr 2, 2014 at 5:30 PM, Leon Zhang leonca...@gmail.com wrote:

 Hi, Spark Devs:

 I encounter a problem which shows error message akka.actor.ActorNotFound
 on our mesos mini-cluster.

 mesos : 0.17.0
 spark : spark-0.9.0-incubating

 spark-env.sh:
 #!/usr/bin/env bash

 export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
 export SPARK_EXECUTOR_URI=hdfs://
 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz
 export MASTER=zk://192.168.1.20:2181/mesos
 export SPARK_JAVA_OPTS=-Dspark.driver.port=17077

 And the logs from each slave looks like:

 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as
 executor ID 201403301937-335653056-5050-991-1
 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started
 14/04/02 15:14:38 INFO Remoting: Starting remoting
 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@zetyun-cloud3:42218]
 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@zetyun-cloud3:42218]
 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://spark@localhost:17077/user/BlockManagerMaster
 akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Actor[akka.tcp://spark@localhost
 :17077/]/user/BlockManagerMaster]
 at
 akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
 at
 akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
 at akka.actor.ActorRef.tell(ActorRef.scala:125)
 at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44)
 at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
 at
 akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
 at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
 at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
 at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
 at
 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203)
 at
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
 at akka.actor.ActorCell.terminate(ActorCell.scala:338)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.run(Mailbox.scala:218)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Exception in thread Thread-0

 Any clue for this problem?

 Thanks in advance.



Re: ActorNotFound problem for mesos driver

2014-04-02 Thread Leon Zhang
Aha, thank you for your kind reply.

Upgrading to 0.9.1 is a good choice. :)


On Wed, Apr 2, 2014 at 11:35 PM, andy petrella andy.petre...@gmail.comwrote:

 Heya,

 Yep this is a problem in the Mesos scheduler implementation that has been
 fixed after 0.9.0 (https://spark-project.atlassian.net/browse/SPARK-1052= 
 MesosSchedulerBackend)

 So several options, like applying the patch, upgrading to 0.9.1 :-/

 Cheers,
 Andy


 On Wed, Apr 2, 2014 at 5:30 PM, Leon Zhang leonca...@gmail.com wrote:

 Hi, Spark Devs:

 I encounter a problem which shows error message
 akka.actor.ActorNotFound on our mesos mini-cluster.

 mesos : 0.17.0
 spark : spark-0.9.0-incubating

 spark-env.sh:
 #!/usr/bin/env bash

 export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
 export SPARK_EXECUTOR_URI=hdfs://
 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz
 export MASTER=zk://192.168.1.20:2181/mesos
 export SPARK_JAVA_OPTS=-Dspark.driver.port=17077

 And the logs from each slave looks like:

 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as
 executor ID 201403301937-335653056-5050-991-1
 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started
 14/04/02 15:14:38 INFO Remoting: Starting remoting
 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@zetyun-cloud3:42218]
 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@zetyun-cloud3:42218]
 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://spark@localhost:17077/user/BlockManagerMaster
 akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Actor[akka.tcp://spark@localhost
 :17077/]/user/BlockManagerMaster]
 at
 akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
 at
 akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
 at akka.actor.ActorRef.tell(ActorRef.scala:125)
 at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44)
 at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
 at
 akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
 at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
 at
 akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
 at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
 at
 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203)
 at
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
 at akka.actor.ActorCell.terminate(ActorCell.scala:338)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.run(Mailbox.scala:218)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Exception in thread Thread-0

 Any clue for this problem?

 Thanks in advance.





Re: ActorNotFound problem for mesos driver

2014-04-02 Thread andy petrella
np ;-)


On Wed, Apr 2, 2014 at 5:50 PM, Leon Zhang leonca...@gmail.com wrote:

 Aha, thank you for your kind reply.

 Upgrading to 0.9.1 is a good choice. :)


 On Wed, Apr 2, 2014 at 11:35 PM, andy petrella andy.petre...@gmail.comwrote:

 Heya,

 Yep this is a problem in the Mesos scheduler implementation that has been
 fixed after 0.9.0 (https://spark-project.atlassian.net/browse/SPARK-1052= 
 MesosSchedulerBackend)

 So several options, like applying the patch, upgrading to 0.9.1 :-/

 Cheers,
 Andy


 On Wed, Apr 2, 2014 at 5:30 PM, Leon Zhang leonca...@gmail.com wrote:

 Hi, Spark Devs:

 I encounter a problem which shows error message
 akka.actor.ActorNotFound on our mesos mini-cluster.

 mesos : 0.17.0
 spark : spark-0.9.0-incubating

 spark-env.sh:
 #!/usr/bin/env bash

 export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
 export SPARK_EXECUTOR_URI=hdfs://
 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz
 export MASTER=zk://192.168.1.20:2181/mesos
 export SPARK_JAVA_OPTS=-Dspark.driver.port=17077

 And the logs from each slave looks like:

 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j
 profile: org/apache/spark/log4j-defaults.properties
 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as
 executor ID 201403301937-335653056-5050-991-1
 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started
 14/04/02 15:14:38 INFO Remoting: Starting remoting
 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on
 addresses :[akka.tcp://spark@zetyun-cloud3:42218]
 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@zetyun-cloud3:42218]
 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://spark@localhost:17077/user/BlockManagerMaster
 akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Actor[akka.tcp://spark@localhost
 :17077/]/user/BlockManagerMaster]
 at
 akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
 at
 akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
 at akka.actor.ActorRef.tell(ActorRef.scala:125)
 at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44)
 at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
 at
 akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
 at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
 at
 akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
 at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
 at
 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203)
 at
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
 at akka.actor.ActorCell.terminate(ActorCell.scala:338)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.run(Mailbox.scala:218)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Exception in thread Thread-0

 Any clue for this problem?

 Thanks in advance.






Resilient nature of RDD

2014-04-02 Thread David Thomas
Can someone explain how an RDD is resilient? If one of the partitions is lost,
who is responsible for recreating that partition - is it the driver program?


Print line in JavaNetworkWordCount

2014-04-02 Thread Eduardo Costa Alfaia

Hi Guys

I would like to print the content of lines in:

JavaDStream<String> lines = ssc.socketTextStream(args[1],
    Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

  @Override
  public Iterable<String> call(String x) {
    return Lists.newArrayList(x.split(" "));
  }
});

Is it possible? How could I do it?
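
[Editor's note, not from the thread: the DStream API itself offers simple ways
to inspect batch contents; a minimal sketch in Scala for brevity, the Java API
has equivalent methods.]

// Print the first elements of each batch on the driver
lines.print()

// Or inspect each batch explicitly
lines.foreachRDD { rdd =>
  rdd.take(10).foreach(println)
}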

Thanks

--
Privacy notice (Informativa sulla Privacy): http://www.unibs.it/node/8155


Re: Need suggestions

2014-04-02 Thread andy petrella
TL;DR
Your classes are missing on the workers; pass the jar containing the
class main.scala.Utils to the SparkContext.

Longer:
I'm missing some information, like how the SparkContext is configured, but my
best guess is that you didn't provide the jars (addJars on SparkConf, or
the SparkContext constructor parameter).

Actually, the classes are not found on the slave, which is another node,
another machine (or env), and so on. So to run your class, the slave must be
able to load it -- which is handled by Spark, but you simply have to pass the
jar as an argument...

This jar could simply be your current project packaged as a jar using
maven/sbt/...
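
A minimal sketch (editor's illustration; the paths, master URL and Spark home
are illustrative and should be adapted to your setup):

// Ship the packaged project jar to the workers, either via the constructor...
val jars = Seq("/home/you/project/target/scala-2.10/simple-project_2.10-1.0.jar")
val sc = new SparkContext("spark://spark-master-001:7077", "Simple App", "/path/to/spark", jars)

// ...or via SparkConf (Spark 0.9+)
val conf = new org.apache.spark.SparkConf()
  .setMaster("spark://spark-master-001:7077")
  .setAppName("Simple App")
  .setJars(jars)
val sc2 = new SparkContext(conf)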

HTH


On Wed, Apr 2, 2014 at 10:01 PM, yh18190 yh18...@gmail.com wrote:

 Hi Guys,

 Currently I am facing this issue ..Not able to find erros..
 here is sbt file.
 name := Simple Project

 version := 1.0

 scalaVersion := 2.10.3

 resolvers += bintray/meetup at http://dl.bintray.com/meetup/maven;

 resolvers += Akka Repository at http://repo.akka.io/releases/;

 resolvers += Cloudera Repository at
 https://repository.cloudera.com/artifactory/cloudera-repos/;

 libraryDependencies += org.apache.spark %% spark-core %
 0.9.0-incubating

 libraryDependencies += com.cloudphysics % jerkson_2.10 % 0.6.3

 libraryDependencies += org.apache.hadoop % hadoop-client %
 2.0.0-mr1-cdh4.6.0

 retrieveManaged := true

 output..

 [error] (run-main) org.apache.spark.SparkException: Job aborted: Task 2.0:2
 failed 4 times (most recent failure: Exception failure:
 java.lang.NoClassDefFoundError: Could not initialize class
 main.scala.Utils$)
 org.apache.spark.SparkException: Job aborted: Task 2.0:2 failed 4 times
 (most recent failure: Exception failure: java.lang.NoClassDefFoundError:
 Could not initialize class main.scala.Utils$)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Need-suggestions-tp3650.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: PySpark RDD.partitionBy() requires an RDD of tuples

2014-04-02 Thread Nicholas Chammas
Update: I'm now using this ghetto function to partition the RDD I get back
when I call textFile() on a gzipped file:

# Python 2.6
def partitionRDD(rdd, numPartitions):
    counter = {'a': 0}
    def count_up(x):
        counter['a'] += 1
        return counter['a']
    return (rdd.keyBy(count_up)
               .partitionBy(numPartitions)
               .map(lambda (counter, data): data))

If there's supposed to be a built-in Spark method to do this, I'd love to
learn more about it.

Nick


On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 Hmm, doing help(rdd) in PySpark doesn't show a method called repartition().
 Trying rdd.repartition() or rdd.repartition(10) also fails. I'm on 0.9.0.

 The approach I'm going with to partition my MappedRDD is to key it by a
 random int, and then partition it.

 So something like:

 rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition;
 minSplits is not actionable due to gzip
 keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we can
 partition it
 partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions

 Are you saying I don't have to do this?

 Nick



 On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.com wrote:

 Hm, yeah, the docs are not clear on this one. The function you're looking
 for to change the number of partitions on any ol' RDD is repartition(),
 which is available in master but for some reason doesn't seem to show up in
 the latest docs. Sorry about that, I also didn't realize partitionBy() had
 this behavior from reading the Python docs (though it is consistent with
 the Scala API, just more type-safe there).


 On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

  Just an FYI, it's not obvious from the docs
  (http://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBy)
  that the following code should fail:

 a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
 a._jrdd.splits().size()
 a.count()
 b = a.partitionBy(5)
 b._jrdd.splits().size()
 b.count()

 I figured out from the example that if I generated a key by doing this

 b = a.map(lambda x: (x, x)).partitionBy(5)

  then all would be well.

 In other words, partitionBy() only works on RDDs of tuples. Is that
 correct?

 Nick


 --
  View this message in context: PySpark RDD.partitionBy() requires an RDD of tuples
  http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Need suggestions

2014-04-02 Thread andy petrella
Sorry, perhaps I was not clear; anyway, could you try with the path in the
*List* being the absolute one, e.g.
List("/home/yh/src/pj/spark-stuffs/target/scala-2.10/simple-project_2.10-1.0.jar")

In order to provide a relative path, you first need to figure out your CWD,
so (to be really sure) you can do:
// before the new SparkContext
println(new java.io.File(".").toUri.toString) // didn't try, so adapt to
make scalac happy ^^




On Wed, Apr 2, 2014 at 11:30 PM, yh18190 yh18...@gmail.com wrote:

 Hi,
 Here is the SparkContext setup. Do I need to add any more extra jars to the
 slaves separately, or is this enough?
 But I am able to see this created jar in my target directory..

  val sc = new SparkContext("spark://spark-master-001:7077", "Simple App",
    utilclass.spark_home,
    List("target/scala-2.10/simple-project_2.10-1.0.jar"))




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Need-suggestions-tp3650p3655.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark output compression on HDFS

2014-04-02 Thread Patrick Wendell
For textFile I believe we overload it and let you set a codec directly:

https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L59

For saveAsSequenceFile yep, I think Mark is right, you need an option.
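
A short sketch of both forms (editor's illustration; assumes the
saveAsTextFile overload from the linked test, which takes the codec class
directly, and the Option-based saveAsSequenceFile signature quoted below):

import org.apache.hadoop.io.compress.GzipCodec

// Text output: pass the codec class directly
counts.saveAsTextFile("output-text", classOf[GzipCodec])

// Sequence-file output: the codec is an Option[Class[_ <: CompressionCodec]]
counts.saveAsSequenceFile("output-seq", Some(classOf[GzipCodec]))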


On Wed, Apr 2, 2014 at 12:36 PM, Mark Hamstra m...@clearstorydata.comwrote:

 http://www.scala-lang.org/api/2.10.3/index.html#scala.Option

 The signature is 'def saveAsSequenceFile(path: String, codec:
  Option[Class[_ <: CompressionCodec]] = None)', but you are providing a
 Class, not an Option[Class].

 Try counts.saveAsSequenceFile(output,
 Some(classOf[org.apache.hadoop.io.compress.SnappyCodec]))



 On Wed, Apr 2, 2014 at 12:18 PM, Kostiantyn Kudriavtsev 
 kudryavtsev.konstan...@gmail.com wrote:

 Hi there,


 I've started using Spark recently and evaluating possible use cases in
 our company.

 I'm trying to save RDD as compressed Sequence file. I'm able to save
 non-compressed file be calling:

 counts.saveAsSequenceFile(output)

  where counts is my RDD[(IntWritable, Text)]. However, I didn't manage to
  compress the output. I tried several configurations and always got an exception:

 counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.SnappyCodec])
  <console>:21: error: type mismatch;
   found   : Class[org.apache.hadoop.io.compress.SnappyCodec](classOf[org.apache.hadoop.io.compress.SnappyCodec])
   required: Option[Class[_ <: org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.SnappyCodec])

  counts.saveAsSequenceFile(output, 
 classOf[org.apache.spark.io.SnappyCompressionCodec])
  <console>:21: error: type mismatch;
   found   : Class[org.apache.spark.io.SnappyCompressionCodec](classOf[org.apache.spark.io.SnappyCompressionCodec])
   required: Option[Class[_ <: org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.spark.io.SnappyCompressionCodec])

 and it doesn't work even for Gzip:

  counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.GzipCodec])
  <console>:21: error: type mismatch;
   found   : Class[org.apache.hadoop.io.compress.GzipCodec](classOf[org.apache.hadoop.io.compress.GzipCodec])
   required: Option[Class[_ <: org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.GzipCodec])

  Could you please suggest a solution? Also, I didn't find how it is possible
  to specify compression parameters (i.e. the compression type for Snappy). I
  wonder if you could share code snippets for writing/reading an RDD with
  compression?

 Thank you in advance,
 Konstantin Kudryavtsev





Optimal Server Design for Spark

2014-04-02 Thread Stephen Watt
Hi Folks

I'm looking to buy some gear to run Spark. I'm quite well versed in Hadoop 
Server design but there does not seem to be much Spark related collateral 
around infrastructure guidelines (or at least I haven't been able to find 
them). My current thinking for server design is something along these lines.

- 2 x 10Gbe NICs
- 128 GB RAM
- 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and Runtimes, 4 x 
1TB for Data Drives)
- 1 Disk Controller
- 2 x 2.6 GHz 6 core processors

If I stick with 1u servers then I lose disk capacity per rack but I get a lot 
more memory and CPU capacity per rack. This increases my total cluster memory 
footprint and it doesn't seem to make sense to have super dense storage servers 
because I can't fit all that data on disk in memory anyways. So at present, my 
thinking is to go with 1u servers instead of 2u Servers. Is 128GB RAM per 
server normal? Do you guys use more or less than that?

Any feedback would be appreciated

Regards
Steve Watt


Re: Spark output compression on HDFS

2014-04-02 Thread Nicholas Chammas
Is this a Scala-only feature?
(http://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#saveAsTextFile)


On Wed, Apr 2, 2014 at 5:55 PM, Patrick Wendell pwend...@gmail.com wrote:

 For textFile I believe we overload it and let you set a codec directly:


 https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L59

 For saveAsSequenceFile yep, I think Mark is right, you need an option.


 On Wed, Apr 2, 2014 at 12:36 PM, Mark Hamstra m...@clearstorydata.comwrote:

 http://www.scala-lang.org/api/2.10.3/index.html#scala.Option

 The signature is 'def saveAsSequenceFile(path: String, codec:
 Option[Class[_ : CompressionCodec]] = None)', but you are providing a
 Class, not an Option[Class].

 Try counts.saveAsSequenceFile(output,
 Some(classOf[org.apache.hadoop.io.compress.SnappyCodec]))



 On Wed, Apr 2, 2014 at 12:18 PM, Kostiantyn Kudriavtsev 
 kudryavtsev.konstan...@gmail.com wrote:

 Hi there,


 I've started using Spark recently and evaluating possible use cases in
 our company.

 I'm trying to save RDD as compressed Sequence file. I'm able to save
 non-compressed file be calling:

 counts.saveAsSequenceFile(output)

 where counts is my RDD (IntWritable, Text). However, I didn't manage to
 compress output. I tried several configurations and always got exception:

 counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.SnappyCodec])
 console:21: error: type mismatch;
  found   : 
 Class[org.apache.hadoop.io.compress.SnappyCodec](classOf[org.apache.hadoop.io.compress.SnappyCodec])
  required: Option[Class[_ : 
 org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.SnappyCodec])

  counts.saveAsSequenceFile(output, 
 classOf[org.apache.spark.io.SnappyCompressionCodec])
 console:21: error: type mismatch;
  found   : 
 Class[org.apache.spark.io.SnappyCompressionCodec](classOf[org.apache.spark.io.SnappyCompressionCodec])
  required: Option[Class[_ : 
 org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.spark.io.SnappyCompressionCodec])

 and it doesn't work even for Gzip:

  counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.GzipCodec])
 console:21: error: type mismatch;
  found   : 
 Class[org.apache.hadoop.io.compress.GzipCodec](classOf[org.apache.hadoop.io.compress.GzipCodec])
  required: Option[Class[_ : 
 org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.GzipCodec])

 Could you please suggest solution? also, I didn't find how is it
 possible to specify compression parameters (i.e. compression type for
 Snappy). I wondered if you could share code snippets for writing/reading
 RDD with compression?

 Thank you in advance,
 Konstantin Kudryavtsev






Re: PySpark RDD.partitionBy() requires an RDD of tuples

2014-04-02 Thread Mark Hamstra
There is a repartition method in pyspark master:
https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1128
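
For reference, a minimal sketch of what it looks like once you are on a build
that has it (the S3 path is just a placeholder):

# repartition() reshuffles any RDD into the requested number of partitions,
# no keying required. Available in pyspark master / the upcoming 1.0 release.
rdd = sc.textFile('s3n://some-bucket/big_file.gz')  # gzip => 1 partition
spread_out = rdd.repartition(10)
print spread_out._jrdd.splits().size()              # expect 10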


On Wed, Apr 2, 2014 at 2:44 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 Update: I'm now using this ghetto function to partition the RDD I get back
 when I call textFile() on a gzipped file:

 # Python 2.6
 def partitionRDD(rdd, numPartitions):
     counter = {'a': 0}
     def count_up(x):
         counter['a'] += 1
         return counter['a']
     return (rdd.keyBy(count_up)
                .partitionBy(numPartitions)
                .map(lambda (counter, data): data))

 If there's supposed to be a built-in Spark method to do this, I'd love to
 learn more about it.

 Nick


 On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Hmm, doing help(rdd) in PySpark doesn't show a method called
 repartition(). Trying rdd.repartition() or rdd.repartition(10) also
 fail. I'm on 0.9.0.

 The approach I'm going with to partition my MappedRDD is to key it by a
 random int, and then partition it.

 So something like:

 rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition;
 minSplits is not actionable due to gzip
 keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we
 can partition it
 partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions

 Are you saying I don't have to do this?

 Nick



 On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.comwrote:

 Hm, yeah, the docs are not clear on this one. The function you're
 looking for to change the number of partitions on any ol' RDD is
 repartition(), which is available in master but for some reason doesn't
 seem to show up in the latest docs. Sorry about that, I also didn't realize
 partitionBy() had this behavior from reading the Python docs (though it is
 consistent with the Scala API, just more type-safe there).


 On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Just an FYI, it's not obvious from the docs
 (http://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBy)
 that the following code should fail:

 a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
 a._jrdd.splits().size()
 a.count()
 b = a.partitionBy(5)
 b._jrdd.splits().size()
 b.count()

 I figured out from the example that if I generated a key by doing this

 b = a.map(lambda x: (x, x)).partitionBy(5)

  then all would be well.

 In other words, partitionBy() only works on RDDs of tuples. Is that
 correct?

 Nick


 --
 View this message in context: PySpark RDD.partitionBy() requires an RDD of tuples
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: Spark output compression on HDFS

2014-04-02 Thread Nicholas Chammas
Thanks for pointing that out.


On Wed, Apr 2, 2014 at 6:11 PM, Mark Hamstra m...@clearstorydata.comwrote:

 First, you shouldn't be using spark.incubator.apache.org anymore, just
 spark.apache.org.  Second, saveAsSequenceFile doesn't appear to exist in
 the Python API at this point.


 On Wed, Apr 2, 2014 at 3:00 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Is this a 
 Scala-onlyhttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#saveAsTextFilefeature?


 On Wed, Apr 2, 2014 at 5:55 PM, Patrick Wendell pwend...@gmail.comwrote:

 For textFile I believe we overload it and let you set a codec directly:


 https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L59

 For saveAsSequenceFile yep, I think Mark is right, you need an option.


 On Wed, Apr 2, 2014 at 12:36 PM, Mark Hamstra 
 m...@clearstorydata.comwrote:

 http://www.scala-lang.org/api/2.10.3/index.html#scala.Option

 The signature is 'def saveAsSequenceFile(path: String, codec:
 Option[Class[_ : CompressionCodec]] = None)', but you are providing a
 Class, not an Option[Class].

 Try counts.saveAsSequenceFile(output,
 Some(classOf[org.apache.hadoop.io.compress.SnappyCodec]))



 On Wed, Apr 2, 2014 at 12:18 PM, Kostiantyn Kudriavtsev 
 kudryavtsev.konstan...@gmail.com wrote:

 Hi there,


 I've started using Spark recently and evaluating possible use cases in
 our company.

 I'm trying to save RDD as compressed Sequence file. I'm able to save
 non-compressed file be calling:


 counts.saveAsSequenceFile(output)

 where counts is my RDD (IntWritable, Text). However, I didn't manage
 to compress output. I tried several configurations and always got 
 exception:


 counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.SnappyCodec])
 console:21: error: type mismatch;
  found   : 
 Class[org.apache.hadoop.io.compress.SnappyCodec](classOf[org.apache.hadoop.io.compress.SnappyCodec])
  required: Option[Class[_ : 
 org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.SnappyCodec])

  counts.saveAsSequenceFile(output, 
 classOf[org.apache.spark.io.SnappyCompressionCodec])
 console:21: error: type mismatch;
  found   : 
 Class[org.apache.spark.io.SnappyCompressionCodec](classOf[org.apache.spark.io.SnappyCompressionCodec])
  required: Option[Class[_ : 
 org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.spark.io.SnappyCompressionCodec])

 and it doesn't work even for Gzip:


  counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.GzipCodec])
 console:21: error: type mismatch;
  found   : 
 Class[org.apache.hadoop.io.compress.GzipCodec](classOf[org.apache.hadoop.io.compress.GzipCodec])
  required: Option[Class[_ : 
 org.apache.hadoop.io.compress.CompressionCodec]]
   counts.saveAsSequenceFile(output, 
 classOf[org.apache.hadoop.io.compress.GzipCodec])

 Could you please suggest solution? also, I didn't find how is it
 possible to specify compression parameters (i.e. compression type for
 Snappy). I wondered if you could share code snippets for writing/reading
 RDD with compression?

 Thank you in advance,
 Konstantin Kudryavtsev








Re: PySpark RDD.partitionBy() requires an RDD of tuples

2014-04-02 Thread Nicholas Chammas
Ah, now I see what Aaron was referring to. So I'm guessing we will get this
in the next release or two. Thank you.


On Wed, Apr 2, 2014 at 6:09 PM, Mark Hamstra m...@clearstorydata.comwrote:

 There is a repartition method in pyspark master:
 https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1128


 On Wed, Apr 2, 2014 at 2:44 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Update: I'm now using this ghetto function to partition the RDD I get
 back when I call textFile() on a gzipped file:

 # Python 2.6
 def partitionRDD(rdd, numPartitions):
     counter = {'a': 0}
     def count_up(x):
         counter['a'] += 1
         return counter['a']
     return (rdd.keyBy(count_up)
                .partitionBy(numPartitions)
                .map(lambda (counter, data): data))

 If there's supposed to be a built-in Spark method to do this, I'd love to
 learn more about it.

 Nick


 On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Hmm, doing help(rdd) in PySpark doesn't show a method called
 repartition(). Trying rdd.repartition() or rdd.repartition(10) also
 fail. I'm on 0.9.0.

 The approach I'm going with to partition my MappedRDD is to key it by a
 random int, and then partition it.

 So something like:

 rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition;
 minSplits is not actionable due to gzip
 keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we
 can partition it
 partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions

 Are you saying I don't have to do this?

 Nick



 On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.comwrote:

 Hm, yeah, the docs are not clear on this one. The function you're
 looking for to change the number of partitions on any ol' RDD is
 repartition(), which is available in master but for some reason doesn't
 seem to show up in the latest docs. Sorry about that, I also didn't realize
 partitionBy() had this behavior from reading the Python docs (though it is
 consistent with the Scala API, just more type-safe there).


 On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Just an FYI, it's not obvious from the 
 docshttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBythat
  the following code should fail:

 a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
 a._jrdd.splits().size()
 a.count()
 b = a.partitionBy(5)
 b._jrdd.splits().size()
 b.count()

 I figured out from the example that if I generated a key by doing this

 b = a.map(lambda x: (x, x)).partitionBy(5)

  then all would be well.

 In other words, partitionBy() only works on RDDs of tuples. Is that
 correct?

 Nick


 --
 View this message in context: PySpark RDD.partitionBy() requires an
 RDD of 
 tupleshttp://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at 
 Nabble.com.








Re: PySpark RDD.partitionBy() requires an RDD of tuples

2014-04-02 Thread Mark Hamstra
Will be in 1.0.0


On Wed, Apr 2, 2014 at 3:22 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 Ah, now I see what Aaron was referring to. So I'm guessing we will get
 this in the next release or two. Thank you.



 On Wed, Apr 2, 2014 at 6:09 PM, Mark Hamstra m...@clearstorydata.comwrote:

 There is a repartition method in pyspark master:
 https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1128


 On Wed, Apr 2, 2014 at 2:44 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Update: I'm now using this ghetto function to partition the RDD I get
 back when I call textFile() on a gzipped file:

  # Python 2.6
  def partitionRDD(rdd, numPartitions):
      counter = {'a': 0}
      def count_up(x):
          counter['a'] += 1
          return counter['a']
      return (rdd.keyBy(count_up)
                 .partitionBy(numPartitions)
                 .map(lambda (counter, data): data))

 If there's supposed to be a built-in Spark method to do this, I'd love
 to learn more about it.

 Nick


 On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Hmm, doing help(rdd) in PySpark doesn't show a method called
 repartition(). Trying rdd.repartition() or rdd.repartition(10) also
 fail. I'm on 0.9.0.

 The approach I'm going with to partition my MappedRDD is to key it by a
 random int, and then partition it.

 So something like:

 rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition;
 minSplits is not actionable due to gzip
 keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we
 can partition it
 partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions

 Are you saying I don't have to do this?

 Nick



 On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.comwrote:

 Hm, yeah, the docs are not clear on this one. The function you're
 looking for to change the number of partitions on any ol' RDD is
 repartition(), which is available in master but for some reason doesn't
 seem to show up in the latest docs. Sorry about that, I also didn't 
 realize
 partitionBy() had this behavior from reading the Python docs (though it is
 consistent with the Scala API, just more type-safe there).


 On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Just an FYI, it's not obvious from the 
 docshttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBythat
  the following code should fail:

 a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
 a._jrdd.splits().size()
 a.count()
 b = a.partitionBy(5)
 b._jrdd.splits().size()
 b.count()

 I figured out from the example that if I generated a key by doing this

 b = a.map(lambda x: (x, x)).partitionBy(5)

  then all would be well.

 In other words, partitionBy() only works on RDDs of tuples. Is that
 correct?

 Nick


 --
 View this message in context: PySpark RDD.partitionBy() requires an
 RDD of 
 tupleshttp://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at 
 Nabble.com.









Re: Is there a way to get the current progress of the job?

2014-04-02 Thread Philip Ogren
What I'd like is a way to capture the information provided on the stages 
page (i.e. cluster:4040/stages via IndexPage).  Looking through the 
Spark code, it doesn't seem like it is possible to directly query for 
specific facts such as how many tasks have succeeded or how many total 
tasks there are for a given active stage.  Instead, it looks like all 
the data for the page is generated at once using information from the 
JobProgressListener. It doesn't seem like I have any way to
programmatically access this information myself.  I can't even
instantiate my own JobProgressListener because it is spark package
private.  I could implement my own SparkListener and gather up the
information myself.  It feels a bit awkward since classes like Task and
TaskInfo are also spark package private.  It does seem possible to
gather up what I need, but it seems like this sort of information should
just be available without implementing a custom SparkListener (or
worse, screen scraping the HTML generated by StageTable!)


I was hoping that I would find the answer in MetricsServlet which is 
turned on by default.  It seems that when I visit 
http://cluster:4040/metrics/json/ I should be able to get everything I 
want but I don't see the basic stage/task progress information I would 
expect.  Are there special metrics properties that I should set to get 
this info?  I think this would be the best solution - just give it the 
right URL and parse the resulting JSON - but I can't seem to figure out 
how to do this or if it is possible.


Any advice is appreciated.

Thanks,
Philip


On 04/01/2014 09:43 AM, Philip Ogren wrote:

Hi DB,

Just wondering if you ever got an answer to your question about 
monitoring progress - either offline or through your own 
investigation.  Any findings would be appreciated.


Thanks,
Philip

On 01/30/2014 10:32 PM, DB Tsai wrote:

Hi guys,

When we're running a very long job, we would like to show users the 
current progress of map and reduce job. After looking at the api 
document, I don't find anything for this. However, in Spark UI, I 
could see the progress of the task. Is there anything I miss?


Thanks.

Sincerely,

DB Tsai
Machine Learning Engineer
Alpine Data Labs
--
Web: http://alpinenow.com/






Measure the Total Network I/O, Cpu and Memory Consumed by Spark Job

2014-04-02 Thread yxzhao
Hi All,

  I am interested in measuring the total network I/O, CPU and memory
consumed by a Spark job. I tried to find the related information in the logs
and the Web UI, but there doesn't seem to be sufficient information. Could
anyone give me any suggestions?
  Thanks very much in advance.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Measure-the-Total-Network-I-O-Cpu-and-Memory-Consumed-by-Spark-Job-tp3668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Efficient way to aggregate event data at daily/weekly/monthly level

2014-04-02 Thread K Koh
Hi, 

I want to aggregate (time-stamped) event data at a daily, weekly and monthly
level, stored in a directory in data/yyyy/mm/dd/dat.gz format. For example:


Each dat.gz file contains tuples in (datetime, id, value) format. I can
perform aggregation as follows:



but this code doesn't seem to be efficient because it doesn't exploit the
data dependencies in the reduce steps. For example, there is no dependency
between the data of 2010-01 and that of all other dates (2010-02, 2010-03,
...), so it would be ideal if we could load one month of data on each node once
and perform all three (daily, weekly and monthly) aggregations.

I think I could use mapPartitions and a big reducer that performs all three
aggregations, but I'm not sure it is the right way to go.

Is there a more efficient way to perform these aggregations (loading the data
once) yet keeping the code modular?

Thanks,
K






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-aggregate-event-data-at-daily-weekly-monthly-level-tp3673.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Resilient nature of RDD

2014-04-02 Thread Patrick Wendell
The driver stores the meta-data associated with the partition, but the
re-computation will occur on an executor. So if several partitions are
lost, e.g. due to a few machines failing, the re-computation can be striped
across the cluster making it fast.


On Wed, Apr 2, 2014 at 11:27 AM, David Thomas dt5434...@gmail.com wrote:

 Can someone explain how an RDD is resilient? If one of the partitions is lost,
 who is responsible for recreating that partition - is it the driver program?



Re: Is there a way to get the current progress of the job?

2014-04-02 Thread Patrick Wendell
Hey Phillip,

Right now there is no mechanism for this. You have to go in through the low
level listener interface.

We could consider exposing the JobProgressListener directly - I think it's
been factored nicely so it's fairly decoupled from the UI. The concern is
this is a semi-internal piece of functionality and something we might, e.g.
want to change the API of over time.
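
For example, a rough sketch of the listener route (this API is semi-internal,
so names may shift slightly between releases):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Counts finished tasks across all stages; register it before running jobs.
class TaskCountListener extends SparkListener {
  private val finished = new AtomicInteger(0)
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("tasks finished so far: " + finished.incrementAndGet())
  }
}

// sc.addSparkListener(new TaskCountListener())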

- Patrick


On Wed, Apr 2, 2014 at 3:39 PM, Philip Ogren philip.og...@oracle.comwrote:

 What I'd like is a way to capture the information provided on the stages
 page (i.e. cluster:4040/stages via IndexPage).  Looking through the Spark
 code, it doesn't seem like it is possible to directly query for specific
 facts such as how many tasks have succeeded or how many total tasks there
 are for a given active stage.  Instead, it looks like all the data for the
 page is generated at once using information from the JobProgressListener.
 It doesn't seem like I have any way to programmatically access this
 information myself.  I can't even instantiate my own JobProgressListener
 because it is spark package private.  I could implement my SparkListener
 and gather up the information myself.  It feels a bit awkward since classes
 like Task and TaskInfo are also spark package private.  It does seem
 possible to gather up what I need but it seems like this sort of
 information should just be available without by implementing a custom
 SparkListener (or worse screen scraping the html generated by StageTable!)

 I was hoping that I would find the answer in MetricsServlet which is
 turned on by default.  It seems that when I visit
 http://cluster:4040/metrics/json/ I should be able to get everything I
 want but I don't see the basic stage/task progress information I would
 expect.  Are there special metrics properties that I should set to get this
 info?  I think this would be the best solution - just give it the right URL
 and parse the resulting JSON - but I can't seem to figure out how to do
 this or if it is possible.

 Any advice is appreciated.

 Thanks,
 Philip



 On 04/01/2014 09:43 AM, Philip Ogren wrote:

 Hi DB,

 Just wondering if you ever got an answer to your question about
 monitoring progress - either offline or through your own investigation.
  Any findings would be appreciated.

 Thanks,
 Philip

 On 01/30/2014 10:32 PM, DB Tsai wrote:

 Hi guys,

 When we're running a very long job, we would like to show users the
 current progress of map and reduce job. After looking at the api document,
 I don't find anything for this. However, in Spark UI, I could see the
 progress of the task. Is there anything I miss?

 Thanks.

 Sincerely,

 DB Tsai
 Machine Learning Engineer
 Alpine Data Labs
 --
 Web: http://alpinenow.com/






Re: Is there a way to get the current progress of the job?

2014-04-02 Thread Andrew Or
Hi Philip,

In the upcoming release of Spark 1.0 there will be a feature that provides
for exactly what you describe: capturing the information displayed on the
UI in JSON. More details will be provided in the documentation, but for
now, anything before 0.9.1 can only go through JobLogger.scala, which
outputs information in a somewhat arbitrary format and will be deprecated
soon. If you find this feature useful, you can test it out by building the
master branch of Spark yourself, following the instructions in
https://github.com/apache/spark/pull/42.

Andrew


On Wed, Apr 2, 2014 at 3:39 PM, Philip Ogren philip.og...@oracle.comwrote:

 What I'd like is a way to capture the information provided on the stages
 page (i.e. cluster:4040/stages via IndexPage).  Looking through the Spark
 code, it doesn't seem like it is possible to directly query for specific
 facts such as how many tasks have succeeded or how many total tasks there
 are for a given active stage.  Instead, it looks like all the data for the
 page is generated at once using information from the JobProgressListener.
 It doesn't seem like I have any way to programmatically access this
 information myself.  I can't even instantiate my own JobProgressListener
 because it is spark package private.  I could implement my SparkListener
 and gather up the information myself.  It feels a bit awkward since classes
 like Task and TaskInfo are also spark package private.  It does seem
 possible to gather up what I need but it seems like this sort of
 information should just be available without by implementing a custom
 SparkListener (or worse screen scraping the html generated by StageTable!)

 I was hoping that I would find the answer in MetricsServlet which is
 turned on by default.  It seems that when I visit
 http://cluster:4040/metrics/json/ I should be able to get everything I
 want but I don't see the basic stage/task progress information I would
 expect.  Are there special metrics properties that I should set to get this
 info?  I think this would be the best solution - just give it the right URL
 and parse the resulting JSON - but I can't seem to figure out how to do
 this or if it is possible.

 Any advice is appreciated.

 Thanks,
 Philip



 On 04/01/2014 09:43 AM, Philip Ogren wrote:

 Hi DB,

 Just wondering if you ever got an answer to your question about
 monitoring progress - either offline or through your own investigation.
  Any findings would be appreciated.

 Thanks,
 Philip

 On 01/30/2014 10:32 PM, DB Tsai wrote:

 Hi guys,

 When we're running a very long job, we would like to show users the
 current progress of map and reduce job. After looking at the api document,
 I don't find anything for this. However, in Spark UI, I could see the
 progress of the task. Is there anything I miss?

 Thanks.

 Sincerely,

 DB Tsai
 Machine Learning Engineer
 Alpine Data Labs
 --
 Web: http://alpinenow.com/






Re: Efficient way to aggregate event data at daily/weekly/monthly level

2014-04-02 Thread Nicholas Chammas
Watch out with loading data from gzipped files. Spark cannot parallelize
the load of gzipped files, and if you do not explicitly repartition your
RDD created from such a file, everything you do on that RDD will run on a
single core.
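
A rough sketch of the shape this can take (the paths, partition count, and the
parse_line helper are all placeholders, and repartition() needs a recent build
as discussed in the other thread). The idea is to parse once, aggregate daily,
and then derive the coarser granularities from the daily RDD instead of
re-reading the files:

from datetime import datetime

def parse_line(line):
    # placeholder parser for lines like "2010-01-15T12:34:56,some_id,3"
    dt, id_, value = line.split(',')
    return datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S'), id_, float(value)

events = sc.textFile('data/2010/01/*/dat.gz')   # gzip => one partition per file
events = events.repartition(32).map(parse_line)

daily = (events.map(lambda (dt, id_, v): ((dt.date(), id_), v))
               .reduceByKey(lambda a, b: a + b))
monthly = (daily.map(lambda ((d, id_), v): ((d.year, d.month, id_), v))
                .reduceByKey(lambda a, b: a + b))
# weekly is analogous, keying on d.isocalendar()[:2]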


On Wed, Apr 2, 2014 at 8:22 PM, K Koh den...@gmail.com wrote:

 Hi,

 I want to aggregate (time-stamped) event data at daily, weekly and monthly
 level stored in a directory in  data//mm/dd/dat.gz format. For example:


 Each dat.gz file contains tuples in (datetime, id, value) format. I can
 perform aggregation as follows:



 but this code doesn't seem to be efficient because it doesn't exploit the
 data dependencies in reduce steps. For example, there is no dependency in
 between the data of 2010-01 and that of all other dates (2010-02, 2010-03,
 ...) so it would be ideal if we can load one month of data in each node
 once
 and perform all three (daily, weekly and monthly) aggregation.

 I think I could use mapPartitions and a big reducer that performs all three
 aggregations, but not sure it is a right way to go.

 Is there a more efficient way to perform these aggregations (by loading
 data
 once) yet keeping the code modular?

 Thanks,
 K






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-aggregate-event-data-at-daily-weekly-monthly-level-tp3673.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Status of MLI?

2014-04-02 Thread Evan R. Sparks
Targeting 0.9.0 should work out of the box (just a change to the build.sbt)
- I'll push some changes I've been sitting on to the public repo in the
next couple of days.
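
For the impatient, a hedged guess at the kind of build.sbt change Evan means
(the exact artifact the repo currently pins may differ):

// Hypothetical build.sbt tweak: point the Spark dependency at 0.9.0.
scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"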


On Wed, Apr 2, 2014 at 4:05 AM, Krakna H shankark+...@gmail.com wrote:

 Thanks for the update Evan! In terms of using MLI, I see that the Github
 code is linked to Spark 0.8; will it not work with 0.9 (which is what I
 have set up) or higher versions?


 On Wed, Apr 2, 2014 at 1:44 AM, Evan R. Sparks [via Apache Spark User
 List] [hidden email] wrote:

 Hi there,

 MLlib is the first component of MLbase - MLI and the higher levels of the
 stack are still being developed. Look for updates in terms of our progress
 on the hyperparameter tuning/model selection problem in the next month or
 so!

 - Evan


 On Tue, Apr 1, 2014 at 8:05 PM, Krakna H [hidden email] wrote:

 Hi Nan,

 I was actually referring to MLI/MLBase (http://www.mlbase.org); is this
 being actively developed?

 I'm familiar with mllib and have been looking at its documentation.

 Thanks!


  On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List]
 [hidden email] wrote:

  mllib has been part of Spark distribution (under mllib directory),
 also check http://spark.apache.org/docs/latest/mllib-guide.html

 and for JIRA, because of the recent migration to apache JIRA, I think
 all mllib-related issues should be under the Spark umbrella,
 https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel

 --
 Nan Zhu

 On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote:

 What is the current development status of MLI/MLBase? I see that the
 github repo is lying dormant (https://github.com/amplab/MLI) and JIRA
 has had no activity in the last 30 days (
 https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
 Is the plan to add a lot of this into mllib itself without needing a
 separate API?

 Thanks!

 --
 View this message in context: Status of 
 MLI?http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at
 Nabble.com.







 --
 View this message in context: Re: Status of 
 MLI?http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3612.html

 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.







 --
 View this message in context: Re: Status of 
 MLI?http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3632.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.



Re: Optimal Server Design for Spark

2014-04-02 Thread Mayur Rustagi
I would suggest starting with cloud hosting if you can; depending on your
use case, memory requirements may vary a lot.
Regards
Mayur
On Apr 2, 2014 3:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hey Steve,

 This configuration sounds pretty good. The one thing I would consider is
 having more disks, for two reasons — Spark uses the disks for large
 shuffles and out-of-core operations, and often it’s better to run HDFS or
 your storage system on the same nodes. But whether this is valuable will
 depend on whether you plan to do that in your deployment. You should
 determine that and go from there.

 The amount of cores and RAM are both good — actually with a lot more of
 these you would probably want to run multiple Spark workers per node, which
 is more work to configure. Your numbers are in line with other deployments.

 There’s a provisioning overview with more details at
 https://spark.apache.org/docs/latest/hardware-provisioning.html but what
 you have sounds fine.

 Matei

 On Apr 2, 2014, at 2:58 PM, Stephen Watt sw...@redhat.com wrote:

  Hi Folks
 
  I'm looking to buy some gear to run Spark. I'm quite well versed in
 Hadoop Server design but there does not seem to be much Spark related
 collateral around infrastructure guidelines (or at least I haven't been
 able to find them). My current thinking for server design is something
 along these lines.
 
  - 2 x 10Gbe NICs
  - 128 GB RAM
  - 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and
 Runtimes, 4 x 1TB for Data Drives)
  - 1 Disk Controller
  - 2 x 2.6 GHz 6 core processors
 
  If I stick with 1u servers then I lose disk capacity per rack but I get
 a lot more memory and CPU capacity per rack. This increases my total
 cluster memory footprint and it doesn't seem to make sense to have super
 dense storage servers because I can't fit all that data on disk in memory
 anyways. So at present, my thinking is to go with 1u servers instead of 2u
 Servers. Is 128GB RAM per server normal? Do you guys use more or less than
 that?
 
  Any feedback would be appreciated
 
  Regards
  Steve Watt




Re: Optimal Server Design for Spark

2014-04-02 Thread Debasish Das
Hi Matei,

How can I run multiple Spark workers per node? I am running an 8-core, 10-node
cluster, but I do have 8 more cores on each node, so having 2 workers per
node would definitely help my use case.

Thanks.
Deb
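
In case it helps while waiting for an answer: with the standalone deploy mode
this is typically done in conf/spark-env.sh on each node (the numbers below are
only illustrative; adjust cores and memory so two workers fit on one box):

# conf/spark-env.sh (standalone mode) -- run two worker JVMs per node and
# split the node's resources between them.
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=48g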




On Wed, Apr 2, 2014 at 3:58 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Hey Steve,

 This configuration sounds pretty good. The one thing I would consider is
 having more disks, for two reasons -- Spark uses the disks for large
 shuffles and out-of-core operations, and often it's better to run HDFS or
 your storage system on the same nodes. But whether this is valuable will
 depend on whether you plan to do that in your deployment. You should
 determine that and go from there.

 The amount of cores and RAM are both good -- actually with a lot more of
 these you would probably want to run multiple Spark workers per node, which
 is more work to configure. Your numbers are in line with other deployments.

 There's a provisioning overview with more details at
 https://spark.apache.org/docs/latest/hardware-provisioning.html but what
 you have sounds fine.

 Matei

 On Apr 2, 2014, at 2:58 PM, Stephen Watt sw...@redhat.com wrote:

  Hi Folks
 
  I'm looking to buy some gear to run Spark. I'm quite well versed in
 Hadoop Server design but there does not seem to be much Spark related
 collateral around infrastructure guidelines (or at least I haven't been
 able to find them). My current thinking for server design is something
 along these lines.
 
  - 2 x 10Gbe NICs
  - 128 GB RAM
  - 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and
 Runtimes, 4 x 1TB for Data Drives)
  - 1 Disk Controller
  - 2 x 2.6 GHz 6 core processors
 
  If I stick with 1u servers then I lose disk capacity per rack but I get
 a lot more memory and CPU capacity per rack. This increases my total
 cluster memory footprint and it doesn't seem to make sense to have super
 dense storage servers because I can't fit all that data on disk in memory
 anyways. So at present, my thinking is to go with 1u servers instead of 2u
 Servers. Is 128GB RAM per server normal? Do you guys use more or less than
 that?
 
  Any feedback would be appreciated
 
  Regards
  Steve Watt




How to ask questions on Spark usage?

2014-04-02 Thread weida xu
Hi,

Shall I send my questions to this Email address?

Sorry for bothering, and thanks a lot!


Re: How to ask questions on Spark usage?

2014-04-02 Thread Andrew Or
Yes, please do. :)


On Wed, Apr 2, 2014 at 7:36 PM, weida xu xwd0...@gmail.com wrote:

 Hi,

 Shall I send my questions to this Email address?

 Sorry for bothering, and thanks a lot!



Spark RDD to Shark table IN MEMORY conversion

2014-04-02 Thread abhietc31
Hi,
We are placing business logic in an incoming data stream using Spark
Streaming. Here I want to point a Shark table at the data coming from Spark
Streaming. Instead of storing the Spark Streaming output to HDFS or some other
location, is there a way I can directly point a Shark in-memory table to take
data from Spark Streaming?

Spark version: 0.81
Shark version: 0.81


Thanks,
Abhishek








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-to-Shark-table-IN-MEMORY-conversion-tp3682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Shark Direct insert into table value (?)

2014-04-02 Thread abhietc31
Hi,
I'm trying to run the script  insert into emp (id, name) values (212, 'Abhi')
in Shark (0.81), but it doesn't work.
I urgently need direct insert as it is a show stopper.

I know that we can do  insert into emp select * from xyz.
Here the requirement is a direct insert.

Has anyone tried it? Or is there a Shark API that allows us to do it?

Please assist.

Thanks,
Abhishek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shark-Direct-insert-into-table-value-tp3684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Submitting to yarn cluster

2014-04-02 Thread Ron Gonzalez
Hi,
  I have a small program but I cannot seem to make it connect to the right 
properties of the cluster.
  I have the SPARK_YARN_APP_JAR, SPARK_JAR and SPARK_HOME set properly.
  If I run this scala file, I am seeing that this is never using the 
yarn.resourcemanager.address property that I set on the SparkConf instance.
  Any advice?

Thanks,
Ron

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.deploy.yarn.Client
import java.lang.System
import org.apache.spark.SparkConf


object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/rgonzalez/app/spark-0.9.0-incubating-bin-hadoop2/README.md"
    val conf = new SparkConf()
    conf.set("yarn.resourcemanager.address", "localhost:8050")
    val sc = new SparkContext("yarn-client", "Simple App", conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()

    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

Error when run Spark on mesos

2014-04-02 Thread felix
I deployed Mesos and tested it using the example/test-framework script; Mesos
seems OK. But when running Spark on the Mesos cluster, the Mesos slave nodes
report the following exception. Can anyone help me to fix this? Thanks in
advance:

14/04/03 11:24:39 INFO Slf4jLogger: Slf4jLogger started
14/04/03 11:24:39 INFO Remoting: Starting remoting
14/04/03 11:24:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@pdn00:40265]
14/04/03 11:24:39 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@pdn00:40265]
14/04/03 11:24:39 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:7077/user/BlockManagerMaster
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/BlockManagerMaster]
  
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)  
 
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)  
 
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
   
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
   
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
   
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
   
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)   
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)   
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
   
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) 
  
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
   
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
   
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
  
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)at
akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)at
akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)at
akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
   
at akka.actor.ActorRef.tell(ActorRef.scala:125)at
akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44)   
at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)   
at
akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
   
at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)at
akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)   
at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)   
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
   
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
   
at akka.actor.ActorCell.terminate(ActorCell.scala:338)at
akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)at
akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)at
akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)at
akka.dispatch.Mailbox.run(Mailbox.scala:218)at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)   
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
  
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-run-Spark-on-mesos-tp3687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Error when run Spark on mesos

2014-04-02 Thread panfei
any advice ?


2014-04-03 11:35 GMT+08:00 felix cnwe...@gmail.com:

 I deployed mesos and test it using the exmaple/test-framework script,
 mesos seems OK. but when runing spark on the mesos cluster, the mesos slave
 nodes report the following exception, any one can help me to fix this ?
 thanks in advance: 14/04/03 11:24:39 INFO Slf4jLogger: Slf4jLogger started
 14/04/03 11:24:39 INFO Remoting: Starting remoting 14/04/03 11:24:39 INFO
 Remoting: Remoting started; listening on addresses 
 :[akka.tcp://spark@pdn00:40265]
 14/04/03 11:24:39 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO SparkEnv:
 Connecting to BlockManagerMaster: 
 akka.tcp://spark@localhost:7077/user/BlockManagerMaster
 akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/BlockManagerMaster]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at
 akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at
 akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at
 akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
 at akka.actor.ActorRef.tell(ActorRef.scala:125) at
 akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44) at
 akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at
 akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
 at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at
 akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at
 akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at
 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
 at
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
 at akka.actor.ActorCell.terminate(ActorCell.scala:338) at
 akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at
 akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at
 akka.dispatch.Mailbox.run(Mailbox.scala:218) at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 --
 View this message in context: Error when run Spark on 
 mesoshttp://apache-spark-user-list.1001560.n3.nabble.com/Error-when-run-Spark-on-mesos-tp3687.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.




-- 
不学习,不知道


Re: Error when run Spark on mesos

2014-04-02 Thread Ian Ferreira
I think this is related to a known issue (regression) in 0.9.0. Try using an
explicit IP other than the loopback address.

Sent from a mobile device
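
For example, one way to pin the address explicitly (the IP and master URL below
are stand-ins for the driver machine's routable address and your Mesos master):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://mesos-master:5050")   // placeholder master URL
  .setAppName("my-app")
  .set("spark.driver.host", "10.0.0.5")     // routable IP instead of localhost
val sc = new SparkContext(conf)
// Alternatively, export SPARK_LOCAL_IP=10.0.0.5 before launching the driver.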

 On Apr 2, 2014, at 8:53 PM, panfei cnwe...@gmail.com wrote:
 
 any advice ?
 
 
 2014-04-03 11:35 GMT+08:00 felix cnwe...@gmail.com:
 I deployed mesos and test it using the exmaple/test-framework script, mesos 
 seems OK. but when runing spark on the mesos cluster, the mesos slave nodes 
 report the following exception, any one can help me to fix this ? thanks in 
 advance: 14/04/03 11:24:39 INFO Slf4jLogger: Slf4jLogger started 14/04/03 
 11:24:39 INFO Remoting: Starting remoting 14/04/03 11:24:39 INFO Remoting: 
 Remoting started; listening on addresses :[akka.tcp://spark@pdn00:40265] 
 14/04/03 11:24:39 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO SparkEnv: Connecting 
 to BlockManagerMaster: 
 akka.tcp://spark@localhost:7077/user/BlockManagerMaster 
 akka.actor.ActorNotFound: Actor not found for: 
 ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/BlockManagerMaster]
  at 
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
  at 
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
  at 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
  at 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
  at 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
  at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at 
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
  at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) 
 at 
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
  at 
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at 
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at 
 akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at 
 akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at 
 akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at 
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
  at akka.actor.ActorRef.tell(ActorRef.scala:125) at 
 akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44) at 
 akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at 
 akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
  at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at 
 akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at 
 akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at 
 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
  at 
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at 
 akka.actor.ActorCell.terminate(ActorCell.scala:338) at 
 akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at 
 akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at 
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at 
 akka.dispatch.Mailbox.run(Mailbox.scala:218) at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
 at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  
 View this message in context: Error when run Spark on mesos
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 
 -- 
 不学习,不知道


Example of creating expressions for SchemaRDD methods

2014-04-02 Thread All In A Days Work
For various schemaRDD functions like select, where, orderby, groupby etc. I
would like to create expression objects and pass these to the methods for
execution.

Can someone show some examples of how to create expressions for case class
and execute ? E.g., how to create expressions for select, order by, group
by etc. and execute methods using the expressions ?
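
For what it's worth, a rough sketch of the symbol-based DSL as it looks in the
current Spark SQL alpha; the exact imports and method names may still shift
before release, so treat this as illustrative:

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext._   // implicit conversions plus the 'symbol expression DSL

val people = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 15)))

// where/select take expressions built from symbols:
val teenagers = people.where('age >= 13).where('age <= 19).select('name)
teenagers.collect().foreach(println)

// ordering is analogous ('age.asc / 'age.desc):
val byAge = people.orderBy('age.asc).select('name, 'age)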

Regards,