Re: How to set UI port #?

2015-01-12 Thread Prannoy
Set the port using

spconf.set("spark.ui.port", "<port>");

where <port> is any free port number, and

spconf is your Spark configuration object.
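
A minimal sketch of what that could look like, assuming Spark 1.x (whether spark.ui.enabled is available may depend on your Spark version; the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val spconf = new SparkConf()
  .setAppName("hourly-batch")            // illustrative app name
  .set("spark.ui.port", "4044")          // start probing from 4044 instead of 4040
// or, to avoid launching the UI at all:
// spconf.set("spark.ui.enabled", "false")
val sc = new SparkContext(spconf)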

On Sun, Jan 11, 2015 at 2:08 PM, YaoPau [via Apache Spark User List] 
ml-node+s1001560n21083...@n3.nabble.com wrote:

 I have multiple Spark Streaming jobs running all day, and so when I run my
 hourly batch job, I always get a java.net.BindException: Address already
 in use which starts at 4040 then goes to 4041, 4042, 4043 before settling
 at 4044.

 That slows down my hourly job, and time is critical.  Is there a way I can
 set it to 4044 by default, or prevent the UI from launching altogether?

 Jon







Re: Problem with building spark-1.2.0

2015-01-12 Thread Kartheek.R
Hi,
This is what I am trying to do:

karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from
/home/karthik/spark-1.2.0/project/project
Cloning into
'/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/':
Received HTTP code 407 from proxy after CONNECT
java.lang.RuntimeException: Nonzero exit code (128): git clone
https://github.com/ScrapCodes/sbt-pom-reader.git
/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader
at scala.sys.package$.error(package.scala:27)
at sbt.Resolvers$.run(Resolvers.scala:127)
at sbt.Resolvers$.run(Resolvers.scala:117)
at sbt.Resolvers$$anon$2.clone(Resolvers.scala:74)
at
sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11$$anonfun$apply$5.apply$mcV$sp(Resolvers.scala:99)
at sbt.Resolvers$.creates(Resolvers.scala:134)
at
sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11.apply(Resolvers.scala:98)
at
sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11.apply(Resolvers.scala:97)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$3.apply(BuildLoader.scala:88)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$3.apply(BuildLoader.scala:87)
at scala.Option.map(Option.scala:145)
at 
sbt.BuildLoader$$anonfun$componentLoader$1.apply(BuildLoader.scala:87)
at 
sbt.BuildLoader$$anonfun$componentLoader$1.apply(BuildLoader.scala:83)
at sbt.MultiHandler.apply(BuildLoader.scala:15)
at sbt.BuildLoader.apply(BuildLoader.scala:139)
at sbt.Load$.loadAll(Load.scala:334)
at sbt.Load$.loadURI(Load.scala:289)
at sbt.Load$.load(Load.scala:285)
at sbt.Load$.load(Load.scala:276)
at sbt.Load$.apply(Load.scala:130)
at sbt.Load$.buildPluginDefinition(Load.scala:819)
at sbt.Load$.buildPlugins(Load.scala:785)
at sbt.Load$.plugins(Load.scala:773)
at sbt.Load$.loadUnit(Load.scala:431)
at sbt.Load$$anonfun$18$$anonfun$apply$11.apply(Load.scala:281)
at sbt.Load$$anonfun$18$$anonfun$apply$11.apply(Load.scala:281)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader.scala:91)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader.scala:90)
at sbt.BuildLoader.apply(BuildLoader.scala:140)
at sbt.Load$.loadAll(Load.scala:334)
at sbt.Load$.loadURI(Load.scala:289)
at sbt.Load$.load(Load.scala:285)
at sbt.Load$.load(Load.scala:276)
at sbt.Load$.apply(Load.scala:130)
at sbt.Load$.defaultLoad(Load.scala:36)
at sbt.BuiltinCommands$.doLoadProject(Main.scala:481)
at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
at
sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:58)
at
sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:58)
at
sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:60)
at
sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:60)
at sbt.Command$.process(Command.scala:92)
at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
at sbt.State$$anon$1.process(State.scala:184)
at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.MainLoop$.next(MainLoop.scala:98)
at sbt.MainLoop$.run(MainLoop.scala:91)
at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:70)
at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:65)
at sbt.Using.apply(Using.scala:24)
at sbt.MainLoop$.runWithNewLog(MainLoop.scala:65)
at sbt.MainLoop$.runAndClearLast(MainLoop.scala:48)
at sbt.MainLoop$.runLoggedLoop(MainLoop.scala:32)
at sbt.MainLoop$.runLogged(MainLoop.scala:24)
at sbt.StandardMain$.runManaged(Main.scala:53)
at sbt.xMain.run(Main.scala:28)
at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
at xsbt.boot.Launch$.run(Launch.scala:109)
at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
at xsbt.boot.Launch$.launch(Launch.scala:117)
at xsbt.boot.Launch$.apply(Launch.scala:18)
at 

GraphX vs GraphLab

2015-01-12 Thread Madabhattula Rajesh Kumar
Hi Team,

Has anyone done a comparison (pros and cons) study between GraphX and GraphLab?

Could you please point me to any links for such a comparison?

Regards,
Rajesh


Re: Failed to save RDD as text file to local file system

2015-01-12 Thread Prannoy
Have you tried simply giving the path where you want to save the file?

For instance, in your case just do

r.saveAsTextFile("home/cloudera/tmp/out1")

Don't use "file:".

This will create a folder with the name out1. saveAsTextFile always writes by
making a directory; it does not write data into a single file.

In case you need a single file you can use the copyMerge API in FileUtil.

FileUtil.copyMerge(fs, "home/cloudera/tmp/out1", fs, "home/cloudera/tmp/out2",
true, conf, null);

Now out2 will be a single file containing your data.

fs is the FileSystem object for your local file system.
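
A runnable sketch of that merge step, assuming the Hadoop 2.x FileUtil API and illustrative paths:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.getLocal(conf)            // local file system
FileUtil.copyMerge(
  fs, new Path("/home/cloudera/tmp/out1"),    // source directory written by saveAsTextFile
  fs, new Path("/home/cloudera/tmp/out2"),    // single merged output file
  true,                                       // delete the source directory afterwards
  conf, null)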

Thanks



On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List] 
ml-node+s1001560n21068...@n3.nabble.com wrote:

  No, do you have any idea?



 Regards,



 *Ningjun Wang*

 Consulting Software Engineer

 LexisNexis

 121 Chanlon Road

 New Providence, NJ 07974-1541



 *From:* firemonk9 [via Apache Spark User List] [mailto:ml-node+[hidden
 email] http:///user/SendEmail.jtp?type=nodenode=21068i=0]
 *Sent:* Friday, January 09, 2015 2:56 PM
 *To:* Wang, Ningjun (LNG-NPV)
 *Subject:* Re: Failed to save RDD as text file to local file system



 Have you found any resolution for this issue ?








how to select the first row in each group by group?

2015-01-12 Thread LinQili
Hi all,
I am using Spark SQL to read and write Hive tables, but there is an issue:
how do I select the first row in each group? In Hive, we could write HQL like this:

SELECT imei
FROM (SELECT imei,
             row_number() OVER (PARTITION BY imei ORDER BY login_time ASC) AS row_num
      FROM login_log_2015010914) a
WHERE row_num = 1

In Spark SQL, how do I write SQL equivalent to this HQL?
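
In case it helps, here is a minimal RDD-based workaround sketch that avoids window functions entirely; it assumes a HiveContext named hiveContext and reads login_time as a string, so the column handling is illustrative:

import org.apache.spark.SparkContext._   // for reduceByKey on pair RDDs

val rows = hiveContext.sql("SELECT imei, login_time FROM login_log_2015010914")
val firstPerGroup = rows
  .map(r => (r.getString(0), r.getString(1)))     // (imei, login_time)
  .reduceByKey((a, b) => if (a <= b) a else b)    // keep the earliest login per imei
firstPerGroup.take(10).foreach(println)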
  

Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Hi all
Is there an efficient way to trigger RDD transformations? I'm now using the
count action to achieve this.

Best regards
Kevin







Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Hafiz Mujadid
Actually this code is producing a "leader not found" exception. I am
unable to find the reason.

On Mon, Jan 12, 2015 at 4:03 PM, kevinkim [via Apache Spark User List] 
ml-node+s1001560n21098...@n3.nabble.com wrote:

 Well, you can use coalesce() to decrease the number of partitions to 1.
 (It will take time and is not quite efficient, though.)

 Regards,
 Kevin.

 On Mon Jan 12 2015 at 7:57:39 PM Hafiz Mujadid [via Apache Spark User
 List] [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=21098i=0 wrote:

 Hi experts!


 I have a schemaRDD of messages to be pushed in kafka. So I am using
 following piece of code to do that

 rdd.foreachPartition(itr => {
   val props = new Properties()
   props.put("metadata.broker.list", brokersList)
   props.put("serializer.class", "kafka.serializer.StringEncoder")
   props.put("compression.codec", codec.toString)
   props.put("producer.type", "sync")
   props.put("batch.num.messages", BatchSize.toString)
   props.put("message.send.max.retries", maxRetries.toString)
   props.put("request.required.acks", "-1")
   producer = new Producer[String, String](new ProducerConfig(props))
   itr.foreach(row => {
     val msg = row.toString.drop(1).dropRight(1)
     this.synchronized {
       producer.send(new KeyedMessage[String, String](Topic, msg))
     }
   })
   producer.close
 })



 the problem with this code is that it creates a separate Kafka producer for
 each partition, and I want a single producer for all partitions. Is there
 any way to achieve this?






-- 
Regards: HAFIZ MUJADID





Re: Does DecisionTree model in MLlib deal with missing values?

2015-01-12 Thread Sean Owen
On Sun, Jan 11, 2015 at 9:46 PM, Christopher Thom
christopher.t...@quantium.com.au wrote:
 Is there any plan to extend the data types that would be accepted by the Tree 
 models in Spark? e.g. Many models that we build contain a large number of 
 string-based categorical factors. Currently the only strategy is to map these 
 string values to integers, and store the mapping so the data can be remapped 
 when the model is scored. A viable solution, but cumbersome for models with 
 hundreds of these kinds of factors.

I think there is nothing on the roadmap, except that in the newer ML
API (the bits under spark.ml), there's fuller support for the idea of
a pipeline of transformations, of which performing this encoding could
be one step.
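
For what it's worth, a hedged sketch of that string-to-integer mapping step with plain RDDs (the column position, RDD names, and the assumption that the remaining columns are numeric strings are all illustrative):

import org.apache.spark.rdd.RDD

def encodeCategorical(raw: RDD[Array[String]], col: Int): (Map[String, Int], RDD[Array[Double]]) = {
  // Collect the distinct category values and assign them stable integer codes.
  val mapping = raw.map(_(col)).distinct().collect().sorted.zipWithIndex.toMap
  val bcMapping = raw.sparkContext.broadcast(mapping)
  val encoded = raw.map { row =>
    row.zipWithIndex.map { case (v, i) =>
      if (i == col) bcMapping.value(v).toDouble else v.toDouble
    }
  }
  (mapping, encoded)   // keep `mapping` so scoring data can be encoded the same way
}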

Since it's directly relevant, I don't mind mentioning that we did
build this sort of logic on top of MLlib and PMML. There's nothing
hard about it, just a pile of translation and counting code, such as
in 
https://github.com/OryxProject/oryx/blob/master/oryx-app-common/src/main/java/com/cloudera/oryx/app/rdf/RDFPMMLUtils.java

So there are bits you can reuse out there especially if your goal is
to get to PMML, which will want to represent all the actual
categorical values in its DataDictionary and not encodings.


 Concerning missing data, I haven't been able to figure out how to use NULL 
 values in LabeledPoints, and I'm not sure whether DecisionTrees correctly 
 handle the case of missing data. The only thing I've been able to work out is 
 to use a placeholder value,

Yes, I don't think that's supported. In model training, you can simply
ignore data that can't reach the node because it lacks a feature
needed in a decision rule. This is OK as long as not that much data is
missing.

In scoring you can't not-answer. Again if you refer to PMML, you can
see some ideas about how to handle this:
http://www.dmg.org/v4-2-1/TreeModel.html#xsdType_MISSING-VALUE-STRATEGY

- Make no prediction
- Just copy the last prediction
- Use a model-supplied default for the node
- Use some confidence weighted combination of the answer you'd get by
following both paths

I have opted, in the past, for simply defaulting to the subtree with
more training examples. All of these strategies are approximations,
yes.




RowMatrix multiplication

2015-01-12 Thread Alex Minnaar
I have a rowMatrix on which I want to perform two multiplications. The first 
is a right multiplication with a local matrix, which is fine. But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix. I understand that there is no functionality to transpose a 
rowMatrix at this time, but I was wondering if anyone could suggest any kind 
of work-around for this. I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate. Can anyone think of another alternative?


Thanks,


Alex


Re: Manually trigger RDD map function without action

2015-01-12 Thread Cody Koeninger
If you don't care about the value that your map produced (because you're
not already collecting or saving it), then is foreach more appropriate to
what you're doing?
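
A tiny sketch of the difference, with an assumed SparkContext sc and a placeholder side effect:

val rdd = sc.parallelize(1 to 1000)

def sideEffect(x: Int): Unit = { /* e.g. write x to an external store */ }

// map is lazy, so people often tack on count() just to force it to run:
rdd.map(x => { sideEffect(x); x }).count()

// foreach is itself an action, so it triggers the computation without a throwaway result:
rdd.foreach(x => sideEffect(x))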

On Mon, Jan 12, 2015 at 4:08 AM, kevinkim kevin...@apache.org wrote:

 Hi, answer from another Kevin.

 I think you may already know it,
 'transformation' in spark
 (
 http://spark.apache.org/docs/latest/programming-guide.html#transformations
 )
 will be done in 'lazy' way, when you trigger 'actions'.
 (http://spark.apache.org/docs/latest/programming-guide.html#actions)

 So you can use
 'collect' - if you need result from memory
 'count' - if you need to count
 'saveAs ...' - if you need to persist transformed RDD

 Regards,
 Kevin


 On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] 
 [hidden
 email] http:///user/SendEmail.jtp?type=nodenode=21095i=0 wrote:

 Hi all
 Is there efficient way to trigger RDD transformations? I'm now using
 count action to achieve this.

 Best regards
 Kevin






Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Cody Koeninger
You should take a look at

https://issues.apache.org/jira/browse/SPARK-4122

which is implementing writing to kafka in a pretty similar way (make a new
producer inside foreachPartition)
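
As a hedged sketch of one common workaround (not the SPARK-4122 implementation itself): keep a single, lazily created producer per executor JVM in an object, so every partition handled by that executor reuses it. The broker list, topic, and rdd below are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ProducerHolder {
  // Placeholder broker list; in a real job this would come from configuration.
  private val brokers = "broker1:9092,broker2:9092"
  lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }
}

rdd.foreachPartition { itr =>
  itr.foreach(row =>
    ProducerHolder.producer.send(new KeyedMessage[String, String]("my-topic", row.toString)))
}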

On Mon, Jan 12, 2015 at 5:24 AM, Sean Owen so...@cloudera.com wrote:

 Leader-not-found suggests a problem with zookeeper config. It depends
 a lot on the specifics of your error. But this is really a Kafka
 question, better for the Kafka list.

 On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid
 hafizmujadi...@gmail.com wrote:
  Actually this code is producing error leader not found exception. I am
  unable to find the reason
 





Re: Removing JARs from spark-jobserver

2015-01-12 Thread Fernando O.
just an FYI: you can configure that using spark.jobserver.filedao.rootdir

On Mon, Jan 12, 2015 at 1:52 AM, abhishek reachabhishe...@gmail.com wrote:

 Nice!  Good to know
 On 11 Jan 2015 21:10, Sasi [via Apache Spark User List] [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=21089i=0 wrote:

 Thank you Abhishek. That works.






creating a single kafka producer object for all partitions

2015-01-12 Thread Hafiz Mujadid
Hi experts!


I have a SchemaRDD of messages to be pushed to Kafka, so I am using the
following piece of code to do that:

rdd.foreachPartition(itr => {
  val props = new Properties()
  props.put("metadata.broker.list", brokersList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("compression.codec", codec.toString)
  props.put("producer.type", "sync")
  props.put("batch.num.messages", BatchSize.toString)
  props.put("message.send.max.retries", maxRetries.toString)
  props.put("request.required.acks", "-1")
  producer = new Producer[String, String](new ProducerConfig(props))
  itr.foreach(row => {
    val msg = row.toString.drop(1).dropRight(1)
    this.synchronized {
      producer.send(new KeyedMessage[String, String](Topic, msg))
    }
  })
  producer.close
})



the problem with this code is that it creates a separate Kafka producer for
each partition, and I want a single producer for all partitions. Is there any
way to achieve this?







Re: Play Scala Spark Exmaple

2015-01-12 Thread Eduardo Cusa
The EC2 version is 1.1.0 and this is my build.sbt:


libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  "org.apache.spark"  %% "spark-core"              % "1.1.0",
  "com.typesafe.akka" %% "akka-actor"              % "2.2.3",
  "com.typesafe.akka" %% "akka-slf4j"              % "2.2.3",
  "org.apache.spark"  %% "spark-streaming-twitter" % "1.1.0",
  "org.apache.spark"  %% "spark-sql"               % "1.1.0",
  "org.apache.spark"  %% "spark-mllib"             % "1.1.0"
)



On Sun, Jan 11, 2015 at 3:01 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 What is your spark version that is running on the EC2 cluster? From the build
 file https://github.com/knoldus/Play-Spark-Scala/blob/master/build.sbt
 of your play application it seems that it uses Spark 1.0.1.

 Thanks
 Best Regards

 On Fri, Jan 9, 2015 at 7:17 PM, Eduardo Cusa 
 eduardo.c...@usmediaconsulting.com wrote:

 Hi guys, I running the following example :
 https://github.com/knoldus/Play-Spark-Scala in the same machine as the
 spark master, and the spark cluster was lauched with ec2 script.


 I'm stuck with this errors, any idea how to fix it?

 Regards
 Eduardo


 call the play app prints the following exception :


 [*error*] a.r.EndpointWriter - AssociationError 
 [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] - 
 [akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575]: Error 
 [Shut down address: 
 akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575] [
 akka.remote.ShutDownAssociation: Shut down address: 
 akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575
 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The 
 remote system terminated the association because it is shutting down.




 The master recive the spark application and generate the following stderr
 log page:


 15/01/09 13:31:23 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
 15/01/09 13:31:23 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
 15/01/09 13:31:23 INFO util.Utils: Successfully started service 
 'sparkExecutor' on port 37856.
 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to MapOutputTracker: 
 akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/MapOutputTracker
 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to BlockManagerMaster: 
 akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/BlockManagerMaster
 15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at 
 /mnt/spark/spark-local-20150109133123-3805
 15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at 
 /mnt2/spark/spark-local-20150109133123-b05e
 15/01/09 13:31:23 INFO util.Utils: Successfully started service 'Connection 
 manager for block manager' on port 36936.
 15/01/09 13:31:23 INFO network.ConnectionManager: Bound socket to port 36936 
 with id = ConnectionManagerId(ip-10-158-18-250.ec2.internal,36936)
 15/01/09 13:31:23 INFO storage.MemoryStore: MemoryStore started with 
 capacity 265.4 MB
 15/01/09 13:31:23 INFO storage.BlockManagerMaster: Trying to register 
 BlockManager
 15/01/09 13:31:23 INFO storage.BlockManagerMaster: Registered BlockManager
 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: 
 akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/HeartbeatReceiver
 15/01/09 13:31:54 ERROR executor.CoarseGrainedExecutorBackend: Driver 
 Disassociated [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:57671] 
 - [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] 
 disassociated! Shutting down.





Re: calculating the mean of SparseVector RDD

2015-01-12 Thread Rok Roskar
This was without using Kryo -- if I use kryo, I got errors about buffer
overflows (see above):

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

Just calling colStats doesn't actually compute those statistics, does it?
It looks like the computation is only carried out once you call the .mean()
method.
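
For reference, a minimal sketch of the colStats path (MLlib 1.x API, assumed SparkContext sc and toy vectors):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val vecs = sc.parallelize(Seq(
  Vectors.sparse(3, Seq((0, 1.0))),
  Vectors.sparse(3, Seq((1, 2.0), (2, 4.0)))
))
val summary = Statistics.colStats(vecs)   // multivariate summary: mean, variance, numNonzeros, ...
println(summary.mean)                     // dense vector of per-column means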



On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng men...@gmail.com wrote:

 colStats() computes the mean values along with several other summary
 statistics, which makes it slower. How is the performance if you don't
 use kryo? -Xiangrui

 On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar rokros...@gmail.com wrote:
  thanks for the suggestion -- however, looks like this is even slower.
 With
  the small data set I'm using, my aggregate function takes ~ 9 seconds and
  the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
  the Kyro serializer -- I get the error:
 
  com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
  required: 8
 
  is there an easy/obvious fix?
 
 
  On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote:
 
  There is some serialization overhead. You can try
 
 
 https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
  . -Xiangrui
 
  On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
   I have an RDD of SparseVectors and I'd like to calculate the means
   returning
   a dense vector. I've tried doing this with the following (using
 pyspark,
   spark v1.2.0):
  
   def aggregate_partition_values(vec1, vec2) :
   vec1[vec2.indices] += vec2.values
   return vec1
  
   def aggregate_combined_vectors(vec1, vec2) :
   if all(vec1 == vec2) :
   # then the vector came from only one partition
   return vec1
   else:
   return vec1 + vec2
  
   means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
   aggregate_combined_vectors)
   means = means / nvals
  
   This turns out to be really slow -- and doesn't seem to depend on how
   many
   vectors there are so there seems to be some overhead somewhere that
 I'm
   not
   understanding. Is there a better way of doing this?
  
  
  
  
 
 



Re: Problem with building spark-1.2.0

2015-01-12 Thread Sean Owen
The problem is there in the logs. When it went to clone some code,
something went wrong with the proxy:

Received HTTP code 407 from proxy after CONNECT

Probably you have an HTTP proxy and you have not authenticated. It's
specific to your environment.

Although it's unrelated, I'm curious how your build refers to
https://github.com/ScrapCodes/sbt-pom-reader.git -- I don't see this
repo in the project code base.

On Mon, Jan 12, 2015 at 9:09 AM, Kartheek.R kartheek.m...@gmail.com wrote:
 Hi,
 This is what I am trying to do:

 karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
 Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
 Note, this will be overridden by -java-home if it is set.
 [info] Loading project definition from
 /home/karthik/spark-1.2.0/project/project
 Cloning into
 '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
 fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/':
 Received HTTP code 407 from proxy after CONNECT
 java.lang.RuntimeException: Nonzero exit code (128): git clone
 https://github.com/ScrapCodes/sbt-pom-reader.git




Re: Spark SQL Parquet - data are reading very very slow

2015-01-12 Thread kaushal
yes , i am also facing same problem .. please any one help to get fast
execution.
thanks.






Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Sean Owen
Leader-not-found suggests a problem with zookeeper config. It depends
a lot on the specifics of your error. But this is really a Kafka
question, better for the Kafka list.

On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid
hafizmujadi...@gmail.com wrote:
 Actually this code is producing error leader not found exception. I am
 unable to find the reason





Re: Manually trigger RDD map function without action

2015-01-12 Thread kevinkim
Hi, answer from another Kevin.

I think you may already know it,
'transformation' in spark
(http://spark.apache.org/docs/latest/programming-guide.html#transformations)
will be done in 'lazy' way, when you trigger 'actions'.
(http://spark.apache.org/docs/latest/programming-guide.html#actions)

So you can use
'collect' - if you need result from memory
'count' - if you need to count
'saveAs ...' - if you need to persist transformed RDD

Regards,
Kevin


On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] 
ml-node+s1001560n21094...@n3.nabble.com wrote:

 Hi all
 Is there efficient way to trigger RDD transformations? I'm now using count
 action to achieve this.

 Best regards
 Kevin







Re: Issue with Parquet on Spark 1.2 and Amazon EMR

2015-01-12 Thread Aniket Bhatnagar
Meanwhile, I have submitted a pull request (
https://github.com/awslabs/emr-bootstrap-actions/pull/37) that allows users
to place their jars ahead of all other jars in spark classpath. This should
serve as a temporary workaround for all class conflicts.

Thanks,
Aniket

On Mon Jan 05 2015 at 22:13:47 Kelly, Jonathan jonat...@amazon.com wrote:

   I've noticed the same thing recently and will contact the appropriate
 owner soon.  (I work for Amazon, so I'll go through internal channels and
 report back to this list.)

  In the meantime, I've found that editing spark-env.sh and putting the
 Spark assembly first in the classpath fixes the issue.  I expect that the
 version of Parquet that's being included in the EMR libs just needs to be
 upgraded.


  ~ Jonathan Kelly

   From: Aniket Bhatnagar aniket.bhatna...@gmail.com
 Date: Sunday, January 4, 2015 at 10:51 PM
 To: Adam Gilmore dragoncu...@gmail.com, user@spark.apache.org 
 user@spark.apache.org
 Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR

   Can you confirm your emr version? Could it be because of the classpath
 entries for emrfs? You might face issues with using S3 without them.

 Thanks,
 Aniket

 On Mon, Jan 5, 2015, 11:16 AM Adam Gilmore dragoncu...@gmail.com wrote:

  Just an update on this - I found that the script by Amazon was the
 culprit - not exactly sure why.  When I installed Spark manually onto the
 EMR (and did the manual configuration of all the EMR stuff), it worked fine.

 On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore dragoncu...@gmail.com
 wrote:

  Hi all,

  I've just launched a new Amazon EMR cluster and used the script at:

  s3://support.elasticmapreduce/spark/install-spark

  to install Spark (this script was upgraded to support 1.2).

  I know there are tools to launch a Spark cluster in EC2, but I want to
 use EMR.

  Everything installs fine; however, when I go to read from a Parquet
 file, I end up with (the main exception):

  Caused by: java.lang.NoSuchMethodError:
 parquet.hadoop.ParquetInputSplit.init(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V
 at
 parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578)
 ... 55 more

  It seems to me like a version mismatch somewhere.  Where is the
 parquet-hadoop jar coming from?  Is it built into a fat jar for Spark?

  Any help would be appreciated.  Note that 1.1.1 worked fine with
 Parquet files.





Spark does not loop through a RDD.map

2015-01-12 Thread rkgurram
Hi,
   I am observing some weird behavior with Spark. It might be my
misinterpretation of some fundamental concepts, but I have looked at it for 3
days and have not been able to solve it.

The source code is pretty long and complex, so instead of posting it, I will
try to articulate the problem.
I am building a Sentiment Analyser using the Naive Bayes model in Spark.

1) I have taken text files in RAW format and created an RDD of
word -> Array(files the word is found in).

 2) From this I have derived the features array for each file, which is an
Array[Double]: 0.0 if the file does not contain the word and 1.0 if the
word is found in the file.

3) I have then created an RDD[LabeledPoints]

From this I have created the Naive Bayes model using the following code:

val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
// training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val test = splits(1)
Logger.info("Training count: " + training.count() + " Testing count: " + test.count())
model = NaiveBayes.train(training, lambda = 1.0)

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
Logger.info("Fold:[" + fold + "] accuracy: [" + accuracy + "]")

4) The model seems to be fine and the accuracy is about 75% to 82% depending
on which set of input files I provide.

5) Now I am using this model to predict(); here I am creating the same
feature array from the input text file and I have code as follows:

/*
 * Print all the features (words) in the feature array
 */
allFeaturesRDD.foreach((x) => print(x + ", "))

/*
 * Build the feature array
 */
val features = buildFeatureArray(reviewID, wordSeqRdd)  // <-- Fails here,
                                                        //     I have shown this code below
logFeatureArray(features)

val prediction = model.predict(Vectors.dense(features))
Logger.info("Prediction: " + prediction)

==
reviewID -> filename
wordReviewSeqRDD -> RDD[(word, Array(filename))]

def buildFeatureArray(reviewID: String,
                      wordReviewSeqRDD: RDD[(String, Seq[String])]): Array[Double] = {

  val numWords = allFeaturesRDD.count       // <-- number of all words in the feature
  val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()

  var featArray: Array[Double] = new Array(numWords.toInt)  // <-- create an empty features array
  var idx = 0
  if (trainingDone) Logger.info("Feature len: " + numWords)

  allFeaturesRDD.map {     // <-- This is where it is failing,
    case (eachword) => {   // <-- for some reason the code does not enter here
      val reviewList = wordReviewSeqMap.get(eachword).get

      if (trainingDone == true) {
        println("1. eachword: " + eachword + " reviewList: " + reviewList)
        println("2. reviewList.size: " + reviewList.length)
        println("3. reviewList(0): " + reviewList(0))
      }

      featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else 0.toDouble
      idx += 1
    }
  }
  featArray
}






Re: Failed to save RDD as text file to local file system

2015-01-12 Thread Sean Owen
I think you're confusing HDFS paths and local paths. You are cd'ing to
a directory and seem to want to write output there, but your path has
no scheme and defaults to being an HDFS path. When you use file: you
seem to have a permission error (perhaps).

On Mon, Jan 12, 2015 at 4:21 PM, NingjunWang
ningjun.w...@lexisnexis.com wrote:
 Prannoy



 I tried this r.saveAsTextFile(home/cloudera/tmp/out1), it return without
 error. But where does it saved to? The folder “/home/cloudera/tmp/out1” is
 not cretaed.



 I also tried the following

 cd /home/cloudera/tmp/

 spark-shell

 scala val r = sc.parallelize(Array(a, b, c))

 scala r.saveAsTextFile(out1)



 It does not return error. But still there is no “out1” folder created under
 /home/cloudera/tmp/



 I tried to give absolute path but then get an error



 scala r.saveAsTextFile(/home/cloudera/tmp/out1)

 org.apache.hadoop.security.AccessControlException: Permission denied:
 user=cloudera, access=WRITE, inode=/:hdfs:supergroup:drwxr-xr-x

 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)

 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)

 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)

 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)

 at
 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)

 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6286)

 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6268)

 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6220)

 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4087)

 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)

 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)

 at
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)

 at
 org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)

 at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)

 at
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

 at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)

 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)

 at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)

 at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:415)

 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)

 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)



 Very frustrated. Please advise.





 Regards,



 Ningjun Wang

 Consulting Software Engineer

 LexisNexis

 121 Chanlon Road

 New Providence, NJ 07974-1541



 From: Prannoy [via Apache Spark User List] [mailto:ml-node+[hidden email]]
 Sent: Monday, January 12, 2015 4:18 AM


 To: Wang, Ningjun (LNG-NPV)
 Subject: Re: Failed to save RDD as text file to local file system



 Have you tried simple giving the path where you want to save the file ?



 For instance in your case just do



 r.saveAsTextFile(home/cloudera/tmp/out1)



 Dont use file



 This will create a folder with name out1. saveAsTextFile always write by
 making a directory, it does not write data into a single file.



 Incase you need a single file you can use copyMerge API in FileUtils.



 FileUtil.copyMerge(fs, home/cloudera/tmp/out1, fs,home/cloudera/tmp/out2 ,
 true, conf,null);

 Now out2 will be a single file containing your data.

 fs is the configuration of you local file system.

 Thanks





 On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List]
 [hidden email] wrote:

 No, do you have any idea?



 Regards,



 Ningjun Wang

 Consulting Software Engineer

 LexisNexis

 121 Chanlon Road

 New Providence, NJ 07974-1541



 From: firemonk9 [via Apache Spark User List] [mailto:[hidden email][hidden
 email]]
 Sent: Friday, January 09, 2015 2:56 PM
 To: Wang, Ningjun (LNG-NPV)
 Subject: Re: Failed to save RDD as text file to local file system



 Have you found any resolution for this issue 

Re: How to use memcached with spark

2015-01-12 Thread octavian.ganea
I am trying to use it, but without success. Any sample code that works with
Spark would be highly appreciated. :)






Spark executors resources. Blocking?

2015-01-12 Thread Luis Guerra
Hello all,

I have a naive question regarding how spark uses the executors in a cluster
of machines. Imagine the scenario in which I do not know the input size of
my data in execution A, so I set Spark to use 20 (out of my 25 nodes, for
instance). At the same time, I also launch a second execution B, setting
Spark to use 10 nodes for this.

Assuming a huge input size for execution A, which implies an execution time
of 30 minutes for example (using all the resources), and a constant
execution time for B of 10 minutes, then both executions will last for 40
minutes (I assume that B cannot be launched until 10 resources are
completely available, when A finishes).

Now, assuming a very small input size for execution A running for 5 minutes
in only 2 of the 20 planned resources, I would like execution B to be
launched at that time, consuming both executions only 10 minutes (and 12
resources). However, as execution A has set Spark to use 20 resources,
execution B has to wait until A has finished, so the total execution time
lasts for 15 minutes.

Is this right? If so, how can I solve this kind of scenarios? If I am
wrong, what would be the correct interpretation for this?

Thanks in advance,

Best


Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread David McWhorter
Hi Ganelin, sorry if it wasn't clear from my previous email, but that is 
how I am creating a spark context.  I just didn't write out the lines 
where I create the new SparkConf and SparkContext.  I am also upping the 
driver memory when running.


Thanks,
David

On 01/12/2015 11:12 AM, Ganelin, Ilya wrote:

There are two related options:

To solve your problem directly, try:

val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver: as
part of your spark-submit script, add:

--driver-memory 2g

Hope this helps!


From: David McWhorter mcwhor...@ccri.com mailto:mcwhor...@ccri.com
Date: Monday, January 12, 2015 at 11:01 AM
To: user@spark.apache.org mailto:user@spark.apache.org 
user@spark.apache.org mailto:user@spark.apache.org

Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all,

I'm trying to figure out how to set this option:  
spark.yarn.driver.memoryOverhead on Spark 1.2.0.  I found this 
helpful overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476, 
which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 
added to spark-submit.  However, when I do that I get this error:

Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling 
sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my spark 
configuration object, but I still get "Will allocate AM container, with 
 MB memory including 384 MB overhead" when launching.  I'm running 
in yarn-cluster mode.


Any help or tips would be appreciated.

Thanks,
David
--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com  | 434.299.0090x204







--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204



Re: Spark does not loop through a RDD.map

2015-01-12 Thread Cody Koeninger
At a quick glance, I think you're misunderstanding some basic features.

http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

Map is a transformation, it is lazy.  You're not calling any action on the
result of map.

Also, closing over a mutable variable (like idx or featArray here) won't
work; that closure is being run on executors, not the driver where your
main code is running.
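
To illustrate (a hedged sketch only, reusing the names from the original post and assuming the full word list fits on the driver): the per-review feature vector can be built on the driver after collecting, instead of mutating featArray inside a distributed map:

val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
val featArray: Array[Double] = allFeaturesRDD.collect().map { word =>
  // 1.0 if this review contains the word, 0.0 otherwise
  if (wordReviewSeqMap.getOrElse(word, Seq.empty).contains(reviewID)) 1.0 else 0.0
}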

On Mon, Jan 12, 2015 at 9:49 AM, rkgurram rkgur...@gmail.com wrote:

 Hi,
I am observing some weird behavior with spark, it might be my
 mis-interpretation of some fundamental concepts but I have look at it for 3
 days and have not been able to solve it.

 The source code is pretty long and complex so instead of posting it, I will
 try to articulate the problem.
 I am building a Sentiment Analyser using the Naive Bayes model in Spark.

 1) I have taken text files in RAW format and created a RDD of
 words-Array(files the word is found in).

  2) From this I have derived the features array for each file which is an
 Array[Double], a 0.0 if the file does not contain the word and 1.0 if the
 word is found in the file

 3) I have then created an RDD[LabeledPoints]

 from this I have created the Naive Baiyes model using the following code

 val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
 val training = splits(0)
// training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
 val test = splits(1)
 Logger.info(Training count:  + training.count() +  Testing count: +
 test.count())
 model = NaiveBayes.train(training, lambda = 1.0)

 val predictionAndLabel = test.map(p = (model.predict(p.features),
 p.label))
 val accuracy = 1.0 * predictionAndLabel.filter(x = x._1 ==
 x._2).count() / test.count()
 Logger.info(Fold:[ + fold + ] accuracy: [ + accuracy +])

 4) The model seems to be fine and the accuracy is about 75% to 82%
 depending
 on which set of input fles I provide.

 5) Now I am using this model to predict(),  here I am creating the same
 feature array from the input text file and I have code as follows,
/*
 * Print all the features (words) in the feature array
 */
allFeaturesRDD.foreach((x) = print(x + , ))

  /*
   * Build the feature array
   */

 val features = buildFeatureArray(reviewID,wordSeqRdd)  Fails here,
 have show this code below
 logFeatureArray(features)

 val prediction = model.predict(Vectors.dense(features))
 Logger.info (Prediction: + prediction)

 ==
 reviewID  filename
 wordReviewSeqRDD - RDD[(word, Array(filename)]

   def buildFeatureArray(reviewID:String,
 wordReviewSeqRDD:RDD[(String,Seq[String])]):
 Array[Double] = {

 val numWords = allFeaturesRDD.count --- number of all words in the
 feature
 val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()

 var featArray:Array[Double] = new Array(numWords.toInt) --- create an
 empty features array
 var idx = 0
 if (trainingDone) Logger.info(Feature len: + numWords)

 allFeaturesRDD.map{ *-- This is where it is failing, *
   case(eachword) = { *-- for some reason the code does not enter here
 *
 val reviewList = wordReviewSeqMap.get(eachword).get

 if (trainingDone == true) {
   println(1. eachword: + eachword + reviewList: + reviewList)
   println(2. reviewList.size: + reviewList.length)
   println(3. reviewList(0): + reviewList(0))

 }

 featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else
 0.toDouble
 idx += 1
   }
 }
 featArray
   }







Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread Sean Owen
Isn't the syntax --conf property=value?

http://spark.apache.org/docs/latest/configuration.html

Yes, I think setting it after the driver is running is of course too late.
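
For example (hedged; the class and jar names are illustrative):

spark-submit --master yarn-cluster \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --driver-memory 2g \
  --class com.example.MyApp myapp.jar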

On Mon, Jan 12, 2015 at 4:01 PM, David McWhorter mcwhor...@ccri.com wrote:
 Hi all,

 I'm trying to figure out how to set this option: 
 spark.yarn.driver.memoryOverhead on Spark 1.2.0.  I found this helpful
 overview
 http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476,
 which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 added
 to spark-submit.  However, when I do that I get this error:
 Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
 Run with --help for usage help or --verbose for debug output
 I have also tried calling sparkConf.set(spark.yarn.driver.memoryOverhead,
 1024) on my spark configuration object but I still get Will allocate AM
 container, with  MB memory including 384 MB overhead when launching.
 I'm running in yarn-cluster mode.

 Any help or tips would be appreciated.

 Thanks,
 David

 --

 David McWhorter
 Software Engineer
 Commonwealth Computer Research, Inc.
 1422 Sachem Place, Unit #1
 Charlottesville, VA 22901
 mcwhor...@ccri.com | 434.299.0090x204




Re: RowMatrix multiplication

2015-01-12 Thread Alex Minnaar
That's not quite what I'm looking for.  Let me provide an example.  I have a 
rowmatrix A that is nxm and I have two local matrices b and c.  b is mx1 and c 
is nx1.  In my spark job I wish to perform the following two computations


A*b


and


A^T*c


I don't think this is possible without being able to transpose a rowmatrix.  Am 
I correct?


Thanks,


Alex


From: Reza Zadeh r...@databricks.com
Sent: Monday, January 12, 2015 1:58 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication

As you mentioned, you can perform A * b, where A is a rowmatrix and b is a 
local matrix.

From your email, I figure you want to compute b * A^T. To do this, you can 
compute C = A b^T, whose result is the transpose of what you were looking for, 
i.e. C^T = b * A^T. To undo the transpose, you would have transpose C manually 
yourself. Be careful though, because the result might not have each Row fit in 
memory on a single machine, which is what RowMatrix requires. This danger is 
why we didn't provide a transpose operation in RowMatrix natively.

To address this and more, there is an effort to provide more comprehensive 
linear algebra through block matrices, which will likely make it to 1.3:
https://issues.apache.org/jira/browse/SPARK-3434
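
A small sketch of the supported A * b part (MLlib 1.2-era API; sizes and values are illustrative, and a SparkContext sc is assumed):

import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val A = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0),
  Vectors.dense(5.0, 6.0)
)))                                              // 3 x 2 distributed matrix
val b = Matrices.dense(2, 1, Array(1.0, -1.0))   // 2 x 1 local matrix
val Ab: RowMatrix = A.multiply(b)                // 3 x 1 result, still distributed

// There is no A.transpose, so A^T * c needs a workaround such as the
// C = A * b^T trick described above, or building a second, transposed RowMatrix up front.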

Best,
Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar 
aminn...@verticalscope.commailto:aminn...@verticalscope.com wrote:

I have a rowMatrix on which I want to perform two multiplications.  The first 
is a right multiplication with a local matrix which is fine.  But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix.  I understand that there is no functionality to transpose a 
rowMatrix at this time but I was wondering if anyone could suggest a any kind 
of work-around for this.  I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate.  Can anyone think of another alternative?


Thanks,


Alex



Re: Getting Output From a Cluster

2015-01-12 Thread Su She
Hello Everyone,

Quick follow-up: is there any way I can append output to one file rather
than create a new directory/file every X milliseconds?

Thanks!

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Thu, Jan 8, 2015 at 11:41 PM, Su She suhsheka...@gmail.com wrote:

 1) Thank you everyone for the help once again...the support here is really
 amazing and I hope to contribute soon!

 2) The solution I actually ended up using was from this thread:
 http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E

 in case the thread ever goes down, the soln provided by Matei:

 plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);

 I had browsed a lot of similar threads that did not have answers, but
 found this one from quite some time ago, so apologize for posting a
 question that had been answered before.

 3) Akhil, I was specifying the format as txt, but it was not compatible

 Thanks for the help!


 On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 saveAsHadoopFiles requires you to specify the output format which i
 believe you are not specifying anywhere and hence the program crashes.

 You could try something like this:

 Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends
 OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

 yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
 "/output-location", Text.class, Text.class, outputFormatClass);



 Thanks
 Best Regards

 On Fri, Jan 9, 2015 at 10:22 AM, Su She suhsheka...@gmail.com wrote:

 Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when I
 call print on the Dstream it works? If I had to do foreachRDD to
 saveAsHadoopFile, then why is it working for print?

 Also, if I am doing foreachRDD, do I need connections, or can I simply
 put the saveAsHadoopFiles inside the foreachRDD function?

 Thanks Yana for the help! I will play around with foreachRDD and convey
 my results.



 On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 are you calling the saveAsText files on the DStream --looks like it?
 Look at the section called Design Patterns for using foreachRDD in the
 link you sent -- you want to do  dstream.foreachRDD(rdd =>
 rdd.saveAs)

 On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Thanks in advance for the help!

 I successfully got my Kafka/Spark WordCount app to print locally.
 However, I want to run it on a cluster, which means that I will have to
 save it to HDFS if I want to be able to read the output.

 I am running Spark 1.1.0, which means according to this document:
 https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html

 I should be able to use commands such as saveAsText/HadoopFiles.

 1) When I try saveAsTextFiles it says:
 cannot find symbol
 [ERROR] symbol  : method
 saveAsTextFiles(java.lang.String,java.lang.String)
 [ERROR] location: class
 org.apache.spark.streaming.api.java.JavaPairDStreamjava.lang.String,java.lang.Integer

 This makes some sense as saveAsTextFiles is not included here:

 http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html

 2) When I try
 saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount",
 "txt") it builds, but when I try running it it throws this exception:

 Exception in thread main java.lang.RuntimeException:
 java.lang.RuntimeException: class scala.runtime.Nothing$ not
 org.apache.hadoop.mapred.OutputFormat
 at
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
 at
 org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 

Re: /tmp directory fills up

2015-01-12 Thread Marcelo Vanzin
Hi Alessandro,

You can look for a log line like this in your driver's output:
15/01/12 10:51:01 INFO storage.DiskBlockManager: Created local
directory at 
/data/yarn/nm/usercache/systest/appcache/application_1421081007635_0002/spark-local-20150112105101-4f3d

If you're deploying your application in cluster mode, the temp
directory will be under the Yarn-defined application dir. In client
mode, the driver will create some stuff under spark.local.dir, but the
driver itself generally doesn't create many temp files IIRC.


On Fri, Jan 9, 2015 at 11:32 PM, Alessandro Baretta
alexbare...@gmail.com wrote:
 Gents,

 I'm building spark using the current master branch and deploying in to
 Google Compute Engine on top of Hadoop 2.4/YARN via bdutil, Google's Hadoop
 cluster provisioning tool. bdutils configures Spark with

 spark.local.dir=/hadoop/spark/tmp,

 but this option is ignored in combination with YARN. Bdutils also configures
 YARN with:

   <property>
     <name>yarn.nodemanager.local-dirs</name>
     <value>/mnt/pd1/hadoop/yarn/nm-local-dir</value>
     <description>
       Directories on the local machine in which to store application temp files.
     </description>
   </property>

 This is the right directory for spark to store temporary data in. Still,
 Spark is creating such directories as this:

 /tmp/spark-51388ee6-9de6-411d-b9b9-ab6f9502d01e

 and filling them up with gigabytes worth of output files, filling up the
 very small root filesystem.

 How can I diagnose why my Spark installation is not picking up the
 yarn.nodemanager.local-dirs from yarn?

 Alex



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ReliableKafkaReceiver stopped receiving data after WriteAheadLogBasedBlockHandler throws TimeoutException

2015-01-12 Thread Max Xu
Hi all,

I am running a Spark streaming application with ReliableKafkaReceiver (Spark 
1.2.0). Constantly I was getting the following exception:

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at 
org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
at 
org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
at 
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
at 
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
at 
org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
at 
org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
at 
org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)

After the exception, ReliableKafkaReceiver stayed in ACTIVE status but stopped 
receiving data from Kafka. The Kafka message handler thread is in BLOCKED state:

Thread 92: KafkaMessageHandler-0 (BLOCKED)
org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Sometimes when the exception was thrown, I also see warning messages like this:
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 
30533ms (threshold=3ms); ack: seqno: 113 status: SUCCESS status: SUCCESS 
downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 
172.20.xxx.xxx:50010]
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms 
(threshold=3ms)

In the past, I never had such a problem with KafkaReceiver. What causes this 
exception? How can I solve this problem?

Thanks in advance,
Max


Re: RowMatrix multiplication

2015-01-12 Thread Alex Minnaar
Good idea! Join each element of c with the corresponding row of A, multiply 
through, then reduce.  I'll give this a try.


Thanks,


Alex


From: Reza Zadeh r...@databricks.com
Sent: Monday, January 12, 2015 3:05 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication

Yes you are correct, to do it with existing operations you would need a 
transpose on rowmatrix.

However, you can fairly easily perform the operation manually by doing a join 
(if the c vector is an RDD) or broadcasting c (if the c vector is small enough 
to fit in memory on a single machine).
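
A minimal sketch of the broadcast approach (illustrative, not from the thread; it
assumes c fits in memory and that its entries follow the row order produced by
zipWithIndex over the RowMatrix rows):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// A^T * c = sum over rows i of c(i) * a_i, so no explicit transpose is needed.
def aTransposeTimesC(sc: SparkContext, a: RowMatrix, c: Vector): Vector = {
  val bcC = sc.broadcast(c.toArray)              // one entry of c per row of A
  val n = a.numCols().toInt
  val summed = a.rows.zipWithIndex()
    .map { case (row, i) =>
      row.toArray.map(_ * bcC.value(i.toInt))    // scale row i by c(i)
    }
    .reduce((x, y) => Array.tabulate(n)(j => x(j) + y(j)))
  Vectors.dense(summed)
}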

On Mon, Jan 12, 2015 at 11:45 AM, Alex Minnaar 
aminn...@verticalscope.commailto:aminn...@verticalscope.com wrote:

That's not quite what I'm looking for.  Let me provide an example.  I have a 
rowmatrix A that is nxm and I have two local matrices b and c.  b is mx1 and c 
is nx1.  In my spark job I wish to perform the following two computations


A*b


and


A^T*c


I don't think this is possible without being able to transpose a rowmatrix.  Am 
I correct?


Thanks,


Alex


From: Reza Zadeh r...@databricks.commailto:r...@databricks.com
Sent: Monday, January 12, 2015 1:58 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication

As you mentioned, you can perform A * b, where A is a rowmatrix and b is a 
local matrix.

From your email, I figure you want to compute b * A^T. To do this, you can 
compute C = A b^T, whose result is the transpose of what you were looking for, 
i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C manually 
yourself. Be careful though, because the result might not have each Row fit in 
memory on a single machine, which is what RowMatrix requires. This danger is 
why we didn't provide a transpose operation in RowMatrix natively.

To address this and more, there is an effort to provide more comprehensive 
linear algebra through block matrices, which will likely make it to 1.3:
https://issues.apache.org/jira/browse/SPARK-3434

Best,
Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar 
aminn...@verticalscope.commailto:aminn...@verticalscope.com wrote:

I have a rowMatrix on which I want to perform two multiplications.  The first 
is a right multiplication with a local matrix which is fine.  But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix.  I understand that there is no functionality to transpose a 
rowMatrix at this time but I was wondering if anyone could suggest any kind 
of work-around for this.  I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate.  Can anyone think of another alternative?


Thanks,


Alex




Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread ChongTang
Is there anybody who can help me with this? Thank you very much!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



no snappyjava in java.library.path

2015-01-12 Thread Dan Dong
Hi,
  My Spark job failed with no snappyjava in java.library.path as:
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1119)
at
org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)

I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I
have included it in the
CLASSPATH already.
../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar

Did I miss anything, or should I set it in another way?

Cheers,
Dan


Re: no snappyjava in java.library.path

2015-01-12 Thread David Rosenstrauch
I ran into this recently.  Turned out we had an old 
org-xerial-snappy.properties file in one of our conf directories that 
had the setting:


# Disables loading Snappy-Java native library bundled in the
# snappy-java-*.jar file forcing to load the Snappy-Java native
# library from the java.library.path.
#
org.xerial.snappy.disable.bundled.libs=true

When I switched that to false, it made the problem go away.

May or may not be your problem of course, but worth a look.

HTH,

DR

On 01/12/2015 03:28 PM, Dan Dong wrote:

Hi,
   My Spark job failed with no snappyjava in java.library.path as:
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in
java.library.path
 at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857)
 at java.lang.Runtime.loadLibrary0(Runtime.java:870)
 at java.lang.System.loadLibrary(System.java:1119)
 at
org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)

I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I
have included it in the
CLASSPATH already.
../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar

Did I miss anything, or should I set it in another way?

Cheers,
Dan




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Web Service + Spark

2015-01-12 Thread Robert C Senkbeil
If you would like to work with an API, you can use the Spark Kernel found
here: https://github.com/ibm-et/spark-kernel

The kernel provides an API following the IPython message protocol as well
as a client library that can be used with Scala applications.

The kernel can also be plugged into the latest developmental version of
IPython 3.0 in case you want to do more visual exploration.

Signed,
Chip Senkbeil
IBM Emerging Technology Software Engineer



From:   Raghavendra Pandey raghavendra.pan...@gmail.com
To: Cui Lin cui@hds.com, gtinside gtins...@gmail.com, Corey
Nolet cjno...@gmail.com
Cc: user@spark.apache.org user@spark.apache.org
Date:   01/11/2015 02:06 AM
Subject:Re: Web Service + Spark



You can take a look at http://zeppelin.incubator.apache.org. It is a
notebook and graphic visual designer.



On Sun, Jan 11, 2015, 01:45 Cui Lin cui@hds.com wrote:
  Thanks, Gaurav and Corey,

  Probably I didn’t make myself clear. I am looking for the best Spark practice,
  similar to Shiny for R, where the analysis/visualization results can be easily
  published to a web server and shown from a web browser. Or is there any dashboard
  for Spark?

  Best regards,

  Cui Lin

  From: gtinside gtins...@gmail.com
  Date: Friday, January 9, 2015 at 7:45 PM
  To: Corey Nolet cjno...@gmail.com
  Cc: Cui Lin cui@hds.com, user@spark.apache.org 
  user@spark.apache.org
  Subject: Re: Web Service + Spark

  You can also look at Spark Job Server
  https://github.com/spark-jobserver/spark-jobserver

  - Gaurav

  On Jan 9, 2015, at 10:25 PM, Corey Nolet cjno...@gmail.com wrote:

Cui Lin,

The solution largely depends on how you want your services deployed
(Java web container, Spray framework, etc...) and if you are using
a cluster manager like Yarn or Mesos vs. just firing up your own
executors and master.

I recently worked on an example for deploying Spark services inside
of Jetty using Yarn as the cluster manager. It forced me to learn
how Spark wires up the dependencies/classpaths. If it helps, the
example that resulted from my tinkering is located at [1].


[1] https://github.com/calrissian/spark-jetty-server

On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote:
 Hello, All,

 What’s the best practice on deploying/publishing spark-based
 scientific applications into a web service? Similar to Shiny on R.
  Thanks!

 Best regards,

 Cui Lin


Spark Framework handling of Mesos master change

2015-01-12 Thread Ethan Wolf
We are running Spark and Spark Streaming on Mesos (with multiple masters for
HA).
At launch, our Spark jobs successfully look up the current Mesos master from
zookeeper and spawn tasks.

However, when the Mesos master changes while the spark job is executing, the
spark driver seems to interact with the old Mesos master, and therefore
fails to launch any new tasks.
We are running long running Spark streaming jobs, so we have temporarily
switched to coarse grained as a work around, but it prevents us from running
in fine grained mode which we would prefer for some job.

Looking at the code for MesosSchedulerBackend, it has an empty
implementation of the reregistered (and disconnected) methods, which I
believe would be called when the master changes:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L202

http://mesos.apache.org/documentation/latest/app-framework-development-guide/

Are there any plans to implement master reregistration in the Spark
framework, or does anyone have any suggested workarounds for long running
jobs to deal with the mesos master changing?  (Or is there something we are
doing wrong?)

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Framework-handling-of-Mesos-master-change-tp21107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to recovery application running records when I restart Spark master?

2015-01-12 Thread Chong Tang
Hi all,

Due to some reasons, I restarted the Spark master node.

Before I restarted it, there were some application running records at the
bottom of the master web page. But they are gone after I restarted the master
node. The records include application name, running time, status, and so
on. I am sure you know what I am talking about.

My question is: 1) how can I recover those records when I restart my Spark
master node? 2) If I cannot recover them, where should I go to look for
them?

Actually, I care about the running time of finished applications.

Thank you for your help, and I hope everybody is doing great!

Chong


Re: Getting Output From a Cluster

2015-01-12 Thread Akhil Das
There is no direct way of doing that. If you need a single file for every
batch duration, then you can repartition the data to 1 before saving.
Another way would be to use Hadoop's copyMerge command/API (available from
2.0 versions).
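
A sketch of the repartition approach (illustrative only; it assumes a
DStream[String] named lines and a writable HDFS output path):

lines.foreachRDD { (rdd, time) =>
  // one partition => a single part-file per batch interval
  rdd.repartition(1).saveAsTextFile("hdfs:///user/hue/output/batch-" + time.milliseconds)
}
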
On 13 Jan 2015 01:08, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Quick followup, is there any way I can append output to one file rather
 then create a new directory/file every X milliseconds?

 Thanks!

 Suhas Shekar

 University of California, Los Angeles
 B.A. Economics, Specialization in Computing 2014

 On Thu, Jan 8, 2015 at 11:41 PM, Su She suhsheka...@gmail.com wrote:

 1) Thank you everyone for the help once again...the support here is
 really amazing and I hope to contribute soon!

 2) The solution I actually ended up using was from this thread:
 http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E

 in case the thread ever goes down, the soln provided by Matei:

 plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);

 I had browsed a lot of similar threads that did not have answers, but
 found this one from quite some time ago, so apologize for posting a
 question that had been answered before.

 3) Akhil, I was specifying the format as txt, but it was not compatible

 Thanks for the help!


 On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 saveAsHadoopFiles requires you to specify the output format which i
 believe you are not specifying anywhere and hence the program crashes.

 You could try something like this:

 Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends
 OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

 yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
 "/output-location", Text.class, Text.class, outputFormatClass);



 Thanks
 Best Regards

 On Fri, Jan 9, 2015 at 10:22 AM, Su She suhsheka...@gmail.com wrote:

 Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when I
 call print on the Dstream it works? If I had to do foreachRDD to
 saveAsHadoopFile, then why is it working for print?

 Also, if I am doing foreachRDD, do I need connections, or can I simply
 put the saveAsHadoopFiles inside the foreachRDD function?

 Thanks Yana for the help! I will play around with foreachRDD and convey
 my results.



 On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 are you calling the saveAsText files on the DStream --looks like it?
 Look at the section called Design Patterns for using foreachRDD in the
 link you sent -- you want to do  dstream.foreachRDD(rdd =>
 rdd.saveAs)

 On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Thanks in advance for the help!

 I successfully got my Kafka/Spark WordCount app to print locally.
 However, I want to run it on a cluster, which means that I will have to
 save it to HDFS if I want to be able to read the output.

 I am running Spark 1.1.0, which means according to this document:
 https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html

 I should be able to use commands such as saveAsText/HadoopFiles.

 1) When I try saveAsTextFiles it says:
 cannot find symbol
 [ERROR] symbol  : method
 saveAsTextFiles(java.lang.String,java.lang.String)
 [ERROR] location: class
 org.apache.spark.streaming.api.java.JavaPairDStreamjava.lang.String,java.lang.Integer

 This makes some sense as saveAsTextFiles is not included here:

 http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html

 2) When I try
 saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount",
 "txt") it builds, but when I try running it it throws this exception:

 Exception in thread main java.lang.RuntimeException:
 java.lang.RuntimeException: class scala.runtime.Nothing$ not
 org.apache.hadoop.mapred.OutputFormat
 at
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
 at
 org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)

Re: Getting Output From a Cluster

2015-01-12 Thread Su She
Okay, thanks Akhil!

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Jan 12, 2015 at 1:24 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 There is no direct way of doing that. If you need a Single file for every
 batch duration, then you can repartition the data to 1 before saving.
 Another way would be to use hadoop's copy merge command/api(available from
 2.0 versions)
 On 13 Jan 2015 01:08, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Quick followup, is there any way I can append output to one file rather
 then create a new directory/file every X milliseconds?

 Thanks!

 Suhas Shekar

 University of California, Los Angeles
 B.A. Economics, Specialization in Computing 2014

 On Thu, Jan 8, 2015 at 11:41 PM, Su She suhsheka...@gmail.com wrote:

 1) Thank you everyone for the help once again...the support here is
 really amazing and I hope to contribute soon!

 2) The solution I actually ended up using was from this thread:
 http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E

 in case the thread ever goes down, the soln provided by Matei:


 plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed", "csv",
 String.class, String.class, (Class) TextOutputFormat.class);

 I had browsed a lot of similar threads that did not have answers, but
 found this one from quite some time ago, so apologize for posting a
 question that had been answered before.

 3) Akhil, I was specifying the format as txt, but it was not
 compatible

 Thanks for the help!


 On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 saveAsHadoopFiles requires you to specify the output format which i
 believe you are not specifying anywhere and hence the program crashes.

 You could try something like this:

 Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends
 OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

 yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
 "/output-location", Text.class, Text.class, outputFormatClass);



 Thanks
 Best Regards

 On Fri, Jan 9, 2015 at 10:22 AM, Su She suhsheka...@gmail.com wrote:

 Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when
 I call print on the Dstream it works? If I had to do foreachRDD to
 saveAsHadoopFile, then why is it working for print?

 Also, if I am doing foreachRDD, do I need connections, or can I simply
 put the saveAsHadoopFiles inside the foreachRDD function?

 Thanks Yana for the help! I will play around with foreachRDD and
 convey my results.



 On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska yana.kadiy...@gmail.com
  wrote:

 are you calling the saveAsText files on the DStream --looks like it?
 Look at the section called Design Patterns for using foreachRDD in the
 link you sent -- you want to do  dstream.foreachRDD(rdd =>
 rdd.saveAs)

 On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Thanks in advance for the help!

 I successfully got my Kafka/Spark WordCount app to print locally.
 However, I want to run it on a cluster, which means that I will have to
 save it to HDFS if I want to be able to read the output.

 I am running Spark 1.1.0, which means according to this document:
 https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html

 I should be able to use commands such as saveAsText/HadoopFiles.

 1) When I try saveAsTextFiles it says:
 cannot find symbol
 [ERROR] symbol  : method
 saveAsTextFiles(java.lang.String,java.lang.String)
 [ERROR] location: class
 org.apache.spark.streaming.api.java.JavaPairDStreamjava.lang.String,java.lang.Integer

 This makes some sense as saveAsTextFiles is not included here:

 http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html

 2) When I try
 saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount",
 "txt") it builds, but when I try running it it throws this exception:

 Exception in thread main java.lang.RuntimeException:
 java.lang.RuntimeException: class scala.runtime.Nothing$ not
 org.apache.hadoop.mapred.OutputFormat
 at
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
 at
 org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
 at
 

Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/monitoring.html

http://spark.apache.org/docs/latest/configuration.html#spark-ui

spark.eventLog.enabled



On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote:

 Is there anybody who can help me with this? Thank you very much!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Framework handling of Mesos master change

2015-01-12 Thread Tim Chen
Hi Ethan,

How are you specifying the master to spark?

Being able to recover from a master failover is already handled by the underlying
Mesos scheduler, but you have to use ZooKeeper instead of directly passing
in the master URIs.
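
For example (hostnames and ports below are placeholders), the master can point at
the ZooKeeper ensemble rather than a single Mesos master:

val conf = new org.apache.spark.SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")
  .setAppName("streaming-job")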

Tim

On Mon, Jan 12, 2015 at 12:44 PM, Ethan Wolf ethan.w...@alum.mit.edu
wrote:

 We are running Spark and Spark Streaming on Mesos (with multiple masters
 for
 HA).
 At launch, our Spark jobs successfully look up the current Mesos master
 from
 zookeeper and spawn tasks.

 However, when the Mesos master changes while the spark job is executing,
 the
 spark driver seems to interact with the old Mesos master, and therefore
 fails to launch any new tasks.
 We are running long running Spark streaming jobs, so we have temporarily
 switched to coarse grained as a work around, but it prevents us from
 running
 in fine grained mode which we would prefer for some job.

 Looking at the code for MesosSchedulerBackend, it has an empty
 implementation of the reregistered (and disconnected) methods, which I
 believe would be called when the master changes:

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L202


 http://mesos.apache.org/documentation/latest/app-framework-development-guide/

 Are there any plans to implement master reregistration in the Spark
 framework, or does anyone have any suggested workarounds for long running
 jobs to deal with the mesos master changing?  (Or is there something we are
 doing wrong?)

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Framework-handling-of-Mesos-master-change-tp21107.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Chong Tang
Thank you, Cody! Actually, I have enabled this option, and I saved logs
into Hadoop file system. The problem is, how can I get the duration of an
application? The attached file is the log I copied from HDFS.

On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger c...@koeninger.org wrote:

 http://spark.apache.org/docs/latest/monitoring.html

 http://spark.apache.org/docs/latest/configuration.html#spark-ui

 spark.eventLog.enabled



 On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote:

 Is there anybody who can help me with this? Thank you very much!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





EVENT_LOG_1
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Discrepancy in PCA values

2015-01-12 Thread Xiangrui Meng
Could you compare V directly and tell us more about the difference you
saw? The column of V should be the same subject to signs. For example,
the first column of V could be either [0.8, -0.6, 0.0] or [-0.8, 0.6,
0.0]. -Xiangrui

On Sat, Jan 10, 2015 at 8:08 PM, Upul Bandara upulband...@gmail.com wrote:
 Hi Xiangrui,

 Thanks a lot for you answer.
 So I fixed my Julia code, also calculated PCA using R as well.

 R Code:
 -
 data <- read.csv('/home/upul/Desktop/iris.csv');
 X <- data[,1:4]
 pca <- prcomp(X, center = TRUE, scale=FALSE)
 transformed <- predict(pca, newdata = X)

 Julia Code (Fixed)
 --
 data = readcsv("/home/upul/temp/iris.csv");
 X = data[:,1:end-1];
 meanX = mean(X,1);
 m,n = size(X);
 X = X - repmat(meanX, m, 1);
 u,s,v = svd(X);
 transformed = X*v;

 Now the PCA calculated using Julia and R is identical, but I can still see a
 small difference between the PCA values given by Spark and the other two.

 Thanks,
 Upul

 On Sat, Jan 10, 2015 at 11:17 AM, Xiangrui Meng men...@gmail.com wrote:

 You need to subtract mean values to obtain the covariance matrix
 (http://en.wikipedia.org/wiki/Covariance_matrix).

 On Fri, Jan 9, 2015 at 6:41 PM, Upul Bandara upulband...@gmail.com
 wrote:
  Hi Xiangrui,
 
  Thanks for the reply.
 
  Julia code is also using the covariance matrix:
  (1/n)*X'*X ;
 
  Thanks,
  Upul
 
  On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng men...@gmail.com wrote:
 
  The Julia code is computing the SVD of the Gram matrix. PCA should be
  applied to the covariance matrix. -Xiangrui
 
  On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara upulband...@gmail.com
  wrote:
   Hi All,
  
   I tried to do PCA for the Iris dataset
   [https://archive.ics.uci.edu/ml/datasets/Iris] using MLLib
  
  
   [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html].
   Also, PCA  was calculated in Julia using following method:
  
   Sigma = (1/numRow(X))*X'*X ;
   [U, S, V] = svd(Sigma);
   Ureduced = U(:, 1:k);
   Z = X*Ureduced;
  
   However, I'm seeing a little difference between values given by MLLib
   and
   the method shown above .
  
   Does anyone have any idea about this difference?
  
   Additionally, I have attached two visualizations, related to two
   approaches.
  
   Thanks,
   Upul
  
  
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How does unmanaged memory work with the executor memory limits?

2015-01-12 Thread Michael Albert
Greetings!
My executors apparently are being terminated because they are running beyond
physical memory limits according to the yarn-hadoop-nodemanager logs on the
worker nodes (/mnt/var/log/hadoop on AWS EMR).  I'm setting the driver-memory
to 8G.  However, looking at stdout in userlogs, I can see GC going on, but the
lines look like 6G - 5G(7.2G), 0.45secs, so the GC seems to think that the
process is using about 6G of space, not 8G of space.  However, ps aux shows an
RSS hovering just below 8G.

The process does a mapPartitionsWithIndex, and the process uses compression
which (I believe) calls into the native zlib library (the overall purpose is
to convert each partition into a matlab file).

Could it be that the Yarn container is counting both the memory used by the JVM
proper and the memory used by zlib, but that the GC only sees the internal
memory?  So the GC keeps the memory usage reasonable, e.g., 6G in an 8G
container, but then zlib grabs some memory, and the YARN container then
terminates the task?

If so, is there anything I can do to tell YARN to watch for a larger
memory limit than I tell the JVM to use for its memory?

Thanks!

Sincerely, Mike
 

Re: How does unmanaged memory work with the executor memory limits?

2015-01-12 Thread Marcelo Vanzin
Short answer: yes.

Take a look at: http://spark.apache.org/docs/latest/running-on-yarn.html

Look for memoryOverhead.

On Mon, Jan 12, 2015 at 2:06 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 My executors apparently are being terminated because they are
 running beyond physical memory limits according to the
 yarn-hadoop-nodemanager
 logs on the worker nodes (/mnt/var/log/hadoop on AWS EMR).
 I'm setting the driver-memory to 8G.
 However, looking at stdout in userlogs, I can see GC going on, but the
 lines look
 like 6G - 5G(7.2G), 0.45secs, so the GC seems to think that the process
 is using
 about 6G of space, not 8G of space.
 However, ps aux shows an RSS hovering just below 8G.

 The process does a mapParitionsWithIndex, and the process uses compression
 which (I believe) calls into the native zlib library
 (the overall purpose is to convert each partition into a matlab file).

 Could it be that the Yarn container is counting both the memory used by the
 JVM proper and memory used by zlib, but that the GC only sees the
 internal memory.  So the GC keeps the memory usage reasonable,
 e.g., 6G in an 8G container, but then zlib grabs some memory, and the
 YARN container then terminates the task?

 If so, is there anything I can do so that I tell YARN to watch for a larger
 memory limit than I tell the JVM to use for it's memory?

 Thanks!

 Sincerely,
  Mike





-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Cody Koeninger
Sorry, slightly misunderstood the question.  I'm not sure if there's a way
to make the master UI read old log files after a restart, but the log files
themselves are human readable text.

If you just want application duration, the start and stop are timestamped,
look for lines like this in EVENT_LOG_1:

{"Event":"SparkListenerApplicationStart","App Name":"cassandra-example-broadcast-join","Timestamp":1415763986601,"User":"cody"}

...

{"Event":"SparkListenerApplicationEnd","Timestamp":1415763999790}
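
A small sketch (an illustration, not from the thread) of pulling the duration out
of such a log, assuming the EVENT_LOG_1 file has been copied locally:

import scala.io.Source

def appDurationMs(path: String): Option[Long] = {
  val ts = """"Timestamp":(\d+)""".r
  val lines = Source.fromFile(path).getLines().toVector
  def timestampOf(event: String): Option[Long] =
    lines.find(_.contains(event))
         .flatMap(l => ts.findFirstMatchIn(l).map(_.group(1).toLong))
  for {
    start <- timestampOf("SparkListenerApplicationStart")
    end   <- timestampOf("SparkListenerApplicationEnd")
  } yield end - start   // duration in milliseconds
}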



On Mon, Jan 12, 2015 at 3:56 PM, Chong Tang ct...@virginia.edu wrote:

 Thank you, Cody! Actually, I have enabled this option, and I saved logs
 into Hadoop file system. The problem is, how can I get the duration of an
 application? The attached file is the log I copied from HDFS.

 On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger c...@koeninger.org
 wrote:

 http://spark.apache.org/docs/latest/monitoring.html

 http://spark.apache.org/docs/latest/configuration.html#spark-ui

 spark.eventLog.enabled



 On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote:

 Is there anybody who can help me with this? Thank you very much!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






including the spark-mllib in build.sbt

2015-01-12 Thread Jianguo Li
Hi,

I am trying to build my own Scala project using sbt. The project is
dependent on both spark-core and spark-mllib. I included the following two
dependencies in my build.sbt file:

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

However, when I run the package command in sbt, I got an error message
indicating that object mllib is not a member of package org.apache.spark.

Did I do anything wrong?

Thanks,

Jianguo


How Spark Calculate partition size automatically

2015-01-12 Thread rajnish
Hi,

When I am running a job that is loading the data from Cassandra, Spark has
created almost 9 million partitions. How does Spark decide the partition count? I
have read in one of the presentations that it is good to have 1,000 to
10,000 partitions.

Regards
Raj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-Spark-Calculate-partition-size-automatically-tp21109.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: calculating the mean of SparseVector RDD

2015-01-12 Thread Xiangrui Meng
No, colStats() computes all summary statistics in one pass and stores
the values. It is not lazy.
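
For example (a small illustrative sketch, assuming an existing RDD[Vector] named rows):

import org.apache.spark.mllib.stat.Statistics

val summary = Statistics.colStats(rows)   // a single pass over the data
val means = summary.mean                  // per-column means as a dense Vector
val variances = summary.variance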

On Mon, Jan 12, 2015 at 4:42 AM, Rok Roskar rokros...@gmail.com wrote:
 This was without using Kryo -- if I use kryo, I got errors about buffer
 overflows (see above):

 com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
 required: 8

 Just calling colStats doesn't actually compute those statistics, does it? It
 looks like the computation is only carried out once you call the .mean()
 method.



 On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng men...@gmail.com wrote:

 colStats() computes the mean values along with several other summary
 statistics, which makes it slower. How is the performance if you don't
 use kryo? -Xiangrui

 On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar rokros...@gmail.com wrote:
  thanks for the suggestion -- however, looks like this is even slower.
  With
  the small data set I'm using, my aggregate function takes ~ 9 seconds
  and
  the colStats.mean() takes ~ 1 minute. However, I can't get it to run
  with
  the Kyro serializer -- I get the error:
 
  com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
  required: 8
 
  is there an easy/obvious fix?
 
 
  On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote:
 
  There is some serialization overhead. You can try
 
 
  https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
  . -Xiangrui
 
  On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
   I have an RDD of SparseVectors and I'd like to calculate the means
   returning
   a dense vector. I've tried doing this with the following (using
   pyspark,
   spark v1.2.0):
  
    def aggregate_partition_values(vec1, vec2) :
        vec1[vec2.indices] += vec2.values
        return vec1

    def aggregate_combined_vectors(vec1, vec2) :
        if all(vec1 == vec2) :
            # then the vector came from only one partition
            return vec1
        else:
            return vec1 + vec2

    means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
    aggregate_combined_vectors)
    means = means / nvals
  
   This turns out to be really slow -- and doesn't seem to depend on how
   many
   vectors there are so there seems to be some overhead somewhere that
   I'm
   not
   understanding. Is there a better way of doing this?
  
  
  
   --
   View this message in context:
  
   http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: including the spark-mllib in build.sbt

2015-01-12 Thread Xiangrui Meng
I don't know the root cause. Could you try including only

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"

It should be sufficient because mllib depends on core.
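
A minimal build.sbt along those lines might look like this (the project name is a
placeholder; Scala 2.10.x is assumed, since the Spark 1.1.x artifacts are built
against it):

name := "my-spark-app"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"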

-Xiangrui

On Mon, Jan 12, 2015 at 2:27 PM, Jianguo Li flyingfromch...@gmail.com wrote:
 Hi,

 I am trying to build my own Scala project using sbt. The project is
 dependent on both spark-core and spark-mllib. I included the following two
 dependencies in my build.sbt file:

 libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

 However, when I run the package command in sbt, I got an error message
 indicating that object mllib is not a member of package org.apache.spark.

 Did I do anything wrong?

 Thanks,

 Jianguo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OOM exception during row deserialization

2015-01-12 Thread Pala M Muthaia
Does anybody have insight on this? Thanks.

On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia mchett...@rocketfuelinc.com
wrote:

 Hi,

 I am using Spark 1.0.1. I am trying to debug a OOM exception i saw during
 a join step.

 Basically, i have a RDD of rows, that i am joining with another RDD of
 tuples.

 Some of the tasks succeed but a fair number failed with OOM exception with
 stack below. The stack belongs to the 'reducer' that is reading shuffle
 output from the 'mapper'.

 My question is what's the object being deserialized here - just a portion
 of an RDD or the whole RDD partition assigned to current reducer? The rows
 in the RDD could be large, but definitely not something that would run to
 100s of MBs in size, and thus run out of memory.

 Also, is there a way to determine size of the object being deserialized
 that results in the error (either by looking at some staging hdfs dir or
 logs)?

 java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
 exceeded}
 java.util.Arrays.copyOf(Arrays.java:2367)
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
 java.lang.StringBuilder.append(StringBuilder.java:204)
 java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
 java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
 java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
 java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 java.util.ArrayList.readObject(ArrayList.java:771)
 sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)



 Thanks,
 pala



Re: Broadcast joins on RDD

2015-01-12 Thread Reza Zadeh
First, you should collect().toMap() the small RDD, then you should use
broadcast followed by a map to do a map-side join
http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
(slide
10 has an example).

Spark SQL also does it by default for tables that are smaller than the
spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is
really small, but you can bump this up with set
spark.sql.autoBroadcastJoinThreshold=100 for example).
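
A minimal sketch of the collect-and-broadcast pattern (key/value types and names
are illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Map-side (broadcast) join: the small side is collected to the driver,
// broadcast to every task, and joined inside a flatMap.
def broadcastJoin(sc: SparkContext,
                  large: RDD[(String, Double)],
                  small: RDD[(String, Int)]): RDD[(String, (Double, Int))] = {
  val bcSmall = sc.broadcast(small.collect().toMap)  // small side must fit in memory
  large.flatMap { case (k, v) =>
    bcSmall.value.get(k).map(w => (k, (v, w)))       // inner-join semantics
  }
}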

On Mon, Jan 12, 2015 at 3:15 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Hi,


 How do i do broadcast/map join on RDDs? I have a large RDD that i want to
 inner join with a small RDD. Instead of having the large RDD repartitioned
 and shuffled for join, i would rather send a copy of a small RDD to each
 task, and then perform the join locally.

 How would i specify this in Spark code? I didn't find much documentation
 online. I attempted to create a broadcast variable out of the small RDD and
 then access that in the join operator:

 largeRdd.join(smallRddBroadCastVar.value)

 but that didn't work as expected ( I found that all rows with same key
 were on same task)

 I am using Spark version 1.0.1


 Thanks,
 pala





Re: Manually trigger RDD map function without action

2015-01-12 Thread Sven Krasser
Hey Kevin,

I assume you want to trigger the map() for a side effect (since you don't
care about the result). To Cody's point, you can use foreach() *instead* of
map(). So instead of e.g. x.map(a => foo(a)).foreach(a => a), you'd run
x.foreach(a => foo(a)).

Best,
-Sven

On Mon, Jan 12, 2015 at 5:13 PM, Kevin Jung itsjb.j...@samsung.com wrote:

 Cody said "If you don't care about the value that your map produced (because
 you're not already collecting or saving it), then is foreach more
 appropriate to what you're doing?" but I cannot see it from this thread.
 Anyway, I performed a small benchmark to test which function is the most
 efficient way. And the winner is foreach(a => a), according to everyone's
 expectations. Collect can cause OOM on the driver and count is much slower
 than the others. Thanks all.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
http://sites.google.com/site/krasser/?utm_source=sig


quickly counting the number of rows in a partition?

2015-01-12 Thread Kevin Burton
Is there a way to compute the total number of records in each RDD partition?

So say I had 4 partitions.. I’d want to have

partition 0: 100 records
partition 1: 104 records
partition 2: 90 records
partition 3: 140 records

Kevin

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts
http://spinn3r.com
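
One way to get such counts (an illustrative sketch, assuming an existing RDD named rdd):

val counts = rdd
  .mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }
  .collect()

counts.foreach { case (i, n) => println(s"partition $i: $n records") }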


Re: Is there any Spark implementation for Item-based Collaborative Filtering?

2015-01-12 Thread Simon Chan
Also a ready-to-use server with Spark MLlib:
http://docs.prediction.io/recommendation/quickstart/

The source code is here:
https://github.com/PredictionIO/PredictionIO/tree/develop/templates/scala-parallel-recommendation


Simon

On Sun, Nov 30, 2014 at 12:17 PM, Pat Ferrel p...@occamsmachete.com wrote:

 Actually the spark-itemsimilarity job and related code in the Spark module
 of Mahout creates all-pairs similarity too. It’s designed to use with a
 search engine, which provides the query part of the recommender. Integrate
 the two and you have a near realtime scalable item-based/cooccurrence
 collaborative filtering type recommender.


 On Nov 30, 2014, at 12:09 PM, Sean Owen so...@cloudera.com wrote:

 There is an implementation of all-pairs similarity. Have a look at the
 DIMSUM implementation in RowMatrix. It is an element of what you would
 need for such a recommender, but not the whole thing.

 You can also do the model-building part of an ALS-based recommender
 with ALS in MLlib.

 So, no not directly, but there are related pieces.

 On Sun, Nov 30, 2014 at 5:36 PM, shahab shahab.mok...@gmail.com wrote:
  Hi,
 
  I just wonder if there is any implementation for Item-based Collaborative
  Filtering in Spark?
 
  best,
  /Shahab

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: [mllib] GradientDescent requires huge memory for storing weight vector

2015-01-12 Thread Reza Zadeh
I guess you're not using too many features (e.g. < 10m), just that hashing
the index makes it look that way, is that correct?

If so, the simple dictionary that maps your feature index -> rank can be
broadcast and used everywhere, so you can pass mllib just the feature's
rank as its index.
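
A sketch of that remapping (illustrative, not from the thread; it assumes the
features are SparseVectors, e.g. as produced by loadLibSVMFile, and that the set
of distinct hashed indices fits in memory on the driver):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Remap sparse (hashed) feature indices to a compact 0..k-1 range so the
// dense weight vector only needs one entry per feature that actually occurs.
def compactFeatures(sc: SparkContext, data: RDD[LabeledPoint]): RDD[LabeledPoint] = {
  val indexToRank: Map[Int, Int] = data
    .flatMap(_.features.asInstanceOf[SparseVector].indices)
    .distinct()
    .collect()
    .sorted
    .zipWithIndex
    .toMap
  val bcMap = sc.broadcast(indexToRank)
  val k = indexToRank.size
  data.map { lp =>
    val sv = lp.features.asInstanceOf[SparseVector]
    val remapped = sv.indices.map(bcMap.value).zip(sv.values).toSeq
    LabeledPoint(lp.label, Vectors.sparse(k, remapped))
  }
}

The same broadcast dictionary has to be applied to any later data that is scored
against the resulting model.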

Reza

On Mon, Jan 12, 2015 at 4:26 PM, Tianshuo Deng td...@twitter.com.invalid
wrote:

 Hi,
 Currently in GradientDescent.scala, weights is constructed as a dense
 vector:

 initialWeights = Vectors.dense(new Array[Double](numFeatures))

 And the numFeatures is determined in the loadLibSVMFile as the max index
 of features.

 But in the case of using hash function to compute feature index, it
 results in a huge dense vector being generated taking lots of memory space.

 Any suggestions?




[mllib] GradientDescent requires huge memory for storing weight vector

2015-01-12 Thread Tianshuo Deng
Hi,
Currently in GradientDescent.scala, weights is constructed as a dense
vector:

initialWeights = Vectors.dense(new Array[Double](numFeatures))

And the numFeatures is determined in the loadLibSVMFile as the max index of
features.

But in the case of using hash function to compute feature index, it results
in a huge dense vector being generated taking lots of memory space.

Any suggestions?


Re: train many decision trees with a single spark job

2015-01-12 Thread Josh Buffum
Sean,

Thanks for the response. Is there some subtle difference between one model
partitioned by N users or N models per each 1 user? I think I'm missing
something with your question.

Looping through the RDD filtering one user at a time would certainly give
me the response that I am hoping for (i.e a map of user = decisiontree),
however, that seems like it would yield poor performance? The userIDs are
not integers, so I either need to iterator through some in-memory array of
them (could be quite large) or have some distributed lookup table. Neither
seem great.

I tried the random split thing. I wonder if I did something wrong there,
but some of the splits got RDDs with 0 tuples and some got RDDs with > 1
tuple. I guess that's to be expected with some random distribution?
However, that won't work for me since it breaks the one tree per user
thing. I guess I could randomly distribute user IDs and then do the scan
everything and filter step...

How bad of an idea is it to do:

data.groupByKey.map( kvp => {
  val (key, data) = kvp
  val tree = DecisionTree.train( sc.makeRDD(data), ... )
  (key, tree)
})

Is there a way I could tell spark not to distribute the RDD created by
sc.makeRDD(data) but just to deal with it on whatever spark worker is
handling kvp? Does that question make sense?

Thanks!

Josh

On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen so...@cloudera.com wrote:

 You just mean you want to divide the data set into N subsets, and do
 that dividing by user, not make one model per user right?

 I suppose you could filter the source RDD N times, and build a model
 for each resulting subset. This can be parallelized on the driver. For
 example let's say you divide into N subsets depending on the value of
 the user ID modulo N:

 val N = ...
 (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N
 == d), ...))

 data should be cache()-ed here of course.

 However it may be faster and more principled to take random subsets
 directly:

 data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset =>
 DecisionTree.train(subset, ...))

 On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum jbuf...@gmail.com wrote:
  I've got a data set of activity by user. For each user, I'd like to
 train a
  decision tree model. I currently have the feature creation step
 implemented
  in Spark and would naturally like to use mllib's decision tree model.
  However, it looks like the decision tree model expects the whole RDD and
  will train a single tree.
 
  Can I split the RDD by user (i.e. groupByKey) and then call the
  DecisionTree.trainClassifier in a reduce() or aggregate function to
 create a
  RDD[DecisionTreeModels]? Maybe train the model with an in-memory dataset
  instead of an RDD? Call sc.parallelize on the Iterable values in a
 groupBy
  to create a mini-RDD?
 
  Has anyone else tried something like this with success?
 
  Thanks!



Re: Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Cody said, "If you don't care about the value that your map produced (because
you're not already collecting or saving it), then is foreach more
appropriate to what you're doing?" but I cannot see that message in this thread.
Anyway, I performed a small benchmark to test which function is the most
efficient way. The winner is foreach(a => a), in line with everyone's
expectations. Collect can cause an OOM on the driver, and count is much slower
than the others. Thanks all.
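For reference, a tiny sketch of the foreach approach (assumes a SparkContext sc, e.g. in spark-shell; the map below just stands in for whatever side-effecting function is being forced):

val mapped = sc.parallelize(1 to 1000000).map(x => x * 2)   // stand-in for the real map
mapped.foreach(_ => ())   // triggers the job; nothing is shipped back to the driver
// mapped.count()         // also triggers it, but adds counting work
// mapped.collect()       // also triggers it, but can OOM the driver on large data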



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: quickly counting the number of rows in a partition?

2015-01-12 Thread Sven Krasser
Yes, using mapPartitionsWithIndex, e.g. in PySpark:

>>> sc.parallelize(xrange(0, 1000), 4).mapPartitionsWithIndex(lambda
idx, iter: ((idx, len(list(iter))),)).collect()
[(0, 250), (1, 250), (2, 250), (3, 250)]

(This is not the most efficient way to get the length of an iterator, but
you get the idea...)

Best,
-Sven

On Mon, Jan 12, 2015 at 6:54 PM, Kevin Burton bur...@spinn3r.com wrote:

 Is there a way to compute the total number of records in each RDD
 partition?

 So say I had 4 partitions.. I’d want to have

 partition 0: 100 records
 partition 1: 104 records
 partition 2: 90 records
 partition 3: 140 records

 Kevin

 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




-- 
http://sites.google.com/site/krasser/?utm_source=sig
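A rough Scala equivalent of the PySpark snippet above (assumes a SparkContext sc, e.g. in spark-shell):

val counts = sc.parallelize(0 until 1000, 4)
  .mapPartitionsWithIndex { (idx, iter) =>
    Iterator((idx, iter.size))   // iter.size consumes the iterator, which is fine here
  }
  .collect()
counts.foreach { case (idx, n) => println(s"partition $idx: $n records") }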


Re: train many decision trees with a single spark job

2015-01-12 Thread Josh Buffum
You are right... my code example doesn't work :)

I actually do want a decision tree per user. So, for 1 million users, I
want 1 million trees. We're training against time series data, so there are
still quite a few data points per user. My previous message where I
mentioned RDDs with no length was, I think, a result of the way the random
partitioning worked (I was partitioning into N groups where N was the
number of users... total).

Given this, I'm thinking MLlib is not designed for this particular
case? It appears optimized for training across large datasets. I was just
hoping to leverage it since creating my feature sets for the users was
already in Spark.
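Not the list's definitive answer, but one sketch of the one-tree-per-user idea: group by user, pull each (small) per-user dataset back to the driver, and train the models there in parallel. It assumes every user's data fits in driver memory, and the DecisionTree parameters are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.rdd.RDD

def trainPerUser(sc: SparkContext,
                 data: RDD[(String, LabeledPoint)]): Map[String, DecisionTreeModel] =
  data.groupByKey()      // one Iterable of points per user
    .collect()           // bring the grouped data back to the driver
    .par                 // train the many small models in parallel from the driver
    .map { case (user, points) =>
      val perUserRdd = sc.parallelize(points.toSeq)   // built on the driver, not inside a task
      val model = DecisionTree.trainClassifier(perUserRdd, 2, Map[Int, Int](), "gini", 5, 32)
      (user, model)
    }
    .seq.toMap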


On Mon, Jan 12, 2015 at 5:05 PM, Sean Owen so...@cloudera.com wrote:

 A model partitioned by users?

 I mean that if you have a million users surely you don't mean to build a
 million models. There would be little data per user right? Sounds like you
 have 0 sometimes.

 You would typically be generalizing across users not examining them in
 isolation. Models are built on thousands or millions of data points.

 I assumed you were subsetting for cross validation in which case we are
 talking about making more like say 10 models. You usually take random
 subsets. But it might be as fine to subset as a function of a user ID if
 you like. Or maybe you do have some reason for segregating users and
 modeling them differently (e.g. different geographies or something).

 Your code doesn't work as is since you are using RDDs inside RDDs. But I
 am also not sure you should do what it looks like you are trying to do.
 On Jan 13, 2015 12:32 AM, Josh Buffum jbuf...@gmail.com wrote:

 Sean,

 Thanks for the response. Is there some subtle difference between one
 model partitioned by N users or N models per each 1 user? I think I'm
 missing something with your question.

 Looping through the RDD filtering one user at a time would certainly give
 me the response that I am hoping for (i.e. a map of user => decision tree),
 however, that seems like it would yield poor performance? The userIDs are
 not integers, so I either need to iterate through some in-memory array of
 them (could be quite large) or have some distributed lookup table. Neither
 seem great.

 I tried the random split thing. I wonder if I did something wrong there,
 but some of the splits got RDDs with 0 tuples and some got RDDs with > 1
 tuple. I guess that's to be expected with some random distribution?
 However, that won't work for me since it breaks the one tree per user
 thing. I guess I could randomly distribute user IDs and then do the scan
 everything and filter step...

 How bad of an idea is it to do:

 data.groupByKey.map( kvp => {
   val (key, data) = kvp
   val tree = DecisionTree.train( sc.makeRDD(data), ... )
   (key, tree)
 })

 Is there a way I could tell spark not to distribute the RDD created by
 sc.makeRDD(data) but just to deal with it on whatever spark worker is
 handling kvp? Does that question make sense?

 Thanks!

 Josh

 On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen so...@cloudera.com wrote:

 You just mean you want to divide the data set into N subsets, and do
 that dividing by user, not make one model per user right?

 I suppose you could filter the source RDD N times, and build a model
 for each resulting subset. This can be parallelized on the driver. For
 example let's say you divide into N subsets depending on the value of
 the user ID modulo N:

 val N = ...
 (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N
 == d), ...))

 data should be cache()-ed here of course.

 However it may be faster and more principled to take random subsets
 directly:

 data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset =>
 DecisionTree.train(subset, ...))

 On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum jbuf...@gmail.com wrote:
  I've got a data set of activity by user. For each user, I'd like to
 train a
  decision tree model. I currently have the feature creation step
 implemented
  in Spark and would naturally like to use mllib's decision tree model.
  However, it looks like the decision tree model expects the whole RDD
 and
  will train a single tree.
 
  Can I split the RDD by user (i.e. groupByKey) and then call the
  DecisionTree.trainClassifier in a reduce() or aggregate function to
 create a
  RDD[DecisionTreeModels]? Maybe train the model with an in-memory
 dataset
  instead of an RDD? Call sc.parallelize on the Iterable values in a
 groupBy
  to create a mini-RDD?
 
  Has anyone else tried something like this with success?
 
  Thanks!





Re: OOM exception during row deserialization

2015-01-12 Thread Sven Krasser
Hey Pala,

I also find it very hard to get to the bottom of memory issues such as this
one based on what's in the logs (so if you come up with some findings, then
please share here). In the interim, here are a few things you can try:

   - Provision more memory per executor. While in theory (and depending on
   your storage level) data can be spilled to disk or recomputed from lineage
   if it doesn't fit into memory, I have experienced a lot of problems with
   failing jobs when underprovisioning memory.
   - Experiment with both the memory and shuffle fractions.
   - Repartition your data so that you get smaller tasks.

As far as object size goes, since your issue occurs on deserialization, you
could compute the size on the map side and roll it up into a histogram.
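As a rough illustration of those knobs on Spark 1.x (the values are placeholders, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("join-job")
  .set("spark.executor.memory", "8g")            // more memory per executor
  .set("spark.storage.memoryFraction", "0.4")    // default 0.6; trade cache space for heap headroom
  .set("spark.shuffle.memoryFraction", "0.3")    // default 0.2; more room for shuffle aggregation
val sc = new SparkContext(conf)

// Smaller tasks: repartition the large side before the join, e.g.
// val repartitioned = bigRdd.repartition(2000)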

Hope this helps!

-Sven



On Mon, Jan 12, 2015 at 2:48 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Does anybody have insight on this? Thanks.

 On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

  I am using Spark 1.0.1. I am trying to debug an OOM exception I saw during
 a join step.

 Basically, i have a RDD of rows, that i am joining with another RDD of
 tuples.

 Some of the tasks succeed but a fair number failed with OOM exception
 with stack below. The stack belongs to the 'reducer' that is reading
 shuffle output from the 'mapper'.

 My question is what's the object being deserialized here - just a portion
 of an RDD or the whole RDD partition assigned to current reducer? The rows
 in the RDD could be large, but definitely not something that would run to
 100s of MBs in size, and thus run out of memory.

 Also, is there a way to determine size of the object being deserialized
 that results in the error (either by looking at some staging hdfs dir or
 logs)?

 java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
 exceeded}
 java.util.Arrays.copyOf(Arrays.java:2367)
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
 java.lang.StringBuilder.append(StringBuilder.java:204)
 java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
 java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
 java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
 java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 java.util.ArrayList.readObject(ArrayList.java:771)
 sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)



 Thanks,
 pala





-- 
http://sites.google.com/site/krasser/?utm_source=sig


Broadcast joins on RDD

2015-01-12 Thread Pala M Muthaia
Hi,


How do I do a broadcast/map join on RDDs? I have a large RDD that I want to
inner join with a small RDD. Instead of having the large RDD repartitioned
and shuffled for join, i would rather send a copy of a small RDD to each
task, and then perform the join locally.

How would i specify this in Spark code? I didn't find much documentation
online. I attempted to create a broadcast variable out of the small RDD and
then access that in the join operator:

largeRdd.join(smallRddBroadCastVar.value)

but that didn't work as expected (I found that all rows with the same key were
on the same task)

I am using Spark version 1.0.1


Thanks,
pala
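For what it's worth, a common sketch of a map-side (broadcast) join: collect the small RDD to the driver, broadcast it as a Map, and join inside a flatMap on the large RDD. It assumes the small side fits in memory on the driver and on each executor; broadcastJoin is just an illustrative name:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def broadcastJoin[K: ClassTag, V, W: ClassTag](
    sc: SparkContext,
    large: RDD[(K, V)],
    small: RDD[(K, W)]): RDD[(K, (V, W))] = {
  val smallMap = sc.broadcast(small.collectAsMap())   // shipped once per executor, not per task
  large.flatMap { case (k, v) =>
    smallMap.value.get(k).map(w => (k, (v, w)))       // inner join: keys missing from the small side are dropped
  }
}

// usage: val joined = broadcastJoin(sc, largeRdd, smallRdd)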


Re: train many decision trees with a single spark job

2015-01-12 Thread Sean Owen
A model partitioned by users?

I mean that if you have a million users surely you don't mean to build a
million models. There would be little data per user right? Sounds like you
have 0 sometimes.

You would typically be generalizing across users not examining them in
isolation. Models are built on thousands or millions of data points.

I assumed you were subsetting for cross validation in which case we are
talking about making more like say 10 models. You usually take random
subsets. But it might be as fine to subset as a function of a user ID if
you like. Or maybe you do have some reason for segregating users and
modeling them differently (e.g. different geographies or something).

Your code doesn't work as is since you are using RDDs inside RDDs. But I am
also not sure you should do what it looks like you are trying to do.
On Jan 13, 2015 12:32 AM, Josh Buffum jbuf...@gmail.com wrote:

 Sean,

 Thanks for the response. Is there some subtle difference between one model
 partitioned by N users or N models per each 1 user? I think I'm missing
 something with your question.

 Looping through the RDD filtering one user at a time would certainly give
 me the response that I am hoping for (i.e. a map of user => decision tree),
 however, that seems like it would yield poor performance? The userIDs are
 not integers, so I either need to iterate through some in-memory array of
 them (could be quite large) or have some distributed lookup table. Neither
 seem great.

 I tried the random split thing. I wonder if I did something wrong there,
 but some of the splits got RDDs with 0 tuples and some got RDDs with > 1
 tuple. I guess that's to be expected with some random distribution?
 However, that won't work for me since it breaks the one tree per user
 thing. I guess I could randomly distribute user IDs and then do the scan
 everything and filter step...

 How bad of an idea is it to do:

 data.groupByKey.map( kvp => {
   val (key, data) = kvp
   val tree = DecisionTree.train( sc.makeRDD(data), ... )
   (key, tree)
 })

 Is there a way I could tell spark not to distribute the RDD created by
 sc.makeRDD(data) but just to deal with it on whatever spark worker is
 handling kvp? Does that question make sense?

 Thanks!

 Josh

 On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen so...@cloudera.com wrote:

 You just mean you want to divide the data set into N subsets, and do
 that dividing by user, not make one model per user right?

 I suppose you could filter the source RDD N times, and build a model
 for each resulting subset. This can be parallelized on the driver. For
 example let's say you divide into N subsets depending on the value of
 the user ID modulo N:

 val N = ...
 (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N
 == d), ...))

 data should be cache()-ed here of course.

 However it may be faster and more principled to take random subsets
 directly:

 data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset =>
 DecisionTree.train(subset, ...))

 On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum jbuf...@gmail.com wrote:
  I've got a data set of activity by user. For each user, I'd like to
 train a
  decision tree model. I currently have the feature creation step
 implemented
  in Spark and would naturally like to use mllib's decision tree model.
  However, it looks like the decision tree model expects the whole RDD and
  will train a single tree.
 
  Can I split the RDD by user (i.e. groupByKey) and then call the
  DecisionTree.trainClassifier in a reduce() or aggregate function to
 create a
  RDD[DecisionTreeModels]? Maybe train the model with an in-memory dataset
  instead of an RDD? Call sc.parallelize on the Iterable values in a
 groupBy
  to create a mini-RDD?
 
  Has anyone else tried something like this with success?
 
  Thanks!





Re: Trouble with large Yarn job

2015-01-12 Thread Sven Krasser
Anders,

This could be related to this open ticket:
https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also
fixed that for us as a stopgap.

Best,
-Sven


On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg arp...@spotify.com wrote:

 Yes sure Sandy, I've checked the logs and it's not a OOM issue. I've
 actually been able to solve the problem finally, and it seems to be an
 issue with too many partitions. The repartitioning I tried initially did so
 after the union, and then it's too late. By repartitioning as early as
 possible, and significantly reducing number of partitions (going from
 100,000+ to ~6,000 partitions), the job succeeds and there are no more Error
 communicating with MapOutputTracker issues. Seems like an issue with
 handling too many partitions and executors at the same time.

 It would be awesome to have an auto-repartition function that checks the sizes of
 existing partitions and compares them with the HDFS block size. If too small (or
 too large), it would repartition to partition sizes similar to the block
 size...

 Hope this helps others with similar issues.

 Best,
 Anders

 On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 Have you checked your NodeManager logs to make sure YARN isn't killing
 executors for exceeding memory limits?

 -Sandy

 On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

 Hey,

 I have a job that keeps failing if too much data is processed, and I
 can't see how to get it working. I've tried repartitioning with more
 partitions and increasing the amount of memory for the executors (now about 12G
 and 400 executors). Here is a snippet of the first part of the code, which
 succeeds without any problems:

 val all_days = sc.union(
   ds.dateInterval(startDate, date).map(date =>
 sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
    .map(s => (
 (s.getUsername, s.getTrackUri),
 UserItemData(s.getUsername, s.getTrackUri,
   build_vector1(date, s),
   build_vector2(s
   )
 )
   .reduceByKey(sum_vectors)

 I want to process 30 days of data or more, but am only able to process
 about 10 days. If having more days of data (lower startDate in code
 above), the union above succeeds but the code below fails with Error
 communicating with MapOutputTracker (see http://pastebin.com/fGDCXPkL
 for more detailed error messages). Here is a snippet of the code that fails:

 val top_tracks = all_days.map(t => (t._1._2.toString, 1)).
 reduceByKey(_+_)
 reduceByKey(_+_)
   .filter(trackFilter)
   .repartition(4)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 val observation_data = all_days
   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
   .join(top_tracks)

 The calculation of top_tracks works, but the last mapPartitions task
 fails with given error message if given more than 10 days of data. Also
 tried increasing the spark.akka.askTimeout setting, but it still fails
 even if 10-folding the timeout setting to 300 seconds. I'm using Spark 1.2
 and the kryo serialization.

 Realize that this is a rather long message, but I'm stuck and would
 appreciate any help or clues for resolving this issue. Seems to be a
 out-of-memory issue, but it does not seems to help to increase the number
 of partitions.

 Thanks,
 Anders






-- 
http://sites.google.com/site/krasser/?utm_source=sig
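A sketch of the stopgap Sven mentions, combined with Anders' early-repartitioning observation (6,000 is just the count that worked for Anders; pick what matches your block size):

// Shrink the partition count right after the union, before any shuffle happens.
// coalesce(n) avoids a full shuffle; use repartition(n) instead if the data is skewed.
val all_days_small = all_days.coalesce(6000)

val top_tracks = all_days_small
  .map(t => (t._1._2.toString, 1))
  .reduceByKey(_ + _)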


Running Spark application from command line

2015-01-12 Thread Arun Lists
I have a Spark application that was assembled using sbt 0.13.7, Scala 2.11,
and Spark 1.2.0. I am running on Mac OS X Yosemite.

In build.sbt, I use "provided" for the Spark dependencies. I can run the
application fine within sbt.

I run into problems when I try to run it from the command line. Here is the
command I use:

ADD_JARS=analysis/target/scala-2.11/dtex-analysis_2.11-0.1.jar scala -cp
/Applications/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:analysis/target/scala-2.11/dtex-analysis_2.11-0.1.jar
com.dtex.analysis.transform.GenUserSummaryView ...

I get the following error messages below. Please advise what I can do to
resolve this issue. Thanks!

arun
15/01/12 22:47:18 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

15/01/12 22:47:18 WARN BlockManager: Putting block broadcast_0 failed

java.lang.NoSuchMethodError:
scala.collection.immutable.$colon$colon.hd$1()Ljava/lang/Object;

at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:84)

at
org.apache.spark.util.collection.SizeTracker$class.resetSamples(SizeTracker.scala:61)

at
org.apache.spark.util.collection.SizeTrackingVector.resetSamples(SizeTrackingVector.scala:25)

at
org.apache.spark.util.collection.SizeTracker$class.$init$(SizeTracker.scala:51)

at
org.apache.spark.util.collection.SizeTrackingVector.init(SizeTrackingVector.scala:25)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)

at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)

at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)

at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)

at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)

at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)

at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)

at
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)

at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:695)

at org.apache.spark.SparkContext.textFile(SparkContext.scala:540)

at
com.dtex.analysis.transform.TransformUtils$anonfun$2.apply(TransformUtils.scala:97)

at
com.dtex.analysis.transform.TransformUtils$anonfun$2.apply(TransformUtils.scala:97)

at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:245)

at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:245)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)

at
com.dtex.analysis.transform.TransformUtils$.generateUserSummaryData(TransformUtils.scala:97)

at
com.dtex.analysis.transform.GenUserSummaryView$.main(GenUserSummaryView.scala:77)

at
com.dtex.analysis.transform.GenUserSummaryView.main(GenUserSummaryView.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at
scala.reflect.internal.util.ScalaClassLoader$anonfun$run$1.apply(ScalaClassLoader.scala:70)

at
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)

at
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:101)

at
scala.reflect.internal.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:70)

at
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:101)

at scala.tools.nsc.CommonRunner$class.run(ObjectRunner.scala:22)

at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:39)

at scala.tools.nsc.CommonRunner$class.runAndCatch(ObjectRunner.scala:29)

at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:39)

at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:65)

at scala.tools.nsc.MainGenericRunner.run$1(MainGenericRunner.scala:87)

at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:98)

at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:103)

at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)


Re: Problem with building spark-1.2.0

2015-01-12 Thread Rapelly Kartheek
Yes, this proxy problem is resolved.


*how your build refers to https://github.com/ScrapCodes/sbt-pom-reader.git  I don't
see this repo in the project code base.*
I manually downloaded the sbt-pom-reader directory and moved it into the
.sbt/0.13/staging/*/ directory. But I face the following:

karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt assembly
Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from
/home/karthik/spark-1.2.0/project/project
[info] Loading project definition from
/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader/project
[warn] Multiple resolvers having different access mechanism configured with
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate
project resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[info] Updating
{file:/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader/project/}sbt-pom-reader-build...
[info] Resolving com.typesafe.sbt#sbt-ghpages;0.5.2 ...

Could you please tell me how to build stand-alone spark-1.2.0 with sbt
correctly?

On Mon, Jan 12, 2015 at 4:21 PM, Sean Owen so...@cloudera.com wrote:

 The problem is there in the logs. When it went to clone some code,
 something went wrong with the proxy:

 Received HTTP code 407 from proxy after CONNECT

 Probably you have an HTTP proxy and you have not authenticated. It's
 specific to your environment.

 Although it's unrelated, I'm curious how your build refers to
 https://github.com/ScrapCodes/sbt-pom-reader.git  I don't see this
 repo in the project code base.

 On Mon, Jan 12, 2015 at 9:09 AM, Kartheek.R kartheek.m...@gmail.com
 wrote:
  Hi,
  This is what I am trying to do:
 
  karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
  Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
  Note, this will be overridden by -java-home if it is set.
  [info] Loading project definition from
  /home/karthik/spark-1.2.0/project/project
  Cloning into
  '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
  fatal: unable to access '
 https://github.com/ScrapCodes/sbt-pom-reader.git/':
  Received HTTP code 407 from proxy after CONNECT
  java.lang.RuntimeException: Nonzero exit code (128): git clone
  https://github.com/ScrapCodes/sbt-pom-reader.git



Re: How to create an empty RDD with a given type?

2015-01-12 Thread Xuelin Cao
Got it, thanks!

On Tue, Jan 13, 2015 at 2:00 PM, Justin Yip yipjus...@gmail.com wrote:

 Xuelin,

  There is a function called emptyRDD under spark context
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
  which
 serves this purpose.

 Justin

 On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao xuelincao2...@gmail.com
 wrote:



 Hi,

  I'd like to create a transform function that converts RDD[String] to
  RDD[Int]

 Occasionally, the input RDD could be an empty RDD. I just want to
 directly create an empty RDD[Int] if the input RDD is empty. And, I don't
 want to return None as the result.

 Is there an easy way to do that?
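A minimal sketch of the emptyRDD approach (assumes a SparkContext sc; _.toInt stands in for the real conversion logic):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Return an empty RDD[Int] when the input is empty; otherwise do the real conversion.
def toInts(sc: SparkContext, input: RDD[String]): RDD[Int] =
  if (input.take(1).isEmpty) sc.emptyRDD[Int]   // take(1) is cheap compared to count()
  else input.map(_.toInt)                       // placeholder for the real parsing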






Re: How to create an empty RDD with a given type?

2015-01-12 Thread Justin Yip
Xuelin,

There is a function called emptyRDD under spark context
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
which
serves this purpose.

Justin

On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao xuelincao2...@gmail.com wrote:



 Hi,

 I'd like to create a transform function that converts RDD[String] to
 RDD[Int]

 Occasionally, the input RDD could be an empty RDD. I just want to
 directly create an empty RDD[Int] if the input RDD is empty. And, I don't
 want to return None as the result.

 Is there an easy way to do that?





Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-12 Thread lihu
What about your scenario? Do you need to use lots of broadcasts? If not, it will be
better to focus more on other things.

At this time, there is no better method than TorrentBroadcast. Although the data
spreads one node at a time at first, once a node has the data it can immediately
act as a data source itself.


Creating RDD from only few columns of a Parquet file

2015-01-12 Thread Ajay Srivastava
Hi,
I am trying to read a parquet file using:

val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns
from disk. For example, if the parquet file has 10 columns, I want to read
only 3 columns from disk.

We have done an experiment:
Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on Table1 and Table2 shows a huge difference. The query on
Table1 takes more than double the time taken on Table2, which makes me think that
Spark is reading all the columns from disk in the case of Table1 when it needs only
3 columns.

How should I make sure that it reads only 3 of the 10 columns from disk?


Regards,
Ajay
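Not from this thread, but one way to get column pruning with the Spark 1.2-era SQL API is to select only the needed columns through Spark SQL (a, b and c below are placeholder column names):

val parquetFile = sqlContext.parquetFile("people.parquet")
parquetFile.registerTempTable("people")
// Only the selected columns should be read from the Parquet files.
val threeCols = sqlContext.sql("SELECT a, b, c FROM people")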


configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread David McWhorter

Hi all,

I'm trying to figure out how to set this option:  
spark.yarn.driver.memoryOverhead on Spark 1.2.0.  I found this helpful 
overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476, 
which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 
added to spark-submit. However, when I do that I get this error:

Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling
sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my spark
configuration object but I still get "Will allocate AM container, with
 MB memory including 384 MB overhead" when launching.  I'm running
in yarn-cluster mode.


Any help or tips would be appreciated.

Thanks,
David

--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204



Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread Ganelin, Ilya
There are two related options:

To solve your problem directly try:

val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver, as 
part of your spark-submit script add:

--driver-memory 2g


Hope this helps!


From: David McWhorter mcwhor...@ccri.commailto:mcwhor...@ccri.com
Date: Monday, January 12, 2015 at 11:01 AM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all,

I'm trying to figure out how to set this option:  
spark.yarn.driver.memoryOverhead on Spark 1.2.0.  I found this helpful 
overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476,
 which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 added to 
spark-submit.  However, when I do that I get this error:
Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead",
"1024") on my spark configuration object but I still get "Will allocate AM
container, with  MB memory including 384 MB overhead" when launching.  I'm
running in yarn-cluster mode.

Any help or tips would be appreciated.

Thanks,
David

--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.commailto:mcwhor...@ccri.com | 434.299.0090x204



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


RE: Failed to save RDD as text file to local file system

2015-01-12 Thread NingjunWang
Prannoy

I tried this: r.saveAsTextFile("home/cloudera/tmp/out1"). It returns without
error. But where is it saved to? The folder “/home/cloudera/tmp/out1” was not
created.

I also tried the following
cd /home/cloudera/tmp/
spark-shell
scala> val r = sc.parallelize(Array("a", "b", "c"))
scala> r.saveAsTextFile("out1")

It does not return an error. But still there is no “out1” folder created under
/home/cloudera/tmp/

I tried to give an absolute path but then I get an error:

scala> r.saveAsTextFile("/home/cloudera/tmp/out1")
org.apache.hadoop.security.AccessControlException: Permission denied: 
user=cloudera, access=WRITE, inode=/:hdfs:supergroup:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6286)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6268)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6220)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4087)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)
at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

Very frustrated. Please advise.


Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: Prannoy [via Apache Spark User List] 
[mailto:ml-node+s1001560n21093...@n3.nabble.com]
Sent: Monday, January 12, 2015 4:18 AM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you tried simply giving the path where you want to save the file?

For instance in your case just do

r.saveAsTextFile("home/cloudera/tmp/out1")

Don't use "file"

This will create a folder with the name out1. saveAsTextFile always writes by making
a directory; it does not write data into a single file.

In case you need a single file you can use the copyMerge API in FileUtil:

FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs, new Path("home/cloudera/tmp/out2"),
true, conf, null);
Now out2 will be a single file containing your data.
fs is the FileSystem of your local file system.
Thanks


On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List]
wrote:
No, do you have any idea?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: firemonk9 [via Apache Spark User List]
Sent: Friday, January 09, 2015 2:56 PM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you found any resolution for this issue ?


can I buffer flatMap input at each worker node?

2015-01-12 Thread maherrt
Dear All

What I want to do is this:
as the data is partitioned across many worker nodes, I want to be able to process
each partition of data as a whole and then produce my
output using flatMap, for example.
So can I load all of the input records of a partition on one worker node and emit my
output using a map function?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-I-buffer-flatMap-input-at-each-worker-node-tp21106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Trouble with large Yarn job

2015-01-12 Thread Anders Arpteg
Yes sure Sandy, I've checked the logs and it's not a OOM issue. I've
actually been able to solve the problem finally, and it seems to be an
issue with too many partitions. The repartitioning I tried initially did so
after the union, and then it's too late. By repartitioning as early as
possible, and significantly reducing number of partitions (going from
100,000+ to ~6,000 partitions), the job succeeds and there are no more Error
communicating with MapOutputTracker issues. Seems like an issue with
handling too many partitions and executors at the same time.

It would be awesome to have an auto-repartition function that checks the sizes of
existing partitions and compares them with the HDFS block size. If too small (or
too large), it would repartition to partition sizes similar to the block
size...

Hope this helps others with similar issues.

Best,
Anders

On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Anders,

 Have you checked your NodeManager logs to make sure YARN isn't killing
 executors for exceeding memory limits?

 -Sandy

 On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

 Hey,

 I have a job that keeps failing if too much data is processed, and I
 can't see how to get it working. I've tried repartitioning with more
partitions and increasing the amount of memory for the executors (now about 12G
and 400 executors). Here is a snippet of the first part of the code, which
 succeeds without any problems:

 val all_days = sc.union(
   ds.dateInterval(startDate, date).map(date =>
 sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
    .map(s => (
 (s.getUsername, s.getTrackUri),
 UserItemData(s.getUsername, s.getTrackUri,
   build_vector1(date, s),
   build_vector2(s
   )
 )
   .reduceByKey(sum_vectors)

 I want to process 30 days of data or more, but am only able to process
 about 10 days. If having more days of data (lower startDate in code
 above), the union above succeeds but the code below fails with Error
 communicating with MapOutputTracker (see http://pastebin.com/fGDCXPkL
 for more detailed error messages). Here is a snippet of the code that fails:

 val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey
 (_+_)
   .filter(trackFilter)
   .repartition(4)
   .persist(StorageLevel.MEMORY_AND_DISK_SER)

 val observation_data = all_days
   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
   .join(top_tracks)

 The calculation of top_tracks works, but the last mapPartitions task
 fails with given error message if given more than 10 days of data. Also
 tried increasing the spark.akka.askTimeout setting, but it still fails
 even if 10-folding the timeout setting to 300 seconds. I'm using Spark 1.2
 and the kryo serialization.

 Realize that this is a rather long message, but I'm stuck and would
 appreciate any help or clues for resolving this issue. Seems to be a
 out-of-memory issue, but it does not seems to help to increase the number
 of partitions.

 Thanks,
 Anders





Re: can I buffer flatMap input at each worker node?

2015-01-12 Thread Sven Krasser
Not sure I understand correctly, but it sounds like you're looking for
mapPartitions().
-Sven

On Mon, Jan 12, 2015 at 10:17 AM, maherrt mahe...@hotmail.com wrote:

 Dear All

 What I want to do is this:
 as the data is partitioned across many worker nodes, I want to be able to
 process
 each partition of data as a whole and then produce my
 output using flatMap, for example.
 So can I load all of the input records of a partition on one worker node and emit
 my output using a map function?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/can-I-buffer-flatMap-input-at-each-worker-node-tp21106.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
http://sites.google.com/site/krasser/?utm_source=sig
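A minimal sketch of the mapPartitions() route (the RDD types and per-record logic below are placeholders):

import org.apache.spark.rdd.RDD

// The function receives the full iterator for one partition, so it can buffer or
// batch records before emitting any number of output records.
def processPartitions(input: RDD[String]): RDD[Int] =
  input.mapPartitions { records =>
    val buffered = records.toList     // buffers a whole partition; it must fit in executor memory
    buffered.iterator.map(_.length)   // placeholder per-record logic
  }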


Pattern Matching / Equals on Case Classes in Spark Not Working

2015-01-12 Thread Rosner, Frank (Allianz SE)
Dear Spark Users,

I googled the web for several hours now but I don't find a solution for my 
problem. So maybe someone from this list can help.

I have an RDD of case classes, generated from CSV files with Spark. When I used 
the distinct operator, there were still duplicates. So I investigated and found 
out that the equals returns false although the two objects were equal (so were 
their individual fields as well as toStrings).

After googling it I found that the case class equals might break in case the 
two objects are created by different class loaders. So I implemented my own 
equals method using mattern matching (code example below). It still didn't 
work. Some debugging revealed that the problem lies in the pattern matching. 
Depending on the objects I compare (and maybe the split / classloader they are 
generated in?) the patternmatching works /doesn't:

case class Customer(id: String, age: Option[Int], entryDate: 
Option[java.util.Date]) {
  override def equals(that: Any): Boolean = that match {
    case Customer(id, age, entryDate) => {
      println("Pattern matching worked!")
      this.id == id && this.age == age && this.entryDate == entryDate
    }
    case _ => false
  }
}

//val x: Array[Customer]
// ... some spark code to filter original data and collect x

scala> x(0)
Customer(a, Some(5), Some(Fri Sep 23 00:00:00 CEST 1994))
scala> x(1)
Customer(a, None, None)
scala> x(2)
Customer(a, None, None)
scala> x(3)
Customer(a, None, None)

scala> x(0) == x(0) // should be true and works
Pattern matching works!
res0: Boolean = true
scala> x(0) == x(1) // should be false and works
Pattern matching works!
res1: Boolean = false
scala> x(1) == x(2) // should be true, does not work
res2: Boolean = false
scala> x(2) == x(3) // should be true, does not work
Pattern matching works!
res3: Boolean = true
scala> x(0) == x(3) // should be false, does not work
res4: Boolean = false

Why is the pattern matching not working? It seems that there are two kinds of 
Customers: 0,1 and 2,3 which don't match somehow. Is this related to some 
classloaders? Is there a way around this other than using instanceof and 
defining a custom equals operation for every case class I write?

Thanks for the help!
Frank


Re: Issue writing to Cassandra from Spark

2015-01-12 Thread Ankur Srivastava
Hi Akhil,

Thank you for the pointers. Below is how we are saving data to Cassandra.

javaFunctions(rddToSave)
    .writerBuilder(datapipelineKeyspace, datapipelineOutputTable, mapToRow(Sample.class))
    .saveToCassandra();

The data we are saving at this stage is ~200 million rows.

How do we control application threads in spark so that it does not exceed 
rpc_max_threads? We are running with default value of this property in
cassandra.yaml. I have already set these
two properties for Spark-Cassandra connector:

spark.cassandra.output.batch.size.rows=1
spark.cassandra.output.concurrent.writes=1

Thanks
- Ankur


On Sun, Jan 11, 2015 at 10:16 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I see, can you paste the piece of code? It's probably because you are
 exceeding the number of connections specified in the
 rpc_max_threads property. Make sure you close all the connections properly.

 Thanks
 Best Regards

 On Mon, Jan 12, 2015 at 7:45 AM, Ankur Srivastava 
 ankur.srivast...@gmail.com wrote:

 Hi Akhil, thank you for your response.

 Actually we are first reading from cassandra and then writing back after
 doing some processing. All the reader stages succeed with no error and many
 writer stages also succeed but many fail as well.

 Thanks
 Ankur

 On Sat, Jan 10, 2015 at 10:15 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Just make sure you are not connecting to the Old RPC Port (9160), new
 binary port is running on 9042.

  What is your rpc_address listed in cassandra.yaml? Also make sure you
  have start_native_transport: true in the yaml file.

 Thanks
 Best Regards

 On Sat, Jan 10, 2015 at 8:44 AM, Ankur Srivastava 
 ankur.srivast...@gmail.com wrote:

 Hi,

 We are currently using spark to join data in Cassandra and then write
  the results back into Cassandra. While the reads happen without any error,
 during the writes we see many exceptions like below. Our environment
 details are:

 - Spark v 1.1.0
 - spark-cassandra-connector-java_2.10 v 1.1.0

 We are using below settings for the writer

 spark.cassandra.output.batch.size.rows=1

 spark.cassandra.output.concurrent.writes=1

 com.datastax.driver.core.exceptions.NoHostAvailableException: All
 host(s) tried for query failed (tried: [] - use getErrors() for details)

 at
 com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)

 at
 com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 Thanks

 Ankur







Re: Pattern Matching / Equals on Case Classes in Spark Not Working

2015-01-12 Thread Matei Zaharia
Is this in the Spark shell? Case classes don't work correctly in the Spark 
shell unfortunately (though they do work in the Scala shell) because we change 
the way lines of code compile to allow shipping functions across the network. 
The best way to get case classes in there is to compile them into a JAR and 
then add that to your spark-shell's classpath with --jars.

Matei

 On Jan 12, 2015, at 10:04 AM, Rosner, Frank (Allianz SE) 
 frank.ros...@allianz.com wrote:
 
 Dear Spark Users,
  
 I googled the web for several hours now but I don't find a solution for my 
 problem. So maybe someone from this list can help.
  
 I have an RDD of case classes, generated from CSV files with Spark. When I 
 used the distinct operator, there were still duplicates. So I investigated 
 and found out that the equals returns false although the two objects were 
 equal (so were their individual fields as well as toStrings).
  
 After googling it I found that the case class equals might break in case the 
 two objects are created by different class loaders. So I implemented my own 
 equals method using pattern matching (code example below). It still didn't
 work. Some debugging revealed that the problem lies in the pattern matching.
 Depending on the objects I compare (and maybe the split / classloader they
 are generated in?) the pattern matching works / doesn't:
  
 case class Customer(id: String, age: Option[Int], entryDate: 
 Option[java.util.Date]) {
   override def equals(that: Any): Boolean = that match {
     case Customer(id, age, entryDate) => {
       println("Pattern matching worked!")
       this.id == id && this.age == age && this.entryDate == entryDate
     }
     case _ => false
   }
 }
  
 //val x: Array[Customer]
 // ... some spark code to filter original data and collect x
  
 scala> x(0)
 Customer(a, Some(5), Some(Fri Sep 23 00:00:00 CEST 1994))
 scala> x(1)
 Customer(a, None, None)
 scala> x(2)
 Customer(a, None, None)
 scala> x(3)
 Customer(a, None, None)

 scala> x(0) == x(0) // should be true and works
 Pattern matching works!
 res0: Boolean = true
 scala> x(0) == x(1) // should be false and works
 Pattern matching works!
 res1: Boolean = false
 scala> x(1) == x(2) // should be true, does not work
 res2: Boolean = false
 scala> x(2) == x(3) // should be true, does not work
 Pattern matching works!
 res3: Boolean = true
 scala> x(0) == x(3) // should be false, does not work
 res4: Boolean = false
  
 Why is the pattern matching not working? It seems that there are two kinds of 
 Customers: 0,1 and 2,3 which don't match somehow. Is this related to some 
 classloaders? Is there a way around this other than using instanceof and 
 defining a custom equals operation for every case class I write?
  
 Thanks for the help!
 Frank