Re: How to set UI port #?
Set the port using spconf.set("spark.ui.port", "<port>"), where <port> is any port and spconf is your Spark configuration object.

On Sun, Jan 11, 2015 at 2:08 PM, YaoPau [via Apache Spark User List] wrote: I have multiple Spark Streaming jobs running all day, and so when I run my hourly batch job, I always get a "java.net.BindException: Address already in use" which starts at 4040 then goes to 4041, 4042, 4043 before settling at 4044. That slows down my hourly job, and time is critical. Is there a way I can set it to 4044 by default, or prevent the UI from launching altogether? Jon
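A minimal sketch of the above in driver code, assuming the Scala API; the app name and port value are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Pin the UI to a fixed port instead of letting it probe upward from 4040.
val spconf = new SparkConf()
  .setAppName("hourly-batch")          // placeholder name
  .set("spark.ui.port", "4044")        // any free port works here
  // .set("spark.ui.enabled", "false") // or suppress the UI entirely (Spark 1.1.1+)

val sc = new SparkContext(spconf)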
Re: Problem with building spark-1.2.0
Hi, This is what I am trying to do:

karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /home/karthik/spark-1.2.0/project/project
Cloning into '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/': Received HTTP code 407 from proxy after CONNECT
java.lang.RuntimeException: Nonzero exit code (128): git clone https://github.com/ScrapCodes/sbt-pom-reader.git /home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader
    at scala.sys.package$.error(package.scala:27)
    at sbt.Resolvers$.run(Resolvers.scala:127)
    at sbt.Resolvers$.run(Resolvers.scala:117)
    at sbt.Resolvers$$anon$2.clone(Resolvers.scala:74)
    at sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11$$anonfun$apply$5.apply$mcV$sp(Resolvers.scala:99)
    at sbt.Resolvers$.creates(Resolvers.scala:134)
    at sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11.apply(Resolvers.scala:98)
    at sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11.apply(Resolvers.scala:97)
    at sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$3.apply(BuildLoader.scala:88)
    at sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$3.apply(BuildLoader.scala:87)
    at scala.Option.map(Option.scala:145)
    at sbt.BuildLoader$$anonfun$componentLoader$1.apply(BuildLoader.scala:87)
    at sbt.BuildLoader$$anonfun$componentLoader$1.apply(BuildLoader.scala:83)
    at sbt.MultiHandler.apply(BuildLoader.scala:15)
    at sbt.BuildLoader.apply(BuildLoader.scala:139)
    at sbt.Load$.loadAll(Load.scala:334)
    at sbt.Load$.loadURI(Load.scala:289)
    at sbt.Load$.load(Load.scala:285)
    at sbt.Load$.load(Load.scala:276)
    at sbt.Load$.apply(Load.scala:130)
    at sbt.Load$.buildPluginDefinition(Load.scala:819)
    at sbt.Load$.buildPlugins(Load.scala:785)
    at sbt.Load$.plugins(Load.scala:773)
    at sbt.Load$.loadUnit(Load.scala:431)
    at sbt.Load$$anonfun$18$$anonfun$apply$11.apply(Load.scala:281)
    at sbt.Load$$anonfun$18$$anonfun$apply$11.apply(Load.scala:281)
    at sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader.scala:91)
    at sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader.scala:90)
    at sbt.BuildLoader.apply(BuildLoader.scala:140)
    at sbt.Load$.loadAll(Load.scala:334)
    at sbt.Load$.loadURI(Load.scala:289)
    at sbt.Load$.load(Load.scala:285)
    at sbt.Load$.load(Load.scala:276)
    at sbt.Load$.apply(Load.scala:130)
    at sbt.Load$.defaultLoad(Load.scala:36)
    at sbt.BuiltinCommands$.doLoadProject(Main.scala:481)
    at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
    at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
    at sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:58)
    at sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:58)
    at sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:60)
    at sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:60)
    at sbt.Command$.process(Command.scala:92)
    at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
    at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
    at sbt.State$$anon$1.process(State.scala:184)
    at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
    at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.MainLoop$.next(MainLoop.scala:98)
    at sbt.MainLoop$.run(MainLoop.scala:91)
    at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:70)
    at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:65)
    at sbt.Using.apply(Using.scala:24)
    at sbt.MainLoop$.runWithNewLog(MainLoop.scala:65)
    at sbt.MainLoop$.runAndClearLast(MainLoop.scala:48)
    at sbt.MainLoop$.runLoggedLoop(MainLoop.scala:32)
    at sbt.MainLoop$.runLogged(MainLoop.scala:24)
    at sbt.StandardMain$.runManaged(Main.scala:53)
    at sbt.xMain.run(Main.scala:28)
    at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
    at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
    at xsbt.boot.Launch$.run(Launch.scala:109)
    at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
    at xsbt.boot.Launch$.launch(Launch.scala:117)
    at xsbt.boot.Launch$.apply(Launch.scala:18)
    at
GraphX vs GraphLab
Hi Team, Has anyone done a comparison (pros and cons) study between GraphX and GraphLab? Could you please let me know of any links for such a comparison. Regards, Rajesh
Re: Failed to save RDD as text file to local file system
Have you tried simply giving the path where you want to save the file? For instance, in your case just do

r.saveAsTextFile("home/cloudera/tmp/out1")

Don't use "file:". This will create a folder with the name out1. saveAsTextFile always writes by making a directory; it does not write data into a single file. In case you need a single file you can use the copyMerge API in FileUtil:

FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs, new Path("home/cloudera/tmp/out2"), true, conf, null)

Now out2 will be a single file containing your data. fs is the FileSystem object for your local file system. Thanks

On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List] wrote: No, do you have any idea? Regards, Ningjun Wang, Consulting Software Engineer, LexisNexis, 121 Chanlon Road, New Providence, NJ 07974-1541

From: firemonk9 [via Apache Spark User List] Sent: Friday, January 09, 2015 2:56 PM To: Wang, Ningjun (LNG-NPV) Subject: Re: Failed to save RDD as text file to local file system

Have you found any resolution for this issue?
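A self-contained sketch of the two steps suggested above, assuming the Hadoop 2.x API (copyMerge was removed in Hadoop 3) and an absolute local path; r is the RDD from the thread:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Step 1: saveAsTextFile produces a directory of part-* files.
r.saveAsTextFile("file:///home/cloudera/tmp/out1")

// Step 2: merge the part files into a single output file.
val conf = new Configuration()
val fs = FileSystem.getLocal(conf)   // local file system handle
FileUtil.copyMerge(
  fs, new Path("/home/cloudera/tmp/out1"),
  fs, new Path("/home/cloudera/tmp/out2"),
  true,      // delete the source directory after merging
  conf, null)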
how to select the first row in each group by group?
Hi all: I am using Spark SQL to read and write Hive tables. But there is an issue: how do I select the first row in each group? In Hive, we could write HQL like this:

SELECT imei
FROM (SELECT imei,
             row_number() OVER (PARTITION BY imei ORDER BY login_time ASC) AS row_num
      FROM login_log_2015010914) a
WHERE row_num = 1

In Spark SQL, how do I write the SQL equivalent to this HQL?
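For reference, a hedged sketch (not from the thread) of a first-row-per-group workaround for Spark versions without window function support; the RDD name and (imei, login_time) layout are assumptions:

// loginLog: RDD[(String, Long)] of (imei, login_time) pairs -- assumed
val firstLogin = loginLog.reduceByKey((a, b) => math.min(a, b))
// firstLogin now holds, for each imei, its earliest login_time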
Manually trigger RDD map function without action
Hi all, Is there an efficient way to trigger RDD transformations? I'm now using the count action to achieve this. Best regards, Kevin
Re: creating a single kafka producer object for all partitions
Actually this code is producing a "leader not found" exception, and I am unable to find the reason.

On Mon, Jan 12, 2015 at 4:03 PM, kevinkim [via Apache Spark User List] wrote: Well, you can use coalesce() to decrease the number of partitions to 1. (It will take time and is quite inefficient, though.) Regards, Kevin.

On Mon Jan 12 2015 at 7:57:39 PM Hafiz Mujadid [via Apache Spark User List] wrote: Hi experts! I have a SchemaRDD of messages to be pushed into Kafka, so I am using the following piece of code to do that:

rdd.foreachPartition(itr => {
  val props = new Properties()
  props.put("metadata.broker.list", brokersList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("compression.codec", codec.toString)
  props.put("producer.type", "sync")
  props.put("batch.num.messages", BatchSize.toString)
  props.put("message.send.max.retries", maxRetries.toString)
  props.put("request.required.acks", "-1")
  producer = new Producer[String, String](new ProducerConfig(props))
  itr.foreach(row => {
    val msg = row.toString.drop(1).dropRight(1)
    this.synchronized {
      producer.send(new KeyedMessage[String, String](Topic, msg))
    }
  })
  producer.close
})

The problem with this code is that it creates a separate Kafka producer for each partition, and I want a single producer for all partitions. Is there any way to achieve this?

-- Regards: HAFIZ MUJADID
Re: Does DecisionTree model in MLlib deal with missing values?
On Sun, Jan 11, 2015 at 9:46 PM, Christopher Thom wrote: Is there any plan to extend the data types that would be accepted by the tree models in Spark? e.g. many models that we build contain a large number of string-based categorical factors. Currently the only strategy is to map these string values to integers, and store the mapping so the data can be remapped when the model is scored. A viable solution, but cumbersome for models with hundreds of these kinds of factors.

I think there is nothing on the roadmap, except that in the newer ML API (the bits under spark.ml) there's fuller support for the idea of a pipeline of transformations, of which performing this encoding could be one step.

Since it's directly relevant, I don't mind mentioning that we did build this sort of logic on top of MLlib and PMML. There's nothing hard about it, just a pile of translation and counting code, such as in https://github.com/OryxProject/oryx/blob/master/oryx-app-common/src/main/java/com/cloudera/oryx/app/rdf/RDFPMMLUtils.java So there are bits you can reuse out there, especially if your goal is to get to PMML, which will want to represent all the actual categorical values in its DataDictionary and not encodings.

Concerning missing data, I haven't been able to figure out how to use NULL values in LabeledPoints, and I'm not sure whether DecisionTrees correctly handle the case of missing data. The only thing I've been able to work out is to use a placeholder value.

Yes, I don't think that's supported. In model training, you can simply ignore data that can't reach a node because it lacks a feature needed in a decision rule. This is OK as long as not that much data is missing. In scoring you can't not-answer. Again if you refer to PMML, you can see some ideas about how to handle this: http://www.dmg.org/v4-2-1/TreeModel.html#xsdType_MISSING-VALUE-STRATEGY

- Make no prediction
- Just copy the last prediction
- Use a model-supplied default for the node
- Use some confidence-weighted combination of the answer you'd get by following both paths

I have opted, in the past, for simply defaulting to the subtree with more training examples. All of these strategies are approximations, yes.
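A minimal sketch of the string-to-integer encoding described above; the record fields (color, amount, label) are assumptions, not from the thread:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Build the mapping once, and persist it so scoring can reuse it later.
val colorIndex: Map[String, Int] =
  rows.map(_.color).distinct().collect().zipWithIndex.toMap

val points = rows.map { r =>
  LabeledPoint(r.label, Vectors.dense(colorIndex(r.color).toDouble, r.amount))
}
// Pass categoricalFeaturesInfo = Map(0 -> colorIndex.size) when training the
// tree so column 0 is treated as categorical rather than ordered.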
RowMatrix multiplication
I have a rowMatrix on which I want to perform two multiplications. The first is a right multiplication with a local matrix, which is fine. But after that I also wish to right multiply the transpose of my rowMatrix with a different local matrix. I understand that there is no functionality to transpose a rowMatrix at this time, but I was wondering if anyone could suggest any kind of work-around for this. I had thought that I might be able to initially create two rowMatrices - a normal version and a transposed version - and use either when appropriate. Can anyone think of another alternative? Thanks, Alex
Re: Manually trigger RDD map function without action
If you don't care about the value that your map produced (because you're not collecting or saving it), then is foreach more appropriate to what you're doing?

On Mon, Jan 12, 2015 at 4:08 AM, kevinkim wrote: Hi, answer from another Kevin. I think you may already know it: a 'transformation' in Spark (http://spark.apache.org/docs/latest/programming-guide.html#transformations) will be done in a 'lazy' way, when you trigger 'actions' (http://spark.apache.org/docs/latest/programming-guide.html#actions). So you can use:
'collect' - if you need the result in memory
'count' - if you need to count
'saveAs...' - if you need to persist the transformed RDD

Regards, Kevin

On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] wrote: Hi all, Is there an efficient way to trigger RDD transformations? I'm now using the count action to achieve this. Best regards, Kevin
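A hedged sketch of the foreach suggestion; doSideEffect is a placeholder for whatever the map was doing:

// If the map exists only for its side effects, run them with an action directly:
rdd.foreach(x => doSideEffect(x))

// rather than forcing execution by discarding a result:
rdd.map(doSideEffect).count()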
Re: creating a single kafka producer object for all partitions
You should take a look at https://issues.apache.org/jira/browse/SPARK-4122 which is implementing writing to kafka in a pretty similar way (make a new producer inside foreachPartition)

On Mon, Jan 12, 2015 at 5:24 AM, Sean Owen wrote: Leader-not-found suggests a problem with zookeeper config. It depends a lot on the specifics of your error. But this is really a Kafka question, better for the Kafka list.

On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid wrote: Actually this code is producing error leader not found exception. I am unable to find the reason
Re: Removing JARs from spark-jobserver
Just an FYI: you can configure that using spark.jobserver.filedao.rootdir

On Mon, Jan 12, 2015 at 1:52 AM, abhishek wrote: Nice! Good to know

On 11 Jan 2015 21:10, Sasi [via Apache Spark User List] wrote: Thank you Abhishek. That works.
creating a single kafka producer object for all partitions
Hi experts! I have a SchemaRDD of messages to be pushed into Kafka, so I am using the following piece of code to do that:

rdd.foreachPartition(itr => {
  val props = new Properties()
  props.put("metadata.broker.list", brokersList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("compression.codec", codec.toString)
  props.put("producer.type", "sync")
  props.put("batch.num.messages", BatchSize.toString)
  props.put("message.send.max.retries", maxRetries.toString)
  props.put("request.required.acks", "-1")
  producer = new Producer[String, String](new ProducerConfig(props))
  itr.foreach(row => {
    val msg = row.toString.drop(1).dropRight(1)
    this.synchronized {
      producer.send(new KeyedMessage[String, String](Topic, msg))
    }
  })
  producer.close
})

The problem with this code is that it creates a separate Kafka producer for each partition, and I want a single producer for all partitions. Is there any way to achieve this?
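A hedged sketch of one common pattern (not from the thread): hold the producer in a lazily initialized singleton object, so each executor JVM creates it at most once instead of once per partition. The broker list and topic are placeholders.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ProducerHolder {
  lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092")   // placeholder
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }
}

rdd.foreachPartition { itr =>
  itr.foreach { row =>
    ProducerHolder.producer.send(
      new KeyedMessage[String, String]("topic", row.toString))  // placeholder topic
  }
}

Note this still yields one producer per executor (singleton objects are per-JVM), which is usually the practical lower bound; a single truly global producer would have to live on the driver.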
Re: Play Scala Spark Example
The EC2 version is 1.1.0 and this is my build.sbt:

libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "com.typesafe.akka" %% "akka-actor" % "2.2.3",
  "com.typesafe.akka" %% "akka-slf4j" % "2.2.3",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0",
  "org.apache.spark" %% "spark-sql" % "1.1.0",
  "org.apache.spark" %% "spark-mllib" % "1.1.0"
)

On Sun, Jan 11, 2015 at 3:01 AM, Akhil Das wrote: What is your Spark version that is running on the EC2 cluster? From the build file (https://github.com/knoldus/Play-Spark-Scala/blob/master/build.sbt) of your Play application it seems that it uses Spark 1.0.1. Thanks, Best Regards

On Fri, Jan 9, 2015 at 7:17 PM, Eduardo Cusa wrote: Hi guys, I'm running the following example, https://github.com/knoldus/Play-Spark-Scala, on the same machine as the Spark master, and the Spark cluster was launched with the ec2 script. I'm stuck with these errors, any idea how to fix them? Regards, Eduardo

Calling the Play app prints the following exception:

[error] a.r.EndpointWriter - AssociationError [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] -> [akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575]: Error [Shut down address: akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575] [akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.

The master receives the Spark application and generates the following stderr log page:

15/01/09 13:31:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
15/01/09 13:31:23 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
15/01/09 13:31:23 INFO util.Utils: Successfully started service 'sparkExecutor' on port 37856.
15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/MapOutputTracker
15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/BlockManagerMaster
15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20150109133123-3805
15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at /mnt2/spark/spark-local-20150109133123-b05e
15/01/09 13:31:23 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 36936.
15/01/09 13:31:23 INFO network.ConnectionManager: Bound socket to port 36936 with id = ConnectionManagerId(ip-10-158-18-250.ec2.internal,36936)
15/01/09 13:31:23 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
15/01/09 13:31:23 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/09 13:31:23 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/HeartbeatReceiver
15/01/09 13:31:54 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:57671] -> [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] disassociated! Shutting down.
Re: calculating the mean of SparseVector RDD
This was without using Kryo -- if I use Kryo, I get errors about buffer overflows (see above): com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5, required: 8

Just calling colStats doesn't actually compute those statistics, does it? It looks like the computation is only carried out once you call the .mean() method.

On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng wrote: colStats() computes the mean values along with several other summary statistics, which makes it slower. How is the performance if you don't use kryo? -Xiangrui

On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar wrote: thanks for the suggestion -- however, it looks like this is even slower. With the small data set I'm using, my aggregate function takes ~9 seconds and the colStats.mean() takes ~1 minute. However, I can't get it to run with the Kryo serializer -- I get the error: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5, required: 8 Is there an easy/obvious fix?

On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng wrote: There is some serialization overhead. You can try https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107 . -Xiangrui

On Wed, Jan 7, 2015 at 9:42 AM, rok wrote: I have an RDD of SparseVectors and I'd like to calculate the mean, returning a dense vector. I've tried doing this with the following (using pyspark, spark v1.2.0):

def aggregate_partition_values(vec1, vec2):
    vec1[vec2.indices] += vec2.values
    return vec1

def aggregate_combined_vectors(vec1, vec2):
    if all(vec1 == vec2):
        # then the vector came from only one partition
        return vec1
    else:
        return vec1 + vec2

means = vals.aggregate(np.zeros(vec_len),
                       aggregate_partition_values,
                       aggregate_combined_vectors)
means = means / nvals

This turns out to be really slow -- and it doesn't seem to depend on how many vectors there are, so there seems to be some overhead somewhere that I'm not understanding. Is there a better way of doing this?
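For reference, a hedged Scala sketch of the colStats approach Xiangrui points to (the thread itself uses pyspark; this is the equivalent MLlib call, and vecRDD is an assumed name):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

val vecRDD: RDD[Vector] = ???              // your sparse vectors
val summary = Statistics.colStats(vecRDD)  // one pass computes all summary stats
val means = summary.mean                   // dense vector of per-column means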
Re: Problem with building spark-1.2.0
The problem is right there in the logs. When it went to clone some code, something went wrong with the proxy: "Received HTTP code 407 from proxy after CONNECT". Probably you have an HTTP proxy and you have not authenticated. It's specific to your environment. Although it's unrelated, I'm curious how your build refers to https://github.com/ScrapCodes/sbt-pom-reader.git -- I don't see this repo in the project code base.

On Mon, Jan 12, 2015 at 9:09 AM, Kartheek.R wrote: Hi, This is what I am trying to do:

karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /home/karthik/spark-1.2.0/project/project
Cloning into '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/': Received HTTP code 407 from proxy after CONNECT
java.lang.RuntimeException: Nonzero exit code (128): git clone https://github.com/ScrapCodes/sbt-pom-reader.git
Re: Spark SQL Parquet - data are reading very very slow
Yes, I am also facing the same problem. Could anyone please help us get faster execution? Thanks.
Re: creating a single kafka producer object for all partitions
Leader-not-found suggests a problem with zookeeper config. It depends a lot on the specifics of your error. But this is really a Kafka question, better for the Kafka list.

On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid wrote: Actually this code is producing error leader not found exception. I am unable to find the reason
Re: Manually trigger RDD map function without action
Hi, answer from another Kevin. I think you may already know it: a 'transformation' in Spark (http://spark.apache.org/docs/latest/programming-guide.html#transformations) will be done in a 'lazy' way, when you trigger 'actions' (http://spark.apache.org/docs/latest/programming-guide.html#actions). So you can use:
'collect' - if you need the result in memory
'count' - if you need to count
'saveAs...' - if you need to persist the transformed RDD

Regards, Kevin

On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] wrote: Hi all, Is there an efficient way to trigger RDD transformations? I'm now using the count action to achieve this. Best regards, Kevin
Re: Issue with Parquet on Spark 1.2 and Amazon EMR
Meanwhile, I have submitted a pull request (https://github.com/awslabs/emr-bootstrap-actions/pull/37) that allows users to place their jars ahead of all other jars in the Spark classpath. This should serve as a temporary workaround for all class conflicts. Thanks, Aniket

On Mon Jan 05 2015 at 22:13:47 Kelly, Jonathan wrote: I've noticed the same thing recently and will contact the appropriate owner soon. (I work for Amazon, so I'll go through internal channels and report back to this list.) In the meantime, I've found that editing spark-env.sh and putting the Spark assembly first in the classpath fixes the issue. I expect that the version of Parquet that's being included in the EMR libs just needs to be upgraded. ~ Jonathan Kelly

From: Aniket Bhatnagar Date: Sunday, January 4, 2015 at 10:51 PM To: Adam Gilmore, user@spark.apache.org Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR

Can you confirm your EMR version? Could it be because of the classpath entries for emrfs? You might face issues with using S3 without them. Thanks, Aniket

On Mon, Jan 5, 2015, 11:16 AM Adam Gilmore wrote: Just an update on this - I found that the script by Amazon was the culprit - not exactly sure why. When I installed Spark manually onto the EMR cluster (and did the manual configuration of all the EMR stuff), it worked fine.

On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore wrote: Hi all, I've just launched a new Amazon EMR cluster and used the script at s3://support.elasticmapreduce/spark/install-spark to install Spark (this script was upgraded to support 1.2). I know there are tools to launch a Spark cluster in EC2, but I want to use EMR. Everything installs fine; however, when I go to read from a Parquet file, I end up with (the main exception):

Caused by: java.lang.NoSuchMethodError: parquet.hadoop.ParquetInputSplit.<init>(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V
    at parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578)
    ... 55 more

It seems to me like a version mismatch somewhere. Where is the parquet-hadoop jar coming from? Is it built into a fat jar for Spark? Any help would be appreciated. Note that 1.1.1 worked fine with Parquet files.
Spark does not loop through a RDD.map
Hi, I am observing some weird behavior with Spark. It might be my misinterpretation of some fundamental concepts, but I have looked at it for 3 days and have not been able to solve it. The source code is pretty long and complex, so instead of posting it, I will try to articulate the problem. I am building a sentiment analyser using the Naive Bayes model in Spark.

1) I have taken text files in RAW format and created an RDD of word -> Array(files the word is found in).
2) From this I have derived the features array for each file, which is an Array[Double]: 0.0 if the file does not contain the word and 1.0 if the word is found in the file.
3) I have then created an RDD[LabeledPoint], and from this I have created the Naive Bayes model using the following code:

val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
// training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val test = splits(1)
Logger.info("Training count: " + training.count() + " Testing count: " + test.count())
model = NaiveBayes.train(training, lambda = 1.0)
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
Logger.info("Fold:[" + fold + "] accuracy: [" + accuracy + "]")

4) The model seems to be fine and the accuracy is about 75% to 82%, depending on which set of input files I provide.

5) Now I am using this model to predict(). Here I am creating the same feature array from the input text file, and I have code as follows:

/*
 * Print all the features (words) in the feature array
 */
allFeaturesRDD.foreach((x) => print(x + ", "))
/*
 * Build the feature array
 */
val features = buildFeatureArray(reviewID, wordSeqRdd)  // <-- fails here; code shown below
logFeatureArray(features)
val prediction = model.predict(Vectors.dense(features))
Logger.info("Prediction: " + prediction)

where reviewID is the filename and wordReviewSeqRDD is an RDD[(word, Array(filename))]:

def buildFeatureArray(reviewID: String,
                      wordReviewSeqRDD: RDD[(String, Seq[String])]): Array[Double] = {
  val numWords = allFeaturesRDD.count                       // number of all words in the feature set
  val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
  var featArray: Array[Double] = new Array(numWords.toInt)  // create an empty features array
  var idx = 0
  if (trainingDone) Logger.info("Feature len:" + numWords)
  allFeaturesRDD.map {              // <-- This is where it is failing,
    case (eachword) => {            // <-- for some reason the code does not enter here
      val reviewList = wordReviewSeqMap.get(eachword).get
      if (trainingDone == true) {
        println("1. eachword:" + eachword + " reviewList:" + reviewList)
        println("2. reviewList.size:" + reviewList.length)
        println("3. reviewList(0):" + reviewList(0))
      }
      featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else 0.toDouble
      idx += 1
    }
  }
  featArray
}
Re: Failed to save RDD as text file to local file system
I think you're confusing HDFS paths and local paths. You are cd'ing to a directory and seem to want to write output there, but your path has no scheme and defaults to being an HDFS path. When you use file: you seem to have a permission error (perhaps).

On Mon, Jan 12, 2015 at 4:21 PM, NingjunWang wrote: Prannoy, I tried this: r.saveAsTextFile("home/cloudera/tmp/out1"). It returns without error, but where is it saved to? The folder "/home/cloudera/tmp/out1" is not created. I also tried the following:

cd /home/cloudera/tmp/
spark-shell
scala> val r = sc.parallelize(Array("a", "b", "c"))
scala> r.saveAsTextFile("out1")

It does not return an error, but still there is no "out1" folder created under /home/cloudera/tmp/. I tried to give an absolute path, but then I get an error:

scala> r.saveAsTextFile("/home/cloudera/tmp/out1")
org.apache.hadoop.security.AccessControlException: Permission denied: user=cloudera, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6286)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6268)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6220)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4087)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

Very frustrated. Please advise.

Regards, Ningjun Wang, Consulting Software Engineer, LexisNexis, 121 Chanlon Road, New Providence, NJ 07974-1541

From: Prannoy [via Apache Spark User List] Sent: Monday, January 12, 2015 4:18 AM To: Wang, Ningjun (LNG-NPV) Subject: Re: Failed to save RDD as text file to local file system

Have you tried simply giving the path where you want to save the file? For instance, in your case just do r.saveAsTextFile("home/cloudera/tmp/out1"). Don't use "file:". This will create a folder with the name out1. saveAsTextFile always writes by making a directory; it does not write data into a single file. In case you need a single file you can use the copyMerge API in FileUtil: FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs, new Path("home/cloudera/tmp/out2"), true, conf, null). Now out2 will be a single file containing your data. fs is the FileSystem object for your local file system. Thanks

On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang wrote: No, do you have any idea? Regards, Ningjun Wang

From: firemonk9 [via Apache Spark User List] Sent: Friday, January 09, 2015 2:56 PM To: Wang, Ningjun (LNG-NPV) Subject: Re: Failed to save RDD as text file to local file system

Have you found any resolution for this issue?
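A short sketch of the scheme distinction drawn above; paths are illustrative, and note that the file:// form writes on whichever machine each task runs, so it only behaves as expected with a local master:

r.saveAsTextFile("out1")                           // no scheme: default FS (HDFS here), relative to /user/<you>
r.saveAsTextFile("hdfs:///home/cloudera/tmp/out1") // explicit HDFS path (needs HDFS write permission)
r.saveAsTextFile("file:///home/cloudera/tmp/out1") // local filesystem on each worker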
Re: How to use memcached with spark
I am trying to use it, but without success. Any sample code that works with Spark would be highly appreciated. :)
Spark executors resources. Blocking?
Hello all, I have a naive question regarding how Spark uses the executors in a cluster of machines. Imagine the scenario in which I do not know the input size of my data in execution A, so I set Spark to use 20 of my 25 nodes, for instance. At the same time, I also launch a second execution B, setting Spark to use 10 nodes for this. Assuming a huge input size for execution A, which implies an execution time of 30 minutes for example (using all the resources), and a constant execution time for B of 10 minutes, then both executions will last for 40 minutes in total (I assume that B cannot be launched until 10 resources are completely available, i.e. when A finishes). Now, assuming a very small input size for execution A, running for 5 minutes in only 2 of the 20 planned resources, I would like execution B to be launched at that time, so that both executions together consume only 10 minutes (and 12 resources). However, as execution A has told Spark to use 20 resources, execution B has to wait until A has finished, so the total execution time lasts 15 minutes. Is this right? If so, how can I solve this kind of scenario? If I am wrong, what would be the correct interpretation of this? Thanks in advance, Best
Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0
Hi Ganelin, sorry if it wasn't clear from my previous email, but that is how I am creating a Spark context. I just didn't write out the lines where I create the new SparkConf and SparkContext. I am also upping the driver memory when running. Thanks, David

On 01/12/2015 11:12 AM, Ganelin, Ilya wrote: There are two related options. To solve your problem directly, try:

val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver: as part of your spark-submit script add

--driver-memory 2g

Hope this helps!

From: David McWhorter Date: Monday, January 12, 2015 at 11:01 AM To: user@spark.apache.org Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all, I'm trying to figure out how to set this option, spark.yarn.driver.memoryOverhead, on Spark 1.2.0. I found this helpful overview (http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476), which suggests launching with --spark.yarn.driver.memoryOverhead 1024 added to spark-submit. However, when I do that I get this error:

Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'. Run with --help for usage help or --verbose for debug output

I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my Spark configuration object, but I still get "Will allocate AM container, with  MB memory including 384 MB overhead" when launching. I'm running in yarn-cluster mode. Any help or tips would be appreciated. Thanks, David

-- David McWhorter, Software Engineer, Commonwealth Computer Research, Inc. 1422 Sachem Place, Unit #1, Charlottesville, VA 22901, mcwhor...@ccri.com | 434.299.0090x204
Re: Spark does not loop through a RDD.map
At a quick glance, I think you're misunderstanding some basic features: http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations map is a transformation; it is lazy. You're not calling any action on the result of map. Also, closing over a mutable variable (like idx or featArray here) won't work; that closure is being run on executors, not the driver where your main code is running.

On Mon, Jan 12, 2015 at 9:49 AM, rkgurram wrote: Hi, I am observing some weird behavior with Spark ... (full message above; the failing portion is:)

allFeaturesRDD.map {              // <-- This is where it is failing,
  case (eachword) => {            // <-- for some reason the code does not enter here
    val reviewList = wordReviewSeqMap.get(eachword).get
    featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else 0.toDouble
    idx += 1
  }
}
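A hedged sketch of what Sean's two points imply for this function; names follow the original post, and this is one possible correction, not code from the thread:

def buildFeatureArray(reviewID: String,
                      wordReviewSeqRDD: RDD[(String, Seq[String])]): Array[Double] = {
  val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
  // Return a value from map instead of mutating driver-side state,
  // and call an action (collect) so the transformation actually runs.
  allFeaturesRDD.map { eachword =>
    val reviewList = wordReviewSeqMap(eachword)
    if (reviewList.contains(reviewID)) 1.0 else 0.0
  }.collect()
}

Note the feature order now follows allFeaturesRDD's own ordering, so it must match the order used when the training features were built.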
Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0
Isn't the syntax --conf property=value? See http://spark.apache.org/docs/latest/configuration.html Yes, I think setting it after the driver is running is of course too late.

On Mon, Jan 12, 2015 at 4:01 PM, David McWhorter wrote: Hi all, I'm trying to figure out how to set this option, spark.yarn.driver.memoryOverhead, on Spark 1.2.0. I found this helpful overview (http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476), which suggests launching with --spark.yarn.driver.memoryOverhead 1024 added to spark-submit. However, when I do that I get this error:

Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'. Run with --help for usage help or --verbose for debug output

I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my Spark configuration object, but I still get "Will allocate AM container, with  MB memory including 384 MB overhead" when launching. I'm running in yarn-cluster mode. Any help or tips would be appreciated. Thanks, David
Re: RowMatrix multiplication
That's not quite what I'm looking for. Let me provide an example. I have a RowMatrix A that is n x m, and I have two local matrices b and c; b is m x 1 and c is n x 1. In my Spark job I wish to perform the following two computations: A*b and A^T*c. I don't think this is possible without being able to transpose a RowMatrix. Am I correct? Thanks, Alex

From: Reza Zadeh Sent: Monday, January 12, 2015 1:58 PM To: Alex Minnaar Subject: Re: RowMatrix multiplication

As you mentioned, you can perform A * b, where A is a RowMatrix and b is a local matrix. From your email, I figure you want to compute b * A^T. To do this, you can compute C = A b^T, whose result is the transpose of what you were looking for, i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C manually yourself. Be careful though, because the result might not have each row fit in memory on a single machine, which is what RowMatrix requires. This danger is why we didn't provide a transpose operation in RowMatrix natively. To address this and more, there is an effort to provide more comprehensive linear algebra through block matrices, which will likely make it to 1.3: https://issues.apache.org/jira/browse/SPARK-3434 Best, Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar wrote: I have a rowMatrix on which I want to perform two multiplications. The first is a right multiplication with a local matrix, which is fine. But after that I also wish to right multiply the transpose of my rowMatrix with a different local matrix. I understand that there is no functionality to transpose a rowMatrix at this time, but I was wondering if anyone could suggest any kind of work-around for this. I had thought that I might be able to initially create two rowMatrices - a normal version and a transposed version - and use either when appropriate. Can anyone think of another alternative? Thanks, Alex
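For reference, a hedged sketch of the supported direction only (the A^T*c case discussed above has no direct RowMatrix equivalent in Spark 1.2); names and dimensions are placeholders:

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val A: RowMatrix = ???                 // n x m distributed matrix, assumed to exist
val m: Int = ???                       // number of columns of A
val bValues: Array[Double] = ???       // m entries, column-major
val b = Matrices.dense(m, 1, bValues)  // local m x 1 matrix
val Ab: RowMatrix = A.multiply(b)      // distributed n x 1 result, computed row by row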
Re: Getting Output From a Cluster
Hello Everyone, Quick followup, is there any way I can append output to one file rather then create a new directory/file every X milliseconds? Thanks! Suhas Shekar University of California, Los Angeles B.A. Economics, Specialization in Computing 2014 On Thu, Jan 8, 2015 at 11:41 PM, Su She suhsheka...@gmail.com wrote: 1) Thank you everyone for the help once again...the support here is really amazing and I hope to contribute soon! 2) The solution I actually ended up using was from this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E in case the thread ever goes down, the soln provided by Matei: plans.saveAsHadoopFiles(hdfs://localhost:8020/user/hue/output/completed,csv, String.class, String.class, (Class) TextOutputFormat.class); I had browsed a lot of similar threads that did not have answers, but found this one from quite some time ago, so apologize for posting a question that had been answered before. 3) Akhil, I was specifying the format as txt, but it was not compatible Thanks for the help! On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: saveAsHadoopFiles requires you to specify the output format which i believe you are not specifying anywhere and hence the program crashes. You could try something like this: Class? extends OutputFormat?,? outputFormatClass = (Class? extends OutputFormat?,?) (Class?) SequenceFileOutputFormat.class; 46 yourStream.saveAsNewAPIHadoopFiles(hdfsUrl, /output-location,Text.class, Text.class, outputFormatClass); Thanks Best Regards On Fri, Jan 9, 2015 at 10:22 AM, Su She suhsheka...@gmail.com wrote: Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when I call print on the Dstream it works? If I had to do foreachRDD to saveAsHadoopFile, then why is it working for print? Also, if I am doing foreachRDD, do I need connections, or can I simply put the saveAsHadoopFiles inside the foreachRDD function? Thanks Yana for the help! I will play around with foreachRDD and convey my results. On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: are you calling the saveAsText files on the DStream --looks like it? Look at the section called Design Patterns for using foreachRDD in the link you sent -- you want to do dstream.foreachRDD(rdd = rdd.saveAs) On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, Thanks in advance for the help! I successfully got my Kafka/Spark WordCount app to print locally. However, I want to run it on a cluster, which means that I will have to save it to HDFS if I want to be able to read the output. I am running Spark 1.1.0, which means according to this document: https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html I should be able to use commands such as saveAsText/HadoopFiles. 
1) When I try saveAsTextFiles it says: cannot find symbol [ERROR] symbol : method saveAsTextFiles(java.lang.String,java.lang.String) [ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer> This makes some sense as saveAsTextFiles is not included here: http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html 2) When I try saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount", "txt") it builds, but when I try running it it throws this exception: Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079) at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
Re: /tmp directory fills up
Hi Alessandro, You can look for a log line like this in your driver's output: 15/01/12 10:51:01 INFO storage.DiskBlockManager: Created local directory at /data/yarn/nm/usercache/systest/appcache/application_1421081007635_0002/spark-local-20150112105101-4f3d If you're deploying your application in cluster mode, the temp directory will be under the YARN-defined application dir. In client mode, the driver will create some stuff under spark.local.dir, but the driver itself generally doesn't create many temp files, IIRC. On Fri, Jan 9, 2015 at 11:32 PM, Alessandro Baretta alexbare...@gmail.com wrote: Gents, I'm building Spark using the current master branch and deploying it to Google Compute Engine on top of Hadoop 2.4/YARN via bdutil, Google's Hadoop cluster provisioning tool. bdutil configures Spark with spark.local.dir=/hadoop/spark/tmp, but this option is ignored in combination with YARN. bdutil also configures YARN with:

<property>
  <name>yarn.nodemanager.local-dirs</name>
  <value>/mnt/pd1/hadoop/yarn/nm-local-dir</value>
  <description>Directories on the local machine in which to store application temp files.</description>
</property>

This is the right directory for Spark to store temporary data in. Still, Spark is creating directories such as /tmp/spark-51388ee6-9de6-411d-b9b9-ab6f9502d01e and filling them up with gigabytes worth of output files, filling up the very small root filesystem. How can I diagnose why my Spark installation is not picking up yarn.nodemanager.local-dirs from YARN? Alex -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
ReliableKafkaReceiver stopped receiving data after WriteAheadLogBasedBlockHandler throws TimeoutException
Hi all, I am running a Spark streaming application with ReliableKafkaReceiver (Spark 1.2.0). I was constantly getting the following exception: 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126) at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275) at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181) at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154) at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86) After the exception, ReliableKafkaReceiver stayed in ACTIVE status but stopped receiving data from Kafka. The Kafka message handler thread is in BLOCKED state: Thread 92: KafkaMessageHandler-0 (BLOCKED) org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123) org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185) org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247) java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) java.util.concurrent.FutureTask.run(FutureTask.java:262) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Sometimes when the exception was thrown, I also saw warning messages like this: 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010] 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms) In the past, I never had such a problem with KafkaReceiver. What causes this exception? How can I solve this problem? Thanks in advance, Max
Re: RowMatrix multiplication
Good idea! Join each element of c with the corresponding row of A, multiply through, then reduce. I'll give this a try. Thanks, Alex From: Reza Zadeh r...@databricks.com Sent: Monday, January 12, 2015 3:05 PM To: Alex Minnaar Cc: u...@spark.incubator.apache.org Subject: Re: RowMatrix multiplication Yes you are correct, to do it with existing operations you would need a transpose on rowmatrix. However, you can fairly easily perform the operation manually by doing a join (if the c vector is an RDD) or broadcasting c (if the c vector is small enough to fit in memory on a single machine). On Mon, Jan 12, 2015 at 11:45 AM, Alex Minnaar aminn...@verticalscope.com wrote: That's not quite what I'm looking for. Let me provide an example. I have a rowmatrix A that is nxm and I have two local matrices b and c. b is mx1 and c is nx1. In my Spark job I wish to perform the following two computations: A*b and A^T*c. I don't think this is possible without being able to transpose a rowmatrix. Am I correct? Thanks, Alex From: Reza Zadeh r...@databricks.com Sent: Monday, January 12, 2015 1:58 PM To: Alex Minnaar Cc: u...@spark.incubator.apache.org Subject: Re: RowMatrix multiplication As you mentioned, you can perform A * b, where A is a rowmatrix and b is a local matrix. From your email, I figure you want to compute b * A^T. To do this, you can compute C = A b^T, whose result is the transpose of what you were looking for, i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C manually yourself. Be careful though, because the result might not have each Row fit in memory on a single machine, which is what RowMatrix requires. This danger is why we didn't provide a transpose operation in RowMatrix natively. To address this and more, there is an effort to provide more comprehensive linear algebra through block matrices, which will likely make it to 1.3: https://issues.apache.org/jira/browse/SPARK-3434 Best, Reza On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar aminn...@verticalscope.com wrote: I have a rowMatrix on which I want to perform two multiplications. The first is a right multiplication with a local matrix, which is fine. But after that I also wish to right multiply the transpose of my rowMatrix with a different local matrix. I understand that there is no functionality to transpose a rowMatrix at this time, but I was wondering if anyone could suggest any kind of work-around for this. I had thought that I might be able to initially create two rowMatrices - a normal version and a transposed version - and use either when appropriate. Can anyone think of another alternative? Thanks, Alex
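Reza's broadcast suggestion can be made concrete. Below is a minimal Scala sketch, not an official API: it computes A^T*c as the c(i)-weighted sum of A's rows, with c assumed to be a local Array[Double] of length n, and it assumes the ordering of matA.rows lines up with the indexing of c — RowMatrix itself makes no row-order guarantee, which is exactly why this needs care.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// A^T * c = sum_i c(i) * row_i(A); the transpose is never materialized
val cBc = sc.broadcast(c)                         // c: Array[Double] of length n
val atcArray = matA.rows.zipWithIndex().map { case (row, i) =>
  val w = cBc.value(i.toInt)
  row.toArray.map(_ * w)                          // scale row i by c(i)
}.reduce { (u, v) =>
  u.zip(v).map { case (x, y) => x + y }           // elementwise sum of the scaled rows
}
val atc = Vectors.dense(atcArray)                 // the m-dimensional result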
Re: How to recovery application running records when I restart Spark master?
Is anybody able to help me with this? Thank you very much! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
no snappyjava in java.library.path
Hi, My Spark job failed with no snappyjava in java.library.path as: Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857) at java.lang.Runtime.loadLibrary0(Runtime.java:870) at java.lang.System.loadLibrary(System.java:1119) at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52) I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I have included it in the CLASSPATH already. ../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar ../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar ../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar Did I miss anything, or should I set it in some other way? Cheers, Dan
Re: no snappyjava in java.library.path
I ran into this recently. Turned out we had an old org-xerial-snappy.properties file in one of our conf directories that had the setting:

# Disables loading Snappy-Java native library bundled in the
# snappy-java-*.jar file forcing to load the Snappy-Java native
# library from the java.library.path.
#
org.xerial.snappy.disable.bundled.libs=true

When I switched that to false, it made the problem go away. May or may not be your problem of course, but worth a look. HTH, DR On 01/12/2015 03:28 PM, Dan Dong wrote: Hi, My Spark job failed with no snappyjava in java.library.path as: Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857) at java.lang.Runtime.loadLibrary0(Runtime.java:870) at java.lang.System.loadLibrary(System.java:1119) at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52) I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I have included it in the CLASSPATH already. ../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar ../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar ../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar Did I miss anything, or should I set it in some other way? Cheers, Dan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Web Service + Spark
If you would like to work with an API, you can use the Spark Kernel found here: https://github.com/ibm-et/spark-kernel The kernel provides an API following the IPython message protocol as well as a client library that can be used with Scala applications. The kernel can also be plugged into the latest developmental version of IPython 3.0 in case you want to do more visual exploration. Signed, Chip Senkbeil IBM Emerging Technology Software Engineer From: Raghavendra Pandey raghavendra.pan...@gmail.com To: Cui Lin cui@hds.com, gtinside gtins...@gmail.com, Corey Nolet cjno...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Date: 01/11/2015 02:06 AM Subject: Re: Web Service + Spark You can take a look at http://zeppelin.incubator.apache.org. It is a notebook and graphic visual designer. On Sun, Jan 11, 2015, 01:45 Cui Lin cui@hds.com wrote: Thanks, Gaurav and Corey, Probably I didn’t make myself clear. I am looking for the best Spark practice, similar to Shiny for R, where the analysis/visualization results can be easily published to a web server and shown in a web browser. Or any dashboard for Spark? Best regards, Cui Lin From: gtinside gtins...@gmail.com Date: Friday, January 9, 2015 at 7:45 PM To: Corey Nolet cjno...@gmail.com Cc: Cui Lin cui@hds.com, user@spark.apache.org user@spark.apache.org Subject: Re: Web Service + Spark You can also look at Spark Job Server https://github.com/spark-jobserver/spark-jobserver - Gaurav On Jan 9, 2015, at 10:25 PM, Corey Nolet cjno...@gmail.com wrote: Cui Lin, The solution largely depends on how you want your services deployed (Java web container, Spray framework, etc...) and if you are using a cluster manager like Yarn or Mesos vs. just firing up your own executors and master. I recently worked on an example for deploying Spark services inside of Jetty using Yarn as the cluster manager. It forced me to learn how Spark wires up the dependencies/classpaths. If it helps, the example that resulted from my tinkering is located at [1]. [1] https://github.com/calrissian/spark-jetty-server On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote: Hello, All, What’s the best practice on deploying/publishing spark-based scientific applications into a web service? Similar to Shiny on R. Thanks! Best regards, Cui Lin
Spark Framework handling of Mesos master change
We are running Spark and Spark Streaming on Mesos (with multiple masters for HA). At launch, our Spark jobs successfully look up the current Mesos master from zookeeper and spawn tasks. However, when the Mesos master changes while the spark job is executing, the spark driver seems to interact with the old Mesos master, and therefore fails to launch any new tasks. We are running long-running Spark streaming jobs, so we have temporarily switched to coarse-grained mode as a workaround, but it prevents us from running in fine-grained mode, which we would prefer for some jobs. Looking at the code for MesosSchedulerBackend, it has an empty implementation of the reregistered (and disconnected) methods, which I believe would be called when the master changes: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L202 http://mesos.apache.org/documentation/latest/app-framework-development-guide/ Are there any plans to implement master reregistration in the Spark framework, or does anyone have any suggested workarounds for long running jobs to deal with the mesos master changing? (Or is there something we are doing wrong?) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Framework-handling-of-Mesos-master-change-tp21107.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to recovery application running records when I restart Spark master?
Hi all, Due to some reasons, I restarted the Spark master node. Before I restarted it, there were some application running records at the bottom of the master web page. But they are gone after I restarted the master node. The records include application name, running time, status, and so on. I am sure you know what I am talking about. My questions are: 1) How can I recover those records when I restart my Spark master node? 2) If I cannot recover them, where should I go to look for them? Actually, I care about the running time of finished applications. Thank you for your help, and I hope everybody is doing great! Chong
Re: Getting Output From a Cluster
There is no direct way of doing that. If you need a single file for every batch duration, then you can repartition the data to 1 before saving. Another way would be to use Hadoop's copyMerge command/API (available from 2.0 versions) On 13 Jan 2015 01:08, Su She suhsheka...@gmail.com wrote: Hello Everyone, Quick followup: is there any way I can append output to one file rather than creating a new directory/file every X milliseconds? Thanks! Suhas Shekar University of California, Los Angeles B.A. Economics, Specialization in Computing 2014 On Thu, Jan 8, 2015 at 11:41 PM, Su She suhsheka...@gmail.com wrote: 1) Thank you everyone for the help once again...the support here is really amazing and I hope to contribute soon! 2) The solution I actually ended up using was from this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E in case the thread ever goes down, the soln provided by Matei: plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed","csv", String.class, String.class, (Class) TextOutputFormat.class); I had browsed a lot of similar threads that did not have answers, but found this one from quite some time ago, so apologize for posting a question that had been answered before. 3) Akhil, I was specifying the format as txt, but it was not compatible. Thanks for the help! On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: saveAsHadoopFiles requires you to specify the output format, which I believe you are not specifying anywhere, and hence the program crashes. You could try something like this: Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class; yourStream.saveAsNewAPIHadoopFiles(hdfsUrl, "/output-location", Text.class, Text.class, outputFormatClass); Thanks Best Regards On Fri, Jan 9, 2015 at 10:22 AM, Su She suhsheka...@gmail.com wrote: Yes, I am calling the saveAsHadoopFiles on the DStream. However, when I call print on the DStream it works? If I had to do foreachRDD to saveAsHadoopFile, then why is it working for print? Also, if I am doing foreachRDD, do I need connections, or can I simply put the saveAsHadoopFiles inside the foreachRDD function? Thanks Yana for the help! I will play around with foreachRDD and convey my results. On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Are you calling the saveAsText files on the DStream -- looks like it? Look at the section called Design Patterns for using foreachRDD in the link you sent -- you want to do dstream.foreachRDD(rdd => rdd.saveAs...) On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, Thanks in advance for the help! I successfully got my Kafka/Spark WordCount app to print locally. However, I want to run it on a cluster, which means that I will have to save it to HDFS if I want to be able to read the output. I am running Spark 1.1.0, which means according to this document: https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html I should be able to use commands such as saveAsText/HadoopFiles. 
1) When I try saveAsTextFiles it says: cannot find symbol [ERROR] symbol : method saveAsTextFiles(java.lang.String,java.lang.String) [ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer> This makes some sense as saveAsTextFiles is not included here: http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html 2) When I try saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount", "txt") it builds, but when I try running it it throws this exception: Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079) at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161)
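To make Akhil's repartition suggestion concrete, here is a minimal Scala sketch (the stream name and output prefix are placeholders): repartitioning to 1 before saving yields a single part-00000 file per batch, at the cost of funneling each batch through one task.

stream.foreachRDD { (rdd, time) =>
  // one output directory with a single part file per batch interval;
  // all of the batch's data passes through a single task
  rdd.repartition(1)
     .saveAsTextFile("hdfs://namenode:8020/user/hue/output/batch-" + time.milliseconds)
}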
Re: Getting Output From a Cluster
Okay, thanks Akhil! Suhas Shekar University of California, Los Angeles B.A. Economics, Specialization in Computing 2014 On Mon, Jan 12, 2015 at 1:24 PM, Akhil Das ak...@sigmoidanalytics.com wrote: There is no direct way of doing that. If you need a single file for every batch duration, then you can repartition the data to 1 before saving. Another way would be to use Hadoop's copyMerge command/API (available from 2.0 versions) On 13 Jan 2015 01:08, Su She suhsheka...@gmail.com wrote: Hello Everyone, Quick followup: is there any way I can append output to one file rather than creating a new directory/file every X milliseconds? Thanks! Suhas Shekar University of California, Los Angeles B.A. Economics, Specialization in Computing 2014 On Thu, Jan 8, 2015 at 11:41 PM, Su She suhsheka...@gmail.com wrote: 1) Thank you everyone for the help once again...the support here is really amazing and I hope to contribute soon! 2) The solution I actually ended up using was from this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E in case the thread ever goes down, the soln provided by Matei: plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed","csv", String.class, String.class, (Class) TextOutputFormat.class); I had browsed a lot of similar threads that did not have answers, but found this one from quite some time ago, so apologize for posting a question that had been answered before. 3) Akhil, I was specifying the format as txt, but it was not compatible. Thanks for the help! On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: saveAsHadoopFiles requires you to specify the output format, which I believe you are not specifying anywhere, and hence the program crashes. You could try something like this: Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class; yourStream.saveAsNewAPIHadoopFiles(hdfsUrl, "/output-location", Text.class, Text.class, outputFormatClass); Thanks Best Regards On Fri, Jan 9, 2015 at 10:22 AM, Su She suhsheka...@gmail.com wrote: Yes, I am calling the saveAsHadoopFiles on the DStream. However, when I call print on the DStream it works? If I had to do foreachRDD to saveAsHadoopFile, then why is it working for print? Also, if I am doing foreachRDD, do I need connections, or can I simply put the saveAsHadoopFiles inside the foreachRDD function? Thanks Yana for the help! I will play around with foreachRDD and convey my results. On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Are you calling the saveAsText files on the DStream -- looks like it? Look at the section called Design Patterns for using foreachRDD in the link you sent -- you want to do dstream.foreachRDD(rdd => rdd.saveAs...) On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, Thanks in advance for the help! I successfully got my Kafka/Spark WordCount app to print locally. However, I want to run it on a cluster, which means that I will have to save it to HDFS if I want to be able to read the output. I am running Spark 1.1.0, which means according to this document: https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html I should be able to use commands such as saveAsText/HadoopFiles. 
1) When I try saveAsTextFiles it says: cannot find symbol [ERROR] symbol : method saveAsTextFiles(java.lang.String,java.lang.String) [ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer> This makes some sense as saveAsTextFiles is not included here: http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html 2) When I try saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount", "txt") it builds, but when I try running it it throws this exception: Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079) at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632) at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at
Re: How to recovery application running records when I restart Spark master?
http://spark.apache.org/docs/latest/monitoring.html http://spark.apache.org/docs/latest/configuration.html#spark-ui spark.eventLog.enabled On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote: Is there any body can help me with this? Thank you very much! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Framework handling of Mesos master change
Hi Ethan, How are you specifying the master to Spark? Recovering from master failover is already handled by the underlying Mesos scheduler, but you have to use ZooKeeper instead of directly passing in the master URIs. Tim On Mon, Jan 12, 2015 at 12:44 PM, Ethan Wolf ethan.w...@alum.mit.edu wrote: We are running Spark and Spark Streaming on Mesos (with multiple masters for HA). At launch, our Spark jobs successfully look up the current Mesos master from zookeeper and spawn tasks. However, when the Mesos master changes while the spark job is executing, the spark driver seems to interact with the old Mesos master, and therefore fails to launch any new tasks. We are running long-running Spark streaming jobs, so we have temporarily switched to coarse-grained mode as a workaround, but it prevents us from running in fine-grained mode, which we would prefer for some jobs. Looking at the code for MesosSchedulerBackend, it has an empty implementation of the reregistered (and disconnected) methods, which I believe would be called when the master changes: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L202 http://mesos.apache.org/documentation/latest/app-framework-development-guide/ Are there any plans to implement master reregistration in the Spark framework, or does anyone have any suggested workarounds for long running jobs to deal with the mesos master changing? (Or is there something we are doing wrong?) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Framework-handling-of-Mesos-master-change-tp21107.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
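In other words, instead of pointing the driver at a single master, let the scheduler discover the current leader through ZooKeeper. A minimal sketch of what Tim describes, with placeholder hostnames:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("LongRunningStreamingJob")
  // ZooKeeper-based leader discovery instead of a single master URI;
  // the Mesos scheduler then follows a master failover automatically
  .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")
val sc = new SparkContext(conf)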
Re: How to recovery application running records when I restart Spark master?
Thank you, Cody! Actually, I have enabled this option, and I saved logs into Hadoop file system. The problem is, how can I get the duration of an application? The attached file is the log I copied from HDFS. On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest/monitoring.html http://spark.apache.org/docs/latest/configuration.html#spark-ui spark.eventLog.enabled On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote: Is there any body can help me with this? Thank you very much! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org EVENT_LOG_1 Description: Binary data - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Discrepancy in PCA values
Could you compare V directly and tell us more about the difference you saw? The columns of V should be the same up to sign. For example, the first column of V could be either [0.8, -0.6, 0.0] or [-0.8, 0.6, 0.0]. -Xiangrui On Sat, Jan 10, 2015 at 8:08 PM, Upul Bandara upulband...@gmail.com wrote: Hi Xiangrui, Thanks a lot for your answer. So I fixed my Julia code, and also calculated PCA using R. R Code: - data <- read.csv('/home/upul/Desktop/iris.csv'); X <- data[,1:4] pca <- prcomp(X, center = TRUE, scale=FALSE) transformed <- predict(pca, newdata = X) Julia Code (Fixed) -- data = readcsv("/home/upul/temp/iris.csv"); X = data[:,1:end-1]; meanX = mean(X,1); m,n = size(X); X = X - repmat(meanX, m, 1); u,s,v = svd(X); transformed = X*v; Now the PCA calculated using Julia and R is identical, but I can still see a small difference between the PCA values given by Spark and the other two. Thanks, Upul On Sat, Jan 10, 2015 at 11:17 AM, Xiangrui Meng men...@gmail.com wrote: You need to subtract mean values to obtain the covariance matrix (http://en.wikipedia.org/wiki/Covariance_matrix). On Fri, Jan 9, 2015 at 6:41 PM, Upul Bandara upulband...@gmail.com wrote: Hi Xiangrui, Thanks for the reply. The Julia code is also using the covariance matrix: (1/n)*X'*X ; Thanks, Upul On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng men...@gmail.com wrote: The Julia code is computing the SVD of the Gram matrix. PCA should be applied to the covariance matrix. -Xiangrui On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara upulband...@gmail.com wrote: Hi All, I tried to do PCA for the Iris dataset [https://archive.ics.uci.edu/ml/datasets/Iris] using MLLib [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html]. Also, PCA was calculated in Julia using following method: Sigma = (1/numRow(X))*X'*X ; [U, S, V] = svd(Sigma); Ureduced = U(:, 1:k); Z = X*Ureduced; However, I'm seeing a little difference between the values given by MLLib and the method shown above. Does anyone have any idea about this difference? Additionally, I have attached two visualizations related to the two approaches. Thanks, Upul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
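For completeness, the MLlib side of the comparison looks roughly like the sketch below (rows: RDD[Vector] holding the iris features is an assumption). Note that computePrincipalComponents derives V from the covariance matrix, so mean-centering is accounted for there, but multiply(pc) projects the raw rows; if the input is not centered first, the projected scores differ from R's predict(pca, ...) by a constant offset, which is one mundane source of small discrepancies besides sign flips.

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val mat = new RowMatrix(rows)                // rows: RDD[Vector], ideally mean-centered
val pc = mat.computePrincipalComponents(4)   // local 4x4 matrix; columns are the principal components
val projected = mat.multiply(pc)             // RowMatrix of PCA scores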
How does unmanaged memory work with the executor memory limits?
Greetings! My executors apparently are being terminated because they are running beyond physical memory limits according to the yarn-hadoop-nodemanager logs on the worker nodes (/mnt/var/log/hadoop on AWS EMR). I'm setting the driver-memory to 8G. However, looking at stdout in userlogs, I can see GC going on, but the lines look like 6G -> 5G(7.2G), 0.45secs, so the GC seems to think that the process is using about 6G of space, not 8G of space. However, ps aux shows an RSS hovering just below 8G. The process does a mapPartitionsWithIndex, and the process uses compression which (I believe) calls into the native zlib library (the overall purpose is to convert each partition into a matlab file). Could it be that the YARN container is counting both the memory used by the JVM proper and the memory used by zlib, but that the GC only sees the internal memory? So the GC keeps the memory usage reasonable, e.g., 6G in an 8G container, but then zlib grabs some memory, and the YARN container then terminates the task? If so, is there anything I can do to tell YARN to watch for a larger memory limit than I tell the JVM to use for its memory? Thanks! Sincerely, Mike
Re: How does unmanaged memory work with the executor memory limits?
Short answer: yes. Take a look at: http://spark.apache.org/docs/latest/running-on-yarn.html Look for memoryOverhead. On Mon, Jan 12, 2015 at 2:06 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! My executors apparently are being terminated because they are running beyond physical memory limits according to the yarn-hadoop-nodemanager logs on the worker nodes (/mnt/var/log/hadoop on AWS EMR). I'm setting the driver-memory to 8G. However, looking at stdout in userlogs, I can see GC going on, but the lines look like 6G -> 5G(7.2G), 0.45secs, so the GC seems to think that the process is using about 6G of space, not 8G of space. However, ps aux shows an RSS hovering just below 8G. The process does a mapPartitionsWithIndex, and the process uses compression which (I believe) calls into the native zlib library (the overall purpose is to convert each partition into a matlab file). Could it be that the YARN container is counting both the memory used by the JVM proper and the memory used by zlib, but that the GC only sees the internal memory? So the GC keeps the memory usage reasonable, e.g., 6G in an 8G container, but then zlib grabs some memory, and the YARN container then terminates the task? If so, is there anything I can do to tell YARN to watch for a larger memory limit than I tell the JVM to use for its memory? Thanks! Sincerely, Mike -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
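The relevant knobs on Spark 1.x/YARN are spark.yarn.executor.memoryOverhead and spark.yarn.driver.memoryOverhead (in MB), which pad the container request beyond the JVM heap to cover exactly this kind of off-heap/native usage. The values below are illustrative, not a recommendation:

spark-submit \
  --master yarn-cluster \
  --driver-memory 8g \
  --conf spark.yarn.driver.memoryOverhead=2048 \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  your-app.jar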
Re: How to recovery application running records when I restart Spark master?
Sorry, slightly misunderstood the question. I'm not sure if there's a way to make the master UI read old log files after a restart, but the log files themselves are human-readable text. If you just want application duration, the start and stop are timestamped; look for lines like this in EVENT_LOG_1: {"Event":"SparkListenerApplicationStart","App Name":"cassandra-example-broadcast-join","Timestamp":1415763986601,"User":"cody"} ... {"Event":"SparkListenerApplicationEnd","Timestamp":1415763999790} On Mon, Jan 12, 2015 at 3:56 PM, Chong Tang ct...@virginia.edu wrote: Thank you, Cody! Actually, I have enabled this option, and I saved logs into Hadoop file system. The problem is, how can I get the duration of an application? The attached file is the log I copied from HDFS. On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest/monitoring.html http://spark.apache.org/docs/latest/configuration.html#spark-ui spark.eventLog.enabled On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote: Is there any body can help me with this? Thank you very much! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
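Given those two events, the duration is just the difference of the timestamps. A rough sketch (plain text parsing, not an official API) that pulls them from a local copy of the event log, assuming each event appears exactly once as in the excerpt above:

import scala.io.Source

val ts = "\"Timestamp\":([0-9]+)".r
val lines = Source.fromFile("EVENT_LOG_1").getLines().toList
def stamp(event: String): Long =
  lines.filter(_.contains(event))
       .flatMap(l => ts.findFirstMatchIn(l).map(_.group(1).toLong))
       .head                                   // assumes exactly one such event in the log
val durationMs = stamp("SparkListenerApplicationEnd") - stamp("SparkListenerApplicationStart")
println("duration: " + durationMs / 1000.0 + " s")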
including the spark-mllib in build.sbt
Hi, I am trying to build my own Scala project using sbt. The project depends on both spark-core and spark-mllib. I included the following two dependencies in my build.sbt file: libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" However, when I run the package command in sbt, I got an error message indicating that object mllib is not a member of package org.apache.spark. Did I do anything wrong? Thanks, Jianguo
How Spark Calculate partition size automatically
Hi, When I am running a job that loads data from Cassandra, Spark creates almost 9 million partitions. How does Spark decide the partition count? I have read in one presentation that it is good to have 1,000 to 10,000 partitions. Regards Raj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-Spark-Calculate-partition-size-automatically-tp21109.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: calculating the mean of SparseVector RDD
No, colStats() computes all summary statistics in one pass and stores the values. It is not lazy. On Mon, Jan 12, 2015 at 4:42 AM, Rok Roskar rokros...@gmail.com wrote: This was without using Kryo -- if I use kryo, I got errors about buffer overflows (see above): com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5, required: 8 Just calling colStats doesn't actually compute those statistics, does it? It looks like the computation is only carried out once you call the .mean() method. On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng men...@gmail.com wrote: colStats() computes the mean values along with several other summary statistics, which makes it slower. How is the performance if you don't use kryo? -Xiangrui On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar rokros...@gmail.com wrote: thanks for the suggestion -- however, looks like this is even slower. With the small data set I'm using, my aggregate function takes ~ 9 seconds and the colStats.mean() takes ~ 1 minute. However, I can't get it to run with the Kryo serializer -- I get the error: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5, required: 8 is there an easy/obvious fix? On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote: There is some serialization overhead. You can try https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107 . -Xiangrui On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote: I have an RDD of SparseVectors and I'd like to calculate the means returning a dense vector. I've tried doing this with the following (using pyspark, spark v1.2.0):

def aggregate_partition_values(vec1, vec2):
    vec1[vec2.indices] += vec2.values
    return vec1

def aggregate_combined_vectors(vec1, vec2):
    if all(vec1 == vec2):
        # then the vector came from only one partition
        return vec1
    else:
        return vec1 + vec2

means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values, aggregate_combined_vectors)
means = means / nvals

This turns out to be really slow -- and doesn't seem to depend on how many vectors there are, so there seems to be some overhead somewhere that I'm not understanding. Is there a better way of doing this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
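For reference, the Scala equivalent of that call is below (rows: RDD[Vector] is an assumption); all the statistics come back precomputed from the single pass:

import org.apache.spark.mllib.stat.Statistics

val summary = Statistics.colStats(rows)  // one eager pass over the data
println(summary.mean)                    // dense vector of column means
println(summary.variance)                // column variances from the same pass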
Re: including the spark-mllib in build.sbt
I don't know the root cause. Could you try including only libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1" It should be sufficient because mllib depends on core. -Xiangrui On Mon, Jan 12, 2015 at 2:27 PM, Jianguo Li flyingfromch...@gmail.com wrote: Hi, I am trying to build my own Scala project using sbt. The project depends on both spark-core and spark-mllib. I included the following two dependencies in my build.sbt file: libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" However, when I run the package command in sbt, I got an error message indicating that object mllib is not a member of package org.apache.spark. Did I do anything wrong? Thanks, Jianguo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
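A complete minimal build.sbt along those lines might look like this (the project name and Scala version are placeholder assumptions; sbt 0.13 needs the blank lines between settings):

name := "my-spark-app"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"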
Re: OOM exception during row deserialization
Does anybody have insight on this? Thanks. On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am using Spark 1.0.1. I am trying to debug a OOM exception i saw during a join step. Basically, i have a RDD of rows, that i am joining with another RDD of tuples. Some of the tasks succeed but a fair number failed with OOM exception with stack below. The stack belongs to the 'reducer' that is reading shuffle output from the 'mapper'. My question is what's the object being deserialized here - just a portion of an RDD or the whole RDD partition assigned to current reducer? The rows in the RDD could be large, but definitely not something that would run to 100s of MBs in size, and thus run out of memory. Also, is there a way to determine size of the object being deserialized that results in the error (either by looking at some staging hdfs dir or logs)? java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded} java.util.Arrays.copyOf(Arrays.java:2367) java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535) java.lang.StringBuilder.append(StringBuilder.java:204) java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142) java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050) java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863) java.io.ObjectInputStream.readString(ObjectInputStream.java:1636) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) java.util.ArrayList.readObject(ArrayList.java:771) sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031) Thanks, pala
Re: Broadcast joins on RDD
First, you should collect().toMap() the small RDD, then you should use broadcast followed by a map to do a map-side join http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf (slide 10 has an example). Spark SQL also does it by default for tables that are smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). On Mon, Jan 12, 2015 at 3:15 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, How do i do broadcast/map join on RDDs? I have a large RDD that i want to inner join with a small RDD. Instead of having the large RDD repartitioned and shuffled for join, i would rather send a copy of a small RDD to each task, and then perform the join locally. How would i specify this in Spark code? I didn't find much documentation online. I attempted to create a broadcast variable out of the small RDD and then access that in the join operator: largeRdd.join(smallRddBroadCastVar.value) but that didn't work as expected ( I found that all rows with same key were on same task) I am using Spark version 1.0.1 Thanks, pala
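A minimal sketch of the broadcast join Reza describes, assuming largeRdd: RDD[(K, V)] and smallRdd: RDD[(K, W)] with the small side comfortably fitting in driver and executor memory; no shuffle of the large side is involved:

val smallMap = smallRdd.collectAsMap()     // pull the small side to the driver
val smallBc = sc.broadcast(smallMap)       // one read-only copy per executor
val joined = largeRdd.flatMap { case (k, v) =>
  // inner-join semantics: keys missing from the small side are dropped
  smallBc.value.get(k).map(w => (k, (v, w)))
}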
Re: Manually trigger RDD map function without action
Hey Kevin, I assume you want to trigger the map() for a side effect (since you don't care about the result). To Cody's point, you can use foreach() *instead* of map(). So instead of e.g. x.map(a => foo(a)).foreach(a => a), you'd run x.foreach(a => foo(a)). Best, -Sven On Mon, Jan 12, 2015 at 5:13 PM, Kevin Jung itsjb.j...@samsung.com wrote: Cody said If you don't care about the value that your map produced (because you're not already collecting or saving it), then is foreach more appropriate to what you're doing? but I cannot see it in this thread. Anyway, I performed a small benchmark to test which function is the most efficient. And the winner is foreach(a => a), in line with everyone's expectations. collect can cause OOM on the driver, and count is much slower than the others. Thanks all. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- http://sites.google.com/site/krasser/?utm_source=sig
quickly counting the number of rows in a partition?
Is there a way to compute the total number of records in each RDD partition? So say I had 4 partitions, I’d want to have:
partition 0: 100 records
partition 1: 104 records
partition 2: 90 records
partition 3: 140 records
Kevin -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
Re: Is there any Spark implementation for Item-based Collaborative Filtering?
Also a ready-to-use server with Spark MLlib: http://docs.prediction.io/recommendation/quickstart/ The source code is here: https://github.com/PredictionIO/PredictionIO/tree/develop/templates/scala-parallel-recommendation Simon On Sun, Nov 30, 2014 at 12:17 PM, Pat Ferrel p...@occamsmachete.com wrote: Actually the spark-itemsimilarity job and related code in the Spark module of Mahout creates all-pairs similarity too. It’s designed to be used with a search engine, which provides the query part of the recommender. Integrate the two and you have a near-realtime, scalable, item-based/cooccurrence collaborative filtering type recommender. On Nov 30, 2014, at 12:09 PM, Sean Owen so...@cloudera.com wrote: There is an implementation of all-pairs similarity. Have a look at the DIMSUM implementation in RowMatrix. It is an element of what you would need for such a recommender, but not the whole thing. You can also do the model-building part of an ALS-based recommender with ALS in MLlib. So, no, not directly, but there are related pieces. On Sun, Nov 30, 2014 at 5:36 PM, shahab shahab.mok...@gmail.com wrote: Hi, I just wonder if there is any implementation for Item-based Collaborative Filtering in Spark? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [mllib] GradientDescent requires huge memory for storing weight vector
I guess you're not using too many features (e.g. 10m), just that hashing the index makes it look that way, is that correct? If so, the simple dictionary that maps your feature index -> rank can be broadcast and used everywhere, so you can pass MLlib just the feature's rank as its index. Reza On Mon, Jan 12, 2015 at 4:26 PM, Tianshuo Deng td...@twitter.com.invalid wrote: Hi, Currently in GradientDescent.scala, weights is constructed as a dense vector: initialWeights = Vectors.dense(new Array[Double](numFeatures)) And numFeatures is determined in loadLibSVMFile as the max index of the features. But when a hash function is used to compute the feature index, this results in a huge dense vector being generated, taking up lots of memory space. Any suggestions?
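A sketch of the remapping Reza describes, assuming data: RDD[LabeledPoint] whose SparseVector indices are sparse hash values; ranks are assigned in sorted index order, so the monotone mapping keeps each vector's indices sorted:

import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

val indexToRank: Map[Int, Int] = data
  .flatMap(_.features.asInstanceOf[SparseVector].indices)
  .distinct()
  .collect().sorted
  .zipWithIndex.toMap                     // hash index -> dense rank
val rankBc = sc.broadcast(indexToRank)
val numFeatures = indexToRank.size
val compact = data.map { lp =>
  val sv = lp.features.asInstanceOf[SparseVector]
  LabeledPoint(lp.label,
    Vectors.sparse(numFeatures, sv.indices.map(rankBc.value), sv.values))
}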
[mllib] GradientDescent requires huge memory for storing weight vector
Hi, Currently in GradientDescent.scala, weights is constructed as a dense vector: initialWeights = Vectors.dense(new Array[Double](numFeatures)) And numFeatures is determined in loadLibSVMFile as the max index of the features. But when a hash function is used to compute the feature index, this results in a huge dense vector being generated, taking up lots of memory space. Any suggestions?
Re: train many decision trees with a single spark job
Sean, Thanks for the response. Is there some subtle difference between one model partitioned by N users or N models per each 1 user? I think I'm missing something with your question. Looping through the RDD filtering one user at a time would certainly give me the response that I am hoping for (i.e. a map of user = decisiontree), however, that seems like it would yield poor performance? The userIDs are not integers, so I either need to iterate through some in-memory array of them (which could be quite large) or have some distributed lookup table. Neither seems great. I tried the random split thing. I wonder if I did something wrong there, but some of the splits got RDDs with 0 tuples and some got RDDs with 1 tuple. I guess that's to be expected with some random distribution? However, that won't work for me since it breaks the one tree per user thing. I guess I could randomly distribute user IDs and then do the scan everything and filter step... How bad of an idea is it to do: data.groupByKey.map( kvp => { val (key, data) = kvp val tree = DecisionTree.train( sc.makeRDD(data), ... ) (key, tree) }) Is there a way I could tell spark not to distribute the RDD created by sc.makeRDD(data) but just to deal with it on whatever spark worker is handling kvp? Does that question make sense? Thanks! Josh On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen so...@cloudera.com wrote: You just mean you want to divide the data set into N subsets, and do that dividing by user, not make one model per user right? I suppose you could filter the source RDD N times, and build a model for each resulting subset. This can be parallelized on the driver. For example let's say you divide into N subsets depending on the value of the user ID modulo N: val N = ... (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N == d), ...)) data should be cache()-ed here of course. However it may be faster and more principled to take random subsets directly: data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset => DecisionTree.train(subset, ...)) On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum jbuf...@gmail.com wrote: I've got a data set of activity by user. For each user, I'd like to train a decision tree model. I currently have the feature creation step implemented in Spark and would naturally like to use mllib's decision tree model. However, it looks like the decision tree model expects the whole RDD and will train a single tree. Can I split the RDD by user (i.e. groupByKey) and then call the DecisionTree.trainClassifier in a reduce() or aggregate function to create an RDD[DecisionTreeModel]? Maybe train the model with an in-memory dataset instead of an RDD? Call sc.parallelize on the Iterable values in a groupBy to create a mini-RDD? Has anyone else tried something like this with success? Thanks!
Re: Manually trigger RDD map function without action
Cody said If you don't care about the value that your map produced (because you're not already collecting or saving it), then is foreach more appropriate to what you're doing? but I cannot see it in this thread. Anyway, I performed a small benchmark to test which function is the most efficient. And the winner is foreach(a => a), in line with everyone's expectations. collect can cause OOM on the driver, and count is much slower than the others. Thanks all. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: quickly counting the number of rows in a partition?
Yes, using mapPartitionsWithIndex, e.g. in PySpark: sc.parallelize(xrange(0,1000), 4).mapPartitionsWithIndex(lambda idx,iter: ((idx, len(list(iter))),)).collect() [(0, 250), (1, 250), (2, 250), (3, 250)] (This is not the most efficient way to get the length of an iterator, but you get the idea...) Best, -Sven On Mon, Jan 12, 2015 at 6:54 PM, Kevin Burton bur...@spinn3r.com wrote: Is there a way to compute the total number of records in each RDD partition? So say I had 4 partitions.. I’d want to have partition 0: 100 records partition 1: 104 records partition 2: 90 records partition 3: 140 records Kevin -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com -- http://sites.google.com/site/krasser/?utm_source=sig
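The same idea in Scala, with the same caveat about consuming the iterator just to measure it:

val counts = rdd.mapPartitionsWithIndex { (idx, it) =>
  Iterator((idx, it.size))   // it.size drains the iterator, which is fine here
}.collect()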
Re: train many decision trees with a single spark job
You are right... my code example doesn't work :) I actually do want a decision tree per user. So, for 1 million users, I want 1 million trees. We're training against time series data, so there are still quite a few data points per user. My previous message where I mentioned RDDs with no length was, I think, a result of the way the random partitioning worked (I was partitioning into N groups where N was the number of users... total). Given this, I'm thinking MLlib is not designed for this particular case? It appears optimized for training across large datasets. I was just hoping to leverage it since creating my feature sets for the users was already in Spark. On Mon, Jan 12, 2015 at 5:05 PM, Sean Owen so...@cloudera.com wrote: A model partitioned by users? I mean that if you have a million users surely you don't mean to build a million models. There would be little data per user right? Sounds like you have 0 sometimes. You would typically be generalizing across users not examining them in isolation. Models are built on thousands or millions of data points. I assumed you were subsetting for cross validation in which case we are talking about making more like say 10 models. You usually take random subsets. But it might be as fine to subset as a function of a user ID if you like. Or maybe you do have some reason for segregating users and modeling them differently (e.g. different geographies or something). Your code doesn't work as is since you are using RDDs inside RDDs. But I am also not sure you should do what it looks like you are trying to do. On Jan 13, 2015 12:32 AM, Josh Buffum jbuf...@gmail.com wrote: Sean, Thanks for the response. Is there some subtle difference between one model partitioned by N users or N models per each 1 user? I think I'm missing something with your question. Looping through the RDD filtering one user at a time would certainly give me the response that I am hoping for (i.e. a map of user = decisiontree), however, that seems like it would yield poor performance? The userIDs are not integers, so I either need to iterate through some in-memory array of them (which could be quite large) or have some distributed lookup table. Neither seems great. I tried the random split thing. I wonder if I did something wrong there, but some of the splits got RDDs with 0 tuples and some got RDDs with 1 tuple. I guess that's to be expected with some random distribution? However, that won't work for me since it breaks the one tree per user thing. I guess I could randomly distribute user IDs and then do the scan everything and filter step... How bad of an idea is it to do: data.groupByKey.map( kvp => { val (key, data) = kvp val tree = DecisionTree.train( sc.makeRDD(data), ... ) (key, tree) }) Is there a way I could tell spark not to distribute the RDD created by sc.makeRDD(data) but just to deal with it on whatever spark worker is handling kvp? Does that question make sense? Thanks! Josh On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen so...@cloudera.com wrote: You just mean you want to divide the data set into N subsets, and do that dividing by user, not make one model per user right? I suppose you could filter the source RDD N times, and build a model for each resulting subset. This can be parallelized on the driver. For example let's say you divide into N subsets depending on the value of the user ID modulo N: val N = ... (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N == d), ...)) data should be cache()-ed here of course. 
However, it may be faster and more principled to take random subsets directly:

    data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset => DecisionTree.train(subset, ...))

On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum jbuf...@gmail.com wrote:

I've got a data set of activity by user. For each user, I'd like to train a decision tree model. I currently have the feature creation step implemented in Spark and would naturally like to use MLlib's decision tree model. However, it looks like the decision tree model expects the whole RDD and will train a single tree. Can I split the RDD by user (i.e. groupByKey) and then call DecisionTree.trainClassifier in a reduce() or aggregate function to create an RDD[DecisionTreeModel]? Maybe train the model with an in-memory dataset instead of an RDD? Call sc.parallelize on the Iterable values in a groupBy to create a mini-RDD? Has anyone else tried something like this with success? Thanks!
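For reference, a minimal sketch of the driver-parallelized subsetting Sean describes. The Event schema, the userID field, and the tree parameters below are illustrative assumptions, not details from the thread:

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.rdd.RDD

    case class Event(userID: Long, point: LabeledPoint)  // hypothetical record type

    val N = 10
    val data: RDD[Event] = ...                           // the feature RDD built earlier in Spark
    data.cache()                                         // reused N times below, so cache it

    // Launch N training jobs from the driver in parallel; each model sees one subset.
    val models = (0 until N).par.map { d =>
      val subset = data.filter(_.userID % N == d).map(_.point)
      DecisionTree.trainClassifier(subset, 2, Map[Int, Int](), "gini", 5, 32)
    }.toList

Note that each call still takes an RDD, so this builds N models over N subsets, not one local model per user; MLlib's trainer is designed for distributed datasets.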
Re: OOM exception during row deserialization
Hey Pala,

I also find it very hard to get to the bottom of memory issues such as this one based on what's in the logs (so if you come up with some findings, please share them here). In the interim, here are a few things you can try:

- Provision more memory per executor. While in theory (and depending on your storage level) data can be spilled to disk or recomputed from lineage if it doesn't fit into memory, I have experienced a lot of problems with failing jobs when underprovisioning memory.
- Experiment with both the memory and shuffle fractions.
- Repartition your data so that you get smaller tasks.

As far as object size goes, since your issue occurs on deserialization, you could compute the size on the map side and roll it up into a histogram.

Hope this helps! -Sven

On Mon, Jan 12, 2015 at 2:48 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote:

Does anybody have insight on this? Thanks.

On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote:

Hi, I am using Spark 1.0.1. I am trying to debug an OOM exception I saw during a join step. Basically, I have an RDD of rows that I am joining with another RDD of tuples. Some of the tasks succeed, but a fair number failed with the OOM exception and the stack below. The stack belongs to the 'reducer' that is reading shuffle output from the 'mapper'. My question is: what is the object being deserialized here - just a portion of an RDD, or the whole RDD partition assigned to the current reducer? The rows in the RDD could be large, but definitely not something that would run to 100s of MBs in size and thus run out of memory. Also, is there a way to determine the size of the object being deserialized that results in the error (either by looking at some staging HDFS dir or logs)?

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
java.lang.StringBuilder.append(StringBuilder.java:204)
java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
java.util.ArrayList.readObject(ArrayList.java:771)
sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)

Thanks, pala

-- http://sites.google.com/site/krasser/?utm_source=sig
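For concreteness, a minimal sketch of where the knobs Sven mentions live in Spark 1.x; the values and the stand-in RDD below are illustrative assumptions, not recommendations from this thread:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.executor.memory", "8g")          // provision more memory per executor
      .set("spark.storage.memoryFraction", "0.4")  // fraction reserved for cached blocks
      .set("spark.shuffle.memoryFraction", "0.4")  // fraction reserved for shuffle aggregation
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 1000000)        // stand-in for the real join input
    val smallerTasks = data.repartition(2000)      // more partitions => smaller tasks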
Broadcast joins on RDD
Hi, How do I do a broadcast/map join on RDDs? I have a large RDD that I want to inner join with a small RDD. Instead of having the large RDD repartitioned and shuffled for the join, I would rather send a copy of the small RDD to each task and then perform the join locally. How would I specify this in Spark code? I didn't find much documentation online. I attempted to create a broadcast variable out of the small RDD and then access that in the join operator: largeRdd.join(smallRddBroadCastVar.value), but that didn't work as expected (I found that all rows with the same key were on the same task). I am using Spark version 1.0.1. Thanks, pala
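For reference, the usual way to express a map-side join is to collect the small RDD to the driver, broadcast it as a map, and probe it per record. A minimal sketch, assuming both RDDs are key-value pairs and the small side fits in memory (the RDD names are from the question; the sample data is illustrative):

    // Assumed stand-ins for the question's RDDs:
    val largeRdd = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val smallRdd = sc.parallelize(Seq((1, "x"), (3, "y")))

    // Ship the small side to every executor once, as a read-only map.
    val smallMap = sc.broadcast(smallRdd.collectAsMap())

    // Inner join performed locally in each task; largeRdd is never shuffled.
    val joined = largeRdd.mapPartitions { iter =>
      iter.flatMap { case (k, v) =>
        smallMap.value.get(k).map(sv => (k, (v, sv)))
      }
    }
    // joined.collect() => Array((1,(a,x)), (3,(c,y)))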
Re: Trouble with large Yarn job
Anders, This could be related to this open ticket: https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also fixed that for us as a stopgap. Best, -Sven

On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg arp...@spotify.com wrote:

Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've actually been able to solve the problem finally, and it seems to be an issue with too many partitions. The repartitioning I tried initially did so after the union, and by then it's too late. By repartitioning as early as possible, and significantly reducing the number of partitions (going from 100,000+ to ~6,000 partitions), the job succeeds, with no more "Error communicating with MapOutputTracker" issues. Seems like an issue with handling too many partitions and executors at the same time. It would be awesome to have an auto-repartition function that checks the sizes of existing partitions and compares them with the HDFS block size. If too small (or too large), it would repartition to partition sizes similar to the block size... Hope this helps others with similar issues. Best, Anders

On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Anders, Have you checked your NodeManager logs to make sure YARN isn't killing executors for exceeding memory limits? -Sandy

On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg arp...@spotify.com wrote:

Hey, I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12G and 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems:

    val all_days = sc.union(
      ds.dateInterval(startDate, date).map(date =>
        sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
          .map(s => (
            (s.getUsername, s.getTrackUri),
            UserItemData(s.getUsername, s.getTrackUri,
              build_vector1(date, s),
              build_vector2(s))
          ))
      )
    ).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails:

    val top_tracks = all_days.map(t => (t._1._2.toString, 1))
      .reduceByKey(_ + _)
      .filter(trackFilter)
      .repartition(4)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    val observation_data = all_days
      .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
      .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails with the given error message if given more than 10 days of data. Also tried increasing the spark.akka.askTimeout setting, but it still fails even after 10-folding the timeout setting to 300 seconds. I'm using Spark 1.2 and the kryo serialization. Realize that this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. Seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help. Thanks, Anders

-- http://sites.google.com/site/krasser/?utm_source=sig
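A minimal sketch of the two remedies mentioned above, using the thread's own RDD name and partition figure (both are assumptions about the surrounding code):

    // Repartition as early as possible, before the wide operations:
    val fewerParts = all_days.repartition(6000)

    // Or, as the SPARK-5077 stopgap, reduce partitions without a full shuffle:
    val coalesced = all_days.coalesce(6000)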
Running Spark application from command line
I have a Spark application that was assembled using sbt 0.13.7, Scala 2.11, and Spark 1.2.0. In build.sbt, I use "provided" for the Spark dependencies. I am running on Mac OS X Yosemite. I can run the application fine within sbt. I run into problems when I try to run it from the command line. Here is the command I use:

    ADD_JARS=analysis/target/scala-2.11/dtex-analysis_2.11-0.1.jar scala -cp /Applications/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:analysis/target/scala-2.11/dtex-analysis_2.11-0.1.jar com.dtex.analysis.transform.GenUserSummaryView ...

I get the error messages below. Please advise what I can do to resolve this issue. Thanks! arun

15/01/12 22:47:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/12 22:47:18 WARN BlockManager: Putting block broadcast_0 failed
java.lang.NoSuchMethodError: scala.collection.immutable.$colon$colon.hd$1()Ljava/lang/Object;
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:84)
at org.apache.spark.util.collection.SizeTracker$class.resetSamples(SizeTracker.scala:61)
at org.apache.spark.util.collection.SizeTrackingVector.resetSamples(SizeTrackingVector.scala:25)
at org.apache.spark.util.collection.SizeTracker$class.$init$(SizeTracker.scala:51)
at org.apache.spark.util.collection.SizeTrackingVector.init(SizeTrackingVector.scala:25)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:695)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:540)
at com.dtex.analysis.transform.TransformUtils$anonfun$2.apply(TransformUtils.scala:97)
at com.dtex.analysis.transform.TransformUtils$anonfun$2.apply(TransformUtils.scala:97)
at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.dtex.analysis.transform.TransformUtils$.generateUserSummaryData(TransformUtils.scala:97)
at com.dtex.analysis.transform.GenUserSummaryView$.main(GenUserSummaryView.scala:77)
at com.dtex.analysis.transform.GenUserSummaryView.main(GenUserSummaryView.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at scala.reflect.internal.util.ScalaClassLoader$anonfun$run$1.apply(ScalaClassLoader.scala:70)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:101)
at scala.reflect.internal.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:70)
at scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:101)
at scala.tools.nsc.CommonRunner$class.run(ObjectRunner.scala:22)
at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:39)
at scala.tools.nsc.CommonRunner$class.runAndCatch(ObjectRunner.scala:29)
at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:39)
at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:65)
at scala.tools.nsc.MainGenericRunner.run$1(MainGenericRunner.scala:87)
at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:98)
at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:103)
at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)
Re: Problem with building spark-1.2.0
Yes, this proxy problem is resolved.

*how your build refers to https://github.com/ScrapCodes/sbt-pom-reader.git - I don't see this repo in the project code base.*

I manually downloaded the sbt-pom-reader directory and moved it into the .sbt/0.13/staging/*/ directory. But then I face the following:

    karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt assembly
    Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME. Note, this will be overridden by -java-home if it is set.
    [info] Loading project definition from /home/karthik/spark-1.2.0/project/project
    [info] Loading project definition from /home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader/project
    [warn] Multiple resolvers having different access mechanism configured with same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
    [info] Updating {file:/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader/project/}sbt-pom-reader-build...
    [info] Resolving com.typesafe.sbt#sbt-ghpages;0.5.2 ...

Could you please tell me how to build stand-alone spark-1.2.0 with sbt correctly?

On Mon, Jan 12, 2015 at 4:21 PM, Sean Owen so...@cloudera.com wrote:

The problem is there in the logs. When it went to clone some code, something went wrong with the proxy: "Received HTTP code 407 from proxy after CONNECT". Probably you have an HTTP proxy and you have not authenticated. It's specific to your environment. Although it's unrelated, I'm curious how your build refers to https://github.com/ScrapCodes/sbt-pom-reader.git - I don't see this repo in the project code base.

On Mon, Jan 12, 2015 at 9:09 AM, Kartheek.R kartheek.m...@gmail.com wrote:

Hi, This is what I am trying to do:

    karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
    Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME. Note, this will be overridden by -java-home if it is set.
    [info] Loading project definition from /home/karthik/spark-1.2.0/project/project
    Cloning into '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
    fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/': Received HTTP code 407 from proxy after CONNECT
    java.lang.RuntimeException: Nonzero exit code (128): git clone https://github.com/ScrapCodes/sbt-pom-reader.git
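As a hedged sketch of the HTTP 407 fix Sean points at, both git and sbt need proxy credentials configured; the host, port, and credentials below are placeholders for your environment:

    git config --global http.proxy http://USERNAME:PASSWORD@proxy.example.com:8080
    export SBT_OPTS="-Dhttp.proxyHost=proxy.example.com -Dhttp.proxyPort=8080 -Dhttps.proxyHost=proxy.example.com -Dhttps.proxyPort=8080"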
Re: How to create an empty RDD with a given type?
Got it, thanks!

On Tue, Jan 13, 2015 at 2:00 PM, Justin Yip yipjus...@gmail.com wrote:

Xuelin, There is a function called emptyRDD on the Spark context (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext) which serves this purpose. Justin

On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao xuelincao2...@gmail.com wrote:

Hi, I'd like to create a transform function that converts an RDD[String] to an RDD[Int]. Occasionally, the input RDD could be an empty RDD. I just want to directly create an empty RDD[Int] if the input RDD is empty, and I don't want to return None as the result. Is there an easy way to do that?
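A minimal sketch of the emptyRDD suggestion; the toInts helper is illustrative, not from the thread:

    import org.apache.spark.rdd.RDD

    // take(1) checks emptiness without counting the whole RDD.
    def toInts(input: RDD[String]): RDD[Int] =
      if (input.take(1).isEmpty) input.sparkContext.emptyRDD[Int]
      else input.map(_.toInt)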
Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?
What about your scenario? Do you need to use lots of broadcasts? If not, it would be better to focus on other things. At this time, there is no better method than TorrentBroadcast. Although blocks are transferred one by one, once a node has received the data it can immediately act as a data source for the other nodes.
Creating an RDD from only a few columns of a Parquet file
Hi, I am trying to read a parquet file using:

    val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, if the parquet file has 10 columns, I may want to read only 3 columns from disk. We have done an experiment:

Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on table1 and table2 shows a huge difference. The query on Table1 takes more than double the time taken on table2, which makes me think that Spark is reading all the columns from disk in the case of table1 when it needs only 3 of them. How should I make sure that it reads only 3 of the 10 columns from disk? Regards, Ajay
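For reference, a sketch of the setup described, registering the file as a table so a query can reference only the needed columns. The table and column names are illustrative; this shows the experiment's shape, not a claim about Spark's column-pruning behavior:

    val table1 = sqlContext.parquetFile("table1.parquet")   // the 10-column file
    table1.registerTempTable("table1")
    val threeCols = sqlContext.sql("SELECT col1, col2, col3 FROM table1")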
configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0
Hi all, I'm trying to figure out how to set this option, spark.yarn.driver.memoryOverhead, on Spark 1.2.0. I found this helpful overview (http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476), which suggests launching with --spark.yarn.driver.memoryOverhead 1024 added to spark-submit. However, when I do that I get this error:

    Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
    Run with --help for usage help or --verbose for debug output

I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my spark configuration object, but I still get "Will allocate AM container, with MB memory including 384 MB overhead" when launching. I'm running in yarn-cluster mode. Any help or tips would be appreciated. Thanks, David

--
David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204
Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0
There are two related options. To solve your problem directly, try:

    val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
    val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver: as part of your spark-submit script, add:

    --driver-memory 2g

Hope this helps!

From: David McWhorter mcwhor...@ccri.com
Date: Monday, January 12, 2015 at 11:01 AM
To: user@spark.apache.org
Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all, I'm trying to figure out how to set this option, spark.yarn.driver.memoryOverhead, on Spark 1.2.0. I found this helpful overview (http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476), which suggests launching with --spark.yarn.driver.memoryOverhead 1024 added to spark-submit. However, when I do that I get this error:

    Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
    Run with --help for usage help or --verbose for debug output

I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my spark configuration object, but I still get "Will allocate AM container, with MB memory including 384 MB overhead" when launching. I'm running in yarn-cluster mode. Any help or tips would be appreciated. Thanks, David

--
David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204
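A third route, sketched here with a placeholder class and jar, is to pass the property straight to spark-submit via its generic --conf flag (available since Spark 1.1), which avoids the unrecognized-option error from the dedicated-flag syntax:

    spark-submit --master yarn-cluster \
      --conf spark.yarn.driver.memoryOverhead=1024 \
      --driver-memory 2g \
      --class com.example.MyApp myapp.jar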
RE: Failed to save RDD as text file to local file system
Prannoy, I tried r.saveAsTextFile("home/cloudera/tmp/out1") and it returns without error. But where was it saved to? The folder "/home/cloudera/tmp/out1" is not created. I also tried the following:

    cd /home/cloudera/tmp/
    spark-shell
    scala> val r = sc.parallelize(Array("a", "b", "c"))
    scala> r.saveAsTextFile("out1")

It does not return an error, but still there is no "out1" folder created under /home/cloudera/tmp/. I tried to give an absolute path, but then I get an error:

    scala> r.saveAsTextFile("/home/cloudera/tmp/out1")
    org.apache.hadoop.security.AccessControlException: Permission denied: user=cloudera, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6286)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6268)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6220)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4087)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

Very frustrated. Please advise.

Regards, Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: Prannoy [via Apache Spark User List]
Sent: Monday, January 12, 2015 4:18 AM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you tried simply giving the path where you want to save the file? For instance, in your case just do r.saveAsTextFile("home/cloudera/tmp/out1"). Don't use file://. This will create a folder with the name out1. saveAsTextFile always writes by making a directory; it does not write data into a single file.
In case you need a single file, you can use the copyMerge API in FileUtil:

    FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs, new Path("home/cloudera/tmp/out2"), true, conf, null)

Now out2 will be a single file containing your data. fs is the FileSystem for your local file system. Thanks

On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List] wrote:

No, do you have any idea?

Regards, Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: firemonk9 [via Apache Spark User List]
Sent: Friday, January 09, 2015 2:56 PM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you found any resolution for this issue?
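A self-contained sketch of that copyMerge call; the paths are the thread's, while obtaining fs from a default Hadoop Configuration is an assumption about the surrounding setup:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    // Merge the part-* files in out1 into the single file out2; `true` deletes the source dir.
    FileUtil.copyMerge(fs, new Path("/home/cloudera/tmp/out1"),
      fs, new Path("/home/cloudera/tmp/out2"), true, conf, null)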
can I buffer flatMap input at each worker node?
Dear All, What I want to do is: as the data is partitioned across many worker nodes, I want to be able to process each partition of data as a whole, and then produce my output using flatMap, for example. So can I load all of the input records of a partition on one worker node and emit any output using a map function? Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/can-I-buffer-flatMap-input-at-each-worker-node-tp21106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: can I buffer flatMap input at each worker node?
Not sure I understand correctly, but it sounds like you're looking for mapPartitions(). -Sven

On Mon, Jan 12, 2015 at 10:17 AM, maherrt mahe...@hotmail.com wrote:

Dear All, What I want to do is: as the data is partitioned across many worker nodes, I want to be able to process each partition of data as a whole, and then produce my output using flatMap, for example. So can I load all of the input records of a partition on one worker node and emit any output using a map function? Thanks

-- http://sites.google.com/site/krasser/?utm_source=sig
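A minimal sketch of the mapPartitions() suggestion: each partition arrives as a single iterator, so it can be buffered and processed as a whole, and any number of output records can be emitted. The sample RDD and the per-batch "processing" are illustrative:

    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)   // 2 partitions

    val output = rdd.mapPartitions { iter =>
      val whole = iter.toVector                            // buffer the entire partition in memory
      whole.map(r => s"$r / ${whole.size}").iterator       // emit outputs derived from the batch
    }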
Pattern Matching / Equals on Case Classes in Spark Not Working
Dear Spark Users, I googled the web for several hours now but I can't find a solution for my problem. So maybe someone on this list can help.

I have an RDD of case classes, generated from CSV files with Spark. When I used the distinct operator, there were still duplicates. So I investigated and found out that equals returns false although the two objects were equal (as were their individual fields, as well as their toStrings). After googling, I found that case class equals might break in case the two objects are created by different class loaders. So I implemented my own equals method using pattern matching (code example below). It still didn't work. Some debugging revealed that the problem lies in the pattern matching. Depending on the objects I compare (and maybe the split / classloader they are generated in?), the pattern matching works / doesn't:

    case class Customer(id: String, age: Option[Int], entryDate: Option[java.util.Date]) {
      override def equals(that: Any): Boolean = that match {
        case Customer(id, age, entryDate) => {
          println("Pattern matching worked!")
          this.id == id &&
          this.age == age &&
          this.entryDate == entryDate
        }
        case _ => false
      }
    }

    //val x: Array[Customer]
    // ... some spark code to filter original data and collect x

    scala> x(0)
    Customer(a, Some(5), Some(Fri Sep 23 00:00:00 CEST 1994))
    scala> x(1)
    Customer(a, None, None)
    scala> x(2)
    Customer(a, None, None)
    scala> x(3)
    Customer(a, None, None)

    scala> x(0) == x(0) // should be true and works
    Pattern matching worked!
    res0: Boolean = true
    scala> x(0) == x(1) // should be false and works
    Pattern matching worked!
    res1: Boolean = false
    scala> x(1) == x(2) // should be true, does not work
    res2: Boolean = false
    scala> x(2) == x(3) // should be true, does not work
    Pattern matching worked!
    res3: Boolean = true
    scala> x(0) == x(3) // should be false, does not work
    res4: Boolean = false

Why is the pattern matching not working? It seems that there are two kinds of Customers, 0,1 and 2,3, which don't match somehow. Is this related to some classloaders? Is there a way around this other than using isInstanceOf and defining a custom equals operation for every case class I write? Thanks for the help! Frank
Re: Issue writing to Cassandra from Spark
Hi Akhil, Thank you for the pointers. Below is how we are saving data to Cassandra:

    javaFunctions(rddToSave)
      .writerBuilder("datapipelineKeyspace", "datapipelineOutputTable", mapToRow(Sample.class))
      .saveToCassandra();

The data we are saving at this stage is ~200 million rows. How do we control application threads in Spark so that it does not exceed rpc_max_threads? We are running with the default value of this property in cassandra.yaml. I have already set these two properties for the Spark-Cassandra connector:

    spark.cassandra.output.batch.size.rows=1
    spark.cassandra.output.concurrent.writes=1

Thanks - Ankur

On Sun, Jan 11, 2015 at 10:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

I see, can you paste the piece of code? It's probably because you are exceeding the number of connections specified in the property rpc_max_threads. Make sure you close all the connections properly. Thanks, Best Regards

On Mon, Jan 12, 2015 at 7:45 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote:

Hi Akhil, thank you for your response. Actually we are first reading from Cassandra and then writing back after doing some processing. All the reader stages succeed with no error, and many writer stages also succeed, but many fail as well. Thanks, Ankur

On Sat, Jan 10, 2015 at 10:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Just make sure you are not connecting to the old RPC port (9160); the new binary port is running on 9042. What is your rpc_address listed in cassandra.yaml? Also make sure you have start_native_transport: true in the yaml file. Thanks, Best Regards

On Sat, Jan 10, 2015 at 8:44 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote:

Hi, We are currently using Spark to join data in Cassandra and then write the results back into Cassandra. While reads happen without any error, during the writes we see many exceptions like below. Our environment details are:

- Spark v 1.1.0
- spark-cassandra-connector-java_2.10 v 1.1.0

We are using the below settings for the writer:

    spark.cassandra.output.batch.size.rows=1
    spark.cassandra.output.concurrent.writes=1

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: [] - use getErrors() for details)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)
at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Thanks, Ankur
Re: Pattern Matching / Equals on Case Classes in Spark Not Working
Is this in the Spark shell? Case classes don't work correctly in the Spark shell, unfortunately (though they do work in the Scala shell), because we change the way lines of code compile to allow shipping functions across the network. The best way to get case classes in there is to compile them into a JAR and then add that to your spark-shell's classpath with --jars. Matei
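A sketch of the workaround Matei describes; the jar name and path are placeholders:

    # package the case classes into a jar (e.g. with `sbt package`), then:
    spark-shell --jars /path/to/model-classes.jar

With the classes loaded from a jar rather than compiled line-by-line in the shell, all instances share one class definition, so pattern matching and equals behave normally.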