Re: MLLib decision tree: Weights
This is not supported in MLlib. Hopefully, we will add support for weighted examples in v1.2. If you want to train weighted instances with the current tree implementation, please try importance sampling first to adjust the weights. For instance, an example with weight 0.3 is sampled with probability 0.3, and if it is sampled, its weight becomes 1. -Xiangrui On Tue, Sep 2, 2014 at 1:05 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, We are looking to apply a weight to each training example; this weight should be used when computing the penalty of a misclassified example. For instance, without weighting, each example is penalized 1 point when evaluating the model of a classifier, such as a decision tree. We would like to customize this penalty for each training example, so that we can apply a penalty of W to a misclassified example, where W is the weight associated with the given training example. Is this something that is supported directly in MLlib? I would appreciate it if someone could point me in the right direction.
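A minimal sketch of the sampling trick described above, assuming all weights lie in [0, 1] (the RDD `weighted` and the per-partition seeding are illustrative, not part of MLlib):

import scala.util.Random
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// weighted: RDD[(Double, LabeledPoint)] -- (weight, example), weights in [0, 1]
def resample(weighted: RDD[(Double, LabeledPoint)]): RDD[LabeledPoint] =
  weighted.mapPartitionsWithIndex { (idx, iter) =>
    val rng = new Random(idx) // create the RNG inside the task; seed per partition
    // keep an example with probability equal to its weight; every kept
    // example then enters DecisionTree.train with implicit weight 1
    iter.filter { case (w, _) => rng.nextDouble() < w }.map(_._2)
  }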
Re: New features (Discretization) for v1.x in xiangrui.pdf
We have a pending PR (https://github.com/apache/spark/pull/216) for discretization, but it has performance issues. We will try to spend more time improving it. -Xiangrui On Tue, Sep 2, 2014 at 2:56 AM, filipus floe...@gmail.com wrote: i guess i found it https://github.com/LIDIAgroup/SparkFeatureSelection
Re: New features (Discretization) for v1.x in xiangrui.pdf
How to install? Just clone the code with git clone https://github.com/apache/spark/pull/216 and then sbt package? Is it the same as https://github.com/LIDIAgroup/SparkFeatureSelection, or something different? filip
Re: New features (Discretization) for v1.x in xiangrui.pdf
I think they are the same. If you have hub (https://hub.github.com/) installed, you can run `hub checkout https://github.com/apache/spark/pull/216` and then `sbt/sbt assembly`. -Xiangrui On Wed, Sep 3, 2014 at 12:03 AM, filipus floe...@gmail.com wrote: [...]
Re: Number of elements in ArrayBuffer
You really should show your Spark code then. I think you are misusing one of the Spark APIs and are processing a collection that contains a single ArrayBuffer at some point, not the ArrayBuffer itself. On Wed, Sep 3, 2014 at 6:42 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: I have a problem here. When I run the commands that Rajesh suggested in the Scala REPL, they work fine. But I want this in Spark code, where I need to find the number of elements in an ArrayBuffer; there, these things are not working. How should I do that? On Wed, Sep 3, 2014 at 10:25 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Deep, Please find below the results for ArrayBuffer in the Scala REPL:

scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
scala> val a = ArrayBuffer(5,3,1,4)
a: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(5, 3, 1, 4)
scala> a.head
res2: Int = 5
scala> a.tail
res3: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(3, 1, 4)
scala> a.length
res4: Int = 4

Regards, Rajesh On Wed, Sep 3, 2014 at 10:13 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have the following ArrayBuffer: ArrayBuffer(5,3,1,4). Now, I want to get the number of elements in this ArrayBuffer and also its first element. I used .length and .size, but they return 1 instead of 4. I also used .head and .last to get the first and the last element, but they too return the entire ArrayBuffer (ArrayBuffer(5,3,1,4)). What I understand from this is that the entire ArrayBuffer is stored as one element. How should I go about doing the required things? Thank You
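A guess at the mistake, as a minimal sketch (the RDD construction is hypothetical, since the actual Spark code was not shown):

import scala.collection.mutable.ArrayBuffer
// An RDD that contains ONE element, which happens to be an ArrayBuffer:
val rdd = sc.parallelize(Seq(ArrayBuffer(5, 3, 1, 4)))
rdd.count()        // 1 -- counts ArrayBuffers, not their contents
rdd.first()        // ArrayBuffer(5, 3, 1, 4) -- the whole buffer is one element
rdd.first().length // 4 -- ask the buffer itself for its size
rdd.first().head   // 5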
Memcached error when using during map
I finished a distributed project in Hadoop Streaming, and it worked fine using memcached storage during mapping. It's a Python project. Now I want to move it to Spark, but when I call the memcached library, two errors occur during the computation (both of them):
1. File memcache.py, line 414, in get: rkey, rlen = self._expectvalue(server) -- ValueError: too many values to unpack
2. File memcache.py, line 714, in check_key: return key.translate(ill_map) -- TypeError: character mapping must return integer, None or unicode
After adding exception handling, no cache lookup succeeded at all. However, the same code works in Hadoop Streaming without any error. Why? My code is attached: code.zip http://apache-spark-user-list.1001560.n3.nabble.com/file/n13341/code.zip
.sparkrc for Spark shell?
To make my shell experience merrier, I need to import several packages, and define implicit sparkContext and sqlContext. Is there a startup file (e.g. ~/.sparkrc) that Spark shell will load when it's started? Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
RDDs
Hi, Can someone tell me what kind of operations can be performed on a replicated RDD? What are the use cases of a replicated RDD? One basic doubt that has been bothering me for a long time: what is the difference between an application and a job in Spark parlance? I am confused because of the Hadoop jargon. Thank you
SparkSQL TPC-H query 3 joining multiple tables
Hi, I am trying to run query 3 from the TPC-H benchmark using SparkSQL, but I am running into errors which I believe occur because the parser does not accept the JOIN syntax I am trying. Below are the two variants I tried and the error messages I am seeing.

Exception in thread main java.lang.RuntimeException: [1.159] failure: ``UNION'' expected but `join' found

SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority FROM customer c join orders o on c.c_custkey = o.o_custkey join lineitem l on l.l_orderkey = o.o_orderkey WHERE c_mktsegment = 'BUILDING' AND o_orderdate < '1995-03-15' AND l_shipdate > '1995-03-15' GROUP BY l_orderkey, o_orderdate, o_shippriority ORDER BY revenue desc, o_orderdate LIMIT 10;

Exception in thread main java.lang.RuntimeException: [1.125] failure: ``UNION'' expected but `,' found

SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority FROM customer c, orders o, lineitem l WHERE l.l_orderkey = o.o_orderkey AND c.c_custkey = o.o_custkey AND c_mktsegment = 'BUILDING' AND o_orderdate < '1995-03-15' AND l_shipdate > '1995-03-15' GROUP BY l_orderkey, o_orderdate, o_shippriority ORDER BY revenue desc, o_orderdate LIMIT 10;

The same syntax works when I join 2 tables (TPC-H query 12, for instance). Any ideas as to what the issue is? Thanks in advance, Samay
Exchanging data between pyspark and scala
Hey, I am about to implement a Spark app which will require using both PySpark and Spark on Scala. Data should be read from AWS S3 (compressed CSV files) and must be pre-processed by an existing Python codebase. However, our final goal is to make those datasets available to Spark apps written in either Python or Scala, through e.g. Tachyon: S3 => PySpark => Tachyon => {Py, Scala}Spark. Is there any recommended way to pass data between Spark applications implemented in different languages? I thought about using some sort of serialisation framework like Thrift or Avro, but maybe there are other ways to do this (if possible, without writing CSV files). I am open to any kind of input!
Re: Invalid Class Exception
Hi, I'm getting the same error while manually setting up a Spark cluster. Has there been any update on this error? Rgds Niranda
Support R in Spark
Does the Spark ML team plan to support R scripts natively? There is a SparkR project, but it is not from the Spark team. Spark ML uses netlib-java to talk to native Fortran routines, or uses NumPy; why not try to use R in some sense? R has lots of useful packages. If the Spark ML team could include R support, it would be very powerful. Any comment?
How to list all registered tables in a sql context?
Hi, How can I list all registered tables in a sql context? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: .sparkrc for Spark shell?
Hey, You can use `spark-shell -i sparkrc` to do this. Prashant Sharma On Wed, Sep 3, 2014 at 2:17 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: [...]
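A minimal sketch of such a startup file (the file name and contents are illustrative; sc is the SparkContext the shell already provides):

// sparkrc -- load with: spark-shell -i sparkrc
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext._ // e.g. the createSchemaRDD implicit in 1.0.x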
parsing json in spark streaming
Hello everyone. I'm trying to receive a DStream structured as JSON from a Kafka topic, and I want to parse the content of each JSON message. The JSON I receive looks like this:

{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":34000,"clock":1409733420,"id":"IBE32JZ-1409715361-ed-0002:0","gs":446,"heading":71,"lat":44.50987,"lon":2.98972,"reg":"ECJRE","squawk":"1004","updateType":"A","altChange":""}

I'm trying to extract the ident field only, at least for now. My program looks like this:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.liftweb.json._

object ScalaExample {
  val kafkaHost = "localhost"
  val kafkaPort = 9092
  val zookeeperHost = "localhost"
  val zookeeperPort = 2181

  implicit val formats = DefaultFormats
  case class PlaneInfo(ident: String)

  def parser(json: String): String = {
    val parsedJson = parse(json)
    val m = parsedJson.extract[PlaneInfo]
    return m.ident
  }

  def main(args: Array[String]) {
    val zkQuorum = "localhost:2181"
    val group = "myGroup"
    val topic = Map("flightStatus" -> 1)
    val sparkContext = new SparkContext("local[4]", "KafkaConsumer")
    val ssc = new StreamingContext(sparkContext, Seconds(10))
    val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    val id = json.map(_._2).map(parser)
    id.print
    ssc.start()
  }
}

but it throws the exception below:

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

The thing is that if I run the same code without Spark (reading from a file), it works perfectly. The problem starts when I try to put it in a Spark program.
Also, if I change the parser function to something like this:

def parser(json: String): JValue = {
  val parsedJson = parse(json)
  return (parsedJson \\ "ident")
}

it also works. But when I try to get the actual String, I get the same error. Thank you for your help. I hope I have explained it well.
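For what it's worth, a ClassNotFoundException on scala.reflect.ClassManifest usually points to a jar compiled against Scala 2.9 (lift-json here) running on the Scala 2.10 build of Spark; a lift-json build matching your Scala version, or json4s (which Spark itself bundles), should avoid it. A minimal sketch with json4s, under that assumption:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats = DefaultFormats
case class PlaneInfo(ident: String)

def parser(json: String): String =
  (parse(json) \ "ident").extract[String] // or: parse(json).extract[PlaneInfo].ident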
How to clear broadcast variable from driver memory?
Hi, I tried Broadcast.unpersist() on Spark 1.0.1, but the MemoryStore (driver memory) still keeps it allocated. // LOGS: Block broadcast_0 stored as values to memory (estimated size 380.1 MB, free 5.7 GB). The free memory size was the same after calling unpersist. Can I clear this?
pyspark on yarn hdp hortonworks
Hi all. I have been trying to run PySpark on YARN for a couple of days already: http://hortonworks.com/kb/spark-1-0-1-technical-preview-hdp-2-1-3/ I posted the exception in previous posts. It looks like I didn't do the configuration correctly. I googled quite a lot, and I can't find the steps needed to configure PySpark to run on YARN. Can you please share the steps (critical points) that should be configured to use PySpark on YARN (Hortonworks distribution): environment variables, classpath, copying jars to all machines, other configuration? Thanks, Oleg.
Message Passing among workers
Hi, I would like to implement an asynchronous distributed optimization algorithm where workers communicate with one another. It is similar to belief propagation, where each worker is a vertex in the graph. Can someone let me know if this is possible using Spark? Thanks, Laxman
Re: Message Passing among workers
Asynchrony is not supported directly - Spark's programming model is naturally BSP. I have seen cases where people have instantiated actors with Akka on worker nodes to enable message passing, or even used Spark's own ActorSystem to do this. But I do not recommend this, since you lose a bunch of the benefits of Spark - e.g. fault tolerance. Instead, I would think about whether your algorithm can be cast as a BSP one, or think about how frequently you really need to synchronize state among your workers. It may be that having the occasional synchronization barrier is OK. On Wed, Sep 3, 2014 at 7:28 AM, laxmanvemula laxman8...@gmail.com wrote: [...]
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
Hi Ankur, Thanks so much for your advice. But it failed when I tried to set the storage level while constructing a graph:

val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)

Error: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level

Is there anyone who could help me? Best, Yifan 2014-08-18 23:52 GMT+02:00 Ankur Dave ankurd...@gmail.com: On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote: I am testing our application (similar to personalised PageRank using Pregel; note that each vertex property will need much more space to store after each new iteration) [...] But when we run it on a larger graph (e.g. LiveJournal), it always ends with the error GC overhead limit exceeded, even when the number of partitions is increased to 48 from 8. If the graph (including vertex properties) is too large to fit in memory, you might try allowing it to spill to disk. When constructing the graph, you can set vertexStorageLevel and edgeStorageLevel to StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish. Ankur http://www.ankurdave.com/
Re: SparkSQL TPC-H query 3 joining multiple tables
Are you using SQLContext or HiveContext? The default SQL dialect in HiveContext (HiveQL) is a little more complete and might be a better place to start. On Wed, Sep 3, 2014 at 2:12 AM, Samay smilingsa...@gmail.com wrote: [...]
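A minimal sketch of running the query through HiveContext (it assumes the three tables are already registered; hql is the HiveQL entry point in 1.0.x, later deprecated in favor of sql):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val top10 = hiveContext.hql("""
  SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS revenue,
         o_orderdate, o_shippriority
  FROM customer c
  JOIN orders o ON c.c_custkey = o.o_custkey
  JOIN lineitem l ON l.l_orderkey = o.o_orderkey
  WHERE c_mktsegment = 'BUILDING'
    AND o_orderdate < '1995-03-15' AND l_shipdate > '1995-03-15'
  GROUP BY l_orderkey, o_orderdate, o_shippriority
  ORDER BY revenue DESC, o_orderdate
  LIMIT 10""")
top10.collect().foreach(println)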
RE: MLLib decision tree: Weights
Dear Xiangrui, Thanks for your reply. We will use sampling for now. However, just to let you know, we believe it is not the best fit for our problems for two reasons: (1) the high dimensionality of the data (600 features) and (2) a highly skewed distribution. Do you have any idea when MLlib v1.2 will be released? We can plan things accordingly. Date: Tue, 2 Sep 2014 23:15:09 -0700 Subject: Re: MLLib decision tree: Weights From: men...@gmail.com To: ssti...@live.com CC: user@spark.apache.org [...]
Accessing neighboring elements in an RDD
Hi all, Assume I have read the lines of a text file into an RDD: textFile = sc.textFile("SomeArticle.txt"). Also assume that the sentence breaks in SomeArticle.txt were done by machine and have some errors, such as the break at "Fig." in the sample text below.

Index  Text
N      ... as shown in Fig.
N+1    1.
N+2    The figure shows...

What I want is an RDD with:

N      ... as shown in Fig. 1.
N+1    The figure shows...

Is there some way a filter() can look at neighboring elements in an RDD? That way I could look, in parallel, at neighboring elements in an RDD and come up with a new RDD that may have a different number of elements. Or do I just have to iterate through the RDD sequentially? Thanks, Ron
Re: Low Level Kafka Consumer for Spark
Hi, Sorry for the delay. As discussed in this thread, I have modified the Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer) code to have a dedicated receiver for every topic partition. You can see an example of how to create a union of these receivers in consumer.kafka.client.Consumer.java. Thanks to Chris for suggesting this change. Regards, Dibyendu On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Just a comment on the recovery part. Is it correct to say that currently Spark Streaming's recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream? https://issues.apache.org/jira/browse/SPARK-1647 Just to illustrate a real use case (mine): we have object states which have a Duration field per state which is incremented on every batch interval. This object state is reset to 0 upon incoming state-changing events. Let's suppose there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: the Duration field will get incremented from the data checkpoint version until the recovery moment, but the state change event will never be re-processed... so in the end we have the old state with the wrong Duration value. To make things worse, let's imagine we're dumping the Duration increases somewhere... which means we're spreading the problem across our system. Re-computation awareness is something I've commented on in another thread and would rather treat separately. http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205 Re-computations do occur, but the only RDDs that are recovered are the ones from the data checkpoint. This is what we've seen. It is not enough by itself to ensure recovery of computed data, and this partial recovery leads to inconsistency in some cases. Roger - I share the same question with you - I'm just not sure if the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed at the receiver node on, let's say, a per-second basis, then having it persisted to HDFS every second could be a big challenge for keeping JVM performance - maybe that could be the reason why it's not really implemented... assuming it isn't. Dibyendu made a great effort with the offset-controlling code, but general state-consistent recovery feels to me like another big issue to address. I plan to dive into the Streaming code and try to at least contribute some ideas. Some more insight from anyone on the dev team would be very appreciated. tnks, Rod
Re: Accessing neighboring elements in an RDD
Interestingly, there was an almost identical question posed on Aug 22 by cjwang. Here's the link to the archive: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-td12621.html#a12664 On Wed, Sep 3, 2014 at 10:33 AM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: [...]
Re: Accessing neighboring elements in an RDD
There is support for Spark in ElasticSearch’s Hadoop integration package: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html Maybe you could split and insert all of your documents from Spark and then query for “MoreLikeThis” on the ElasticSearch index. I haven’t tried it, but maybe someone else has more experience using Spark with ElasticSearch. At some point, maybe there could be an information retrieval package for Spark with locality-sensitive hashing and other similar functions. On Sep 3, 2014, at 10:40 AM, Victor Tso-Guillen v...@paxata.com wrote: [...]
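A minimal sketch of the write side, assuming the elasticsearch-hadoop (elasticsearch-spark) artifact is on the classpath and a node on localhost; untested, per the caveat above:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs to RDDs

val conf = new SparkConf().setAppName("EsIndexing").set("es.nodes", "localhost")
val sc = new SparkContext(conf)
// one Map per document; saveToEs writes them to the given index/type
val docs = sc.parallelize(Seq(Map("id" -> 1, "text" -> "as shown in Fig. 1.")))
docs.saveToEs("articles/sentences")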
How can I start history-server with kerberos HDFS ?
Hi, I have set properties in conf/spark-defaults.conf and started with the command ./sbin/start-history-server.sh /tmp/spark-events. It gets errors, and it seems that the properties in the spark-defaults.conf file don't take effect. How can I solve this problem (enable the properties in spark-defaults.conf when starting the Spark history server)?

14/09/04 01:44:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/04 01:44:05 WARN UserGroupInformation: PriviledgedActionException as:root (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
14/09/04 01:44:05 WARN Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
14/09/04 01:44:05 WARN UserGroupInformation: PriviledgedActionException as:root (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Exception in thread main java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details :

#history-server
spark.history.kerberos.enabled true
park.history.kerberos.principal test/spark@test
spark.history.kerberos.keytab /home/test/test_spark.keytab
spark.eventLog.enabled true

Zhanfeng Huo
Re: How can I start history-server with kerberos HDFS ?
The history server (and the other Spark daemons) do not read spark-defaults.conf. There's a bug open to implement that (SPARK-2098), and an open PR to fix it, but it's still not in Spark. On Wed, Sep 3, 2014 at 11:00 AM, Zhanfeng Huo huozhanf...@gmail.com wrote: [...] -- Marcelo
Re: mllib performance on cluster
I spoke with SK offline about this; it looks like the difference in timings came from the fact that he was training 100 models for 100 iterations each and taking the total time (vs. my example, which trains a single model for 100 iterations). I'm posting my response here, though, because I think it's worth documenting: benchmarking on a dataset this small with this many cores is probably not going to give you any meaningful information about how the algorithms scale to real data problems. In this case, you've thrown 200 cores at 5.6 KB of data - 200 low-dimensional data points. The overheads of scheduling tasks, sending them out to each worker, and the network latencies between the nodes, which are essentially fixed regardless of problem size, are COMPLETELY dominating the time spent computing - which in the first two cases is 9-10 flops per data point and in the last case is a couple of array lookups and adds per data point. It would make a lot more sense to find or generate a dataset that's 10 or 100 GB and see how performance scales there. You can do this with the code I pasted earlier; just change the second, third, and fourth arguments to an appropriate number of elements, dimensionality, and number of partitions that matches the number of cores you have on your cluster. In short, don't use a cluster unless you need one :). Hope this helps! On Tue, Sep 2, 2014 at 3:51 PM, SK skrishna...@gmail.com wrote: The dataset is quite small: 5.6 KB. It has 200 rows and 3 features, and 1 column of labels. From this dataset, I split 80% for the training set and 20% for the test set. The features are integer counts and the labels are binary (1/0). thanks
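One hedged way to generate such a synthetic dataset, as a sketch (MLlib ships data generators in org.apache.spark.mllib.util; the sizes below are illustrative):

import org.apache.spark.mllib.util.LogisticRegressionDataGenerator

// ~100M examples x 100 features across 200 partitions -- tens of GB of labeled points
// args: sc, nexamples, nfeatures, eps (class separation), npartitions
val big = LogisticRegressionDataGenerator.generateLogisticRDD(sc, 100000000, 100, 0.5, 200)
big.cache().count() // materialize before timing the training run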
RE: Accessing neighboring elements in an RDD
Thanks for the pointer to that thread. Looks like there is some demand for this capability, but not a lot yet. Also doesn't look like there is an easy answer right now. Thanks, Ron From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Wednesday, September 03, 2014 10:40 AM To: Daniel, Ronald (ELS-SDG) Cc: user@spark.apache.org Subject: Re: Accessing neighboring elements in an RDD [...]
spark history server trying to hit port 8021
My Spark history server won't start because it's trying to hit the namenode on 8021, but the namenode is on 8020 (the default). How can I configure the history server to use the right port? I can't find any relevant setting on the docs: http://people.apache.org/~tdas/spark-1.0.0-rc11-docs/monitoring.html Greg
Web UI
Hello, What is included in the Spark web UI? What are the available URLs? Can the information be obtained in a machine-readable way (e.g. JSON, XML, etc)? Thanks! Best, Oliver Oliver Ruebenacker | Solutions Architect Altisource(tm) 290 Congress St, 7th Floor | Boston, Massachusetts 02210 P: (617) 728-5582 | ext: 275585 oliver.ruebenac...@altisource.com | www.Altisource.com
Re: spark history server trying to hit port 8021
Nevermind, PEBKAC. I had put the wrong port in the $LOG_DIR environment variable. Greg From: Greg greg.h...@rackspace.com Date: Wednesday, September 3, 2014 1:56 PM To: user@spark.apache.org Subject: spark history server trying to hit port 8021 [...]
Re: Web UI
Hi Oliver, Spark standalone master and worker support a '/json' endpoint in the web UI, which returns some of the information in JSON format. I wasn't able to find relevant documentation, though. - Wonha On Wed, Sep 3, 2014 at 12:12 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: [...]
RE: Web UI
Hello, Thanks for the help! But I tried starting with “--master local[4]”, and when I load http://localhost:4040/json I just get forwarded to http://localhost:4040/stages/, and it’s all human-readable HTML, no JSON. Best, Oliver From: Wonha Ryu [mailto:wonha@gmail.com] Sent: Wednesday, September 03, 2014 3:36 PM To: Ruebenacker, Oliver A Cc: user@spark.apache.org Subject: Re: Web UI [...]
Re: Accessing neighboring elements in an RDD
There is a sliding method implemented in MLlib (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala), which is used in computing the area under the curve: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala#L45 With it, you can process neighboring lines via rdd.sliding(3).map { case Seq(l0, l1, l2) => ... } -Xiangrui On Wed, Sep 3, 2014 at 11:30 AM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: [...]
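A hedged sketch of applying sliding to the sentence-break example (assuming RDDFunctions and its implicit conversion are accessible in your Spark version; the "bad break" heuristic is illustrative, and since each line appears in several windows, a real merge would still need to dedupe):

import org.apache.spark.mllib.rdd.RDDFunctions._ // implicit that adds sliding to RDDs

val lines = sc.textFile("SomeArticle.txt")
// consecutive pairs of lines where the machine split after an abbreviation
val badBreaks = lines.sliding(2).filter { case Array(a, b) =>
  a.trim.endsWith("Fig.") // hypothetical heuristic for a bad sentence break
}
badBreaks.collect().foreach { case Array(a, b) => println(a + " " + b) }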
Re: Web UI
Hey Oliver, IIRC there's no JSON endpoint for the application web UI. They only exist for the cluster master and worker. - Wonha On Wed, Sep 3, 2014 at 12:58 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: [...]
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
At 2014-09-03 17:58:09 +0200, Yifan LI iamyifa...@gmail.com wrote: val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK) Error: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level You have to pass the StorageLevel to GraphLoader.edgeListFile:

val graph = GraphLoader.edgeListFile(
    sc, edgesFile, minEdgePartitions = numPartitions,
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
    vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
  .partitionBy(PartitionStrategy.EdgePartition2D)

Ankur
RE: Web UI
Hello, Interestingly, http://localhost:4040/metrics/json/ gives some numbers, but only a few, which never seem to change during the application’s lifetime. Either the web UI has some very strange limitations, or there are some URLs yet to be discovered that do something interesting. Best, Oliver From: Wonha Ryu [mailto:wonha@gmail.com] Sent: Wednesday, September 03, 2014 4:27 PM To: Ruebenacker, Oliver A Cc: user@spark.apache.org Subject: Re: Web UI [...]
Spark Streaming into HBase
I have been trying to understand how Spark Streaming and HBase connect, but have not been successful. What I am trying to do is: given a Spark stream, process that stream and store the results in an HBase table. So far this is what I have:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Get}
import org.apache.hadoop.hbase.util.Bytes

def blah(row: Array[String]) {
  val hConf = new HBaseConfiguration()
  val hTable = new HTable(hConf, "table")
  val thePut = new Put(Bytes.toBytes(row(0)))
  thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
  hTable.put(thePut)
}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) // port was lost in the archive; 9999 is a placeholder
val words = lines.map(_.split(","))
val store = words.foreachRDD(rdd => rdd.foreach(blah))
ssc.start()

I am currently running the above code in spark-shell. I am not sure what I am doing wrong.
How do you debug with the logs ?
Hi guys, I'm curious how you deal with the logs. I find it difficult to debug with them: we run Spark Streaming in our YARN cluster in client mode, so we have two logs: the YARN log and the local log (for the client). Whenever I have a problem, the log is too big to read with gedit and grep (e.g., after running for 10 hours, the local log is 1 GB). Do you use any tools to analyze/monitor/read the logs, such as Logstash? Thanks, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: spark history server trying to hit port 8021
Hi Greg, For future reference, you can set spark.history.ui.port in SPARK_HISTORY_OPTS. By default this should point to 18080. This information is actually in the link that you provided :) (as well as in the most updated docs here: http://spark.apache.org/docs/latest/monitoring.html) -Andrew 2014-09-03 12:14 GMT-07:00 Greg Hill greg.h...@rackspace.com: [...]
Re: pyspark on yarn hdp hortonworks
Hi Oleg, There isn't much you need to do to set up a YARN cluster to run PySpark. You need to make sure all machines have Python installed, and... that's about it. Your assembly jar will be shipped to all containers along with all the pyspark and py4j files needed. One caveat, however, is that the jar needs to be built with Maven and not on a Red Hat-based OS: http://spark.apache.org/docs/latest/building-with-maven.html#building-for-pyspark-on-yarn In addition, it should be built with Java 6 because of a known issue with building jars with Java 7 and including Python files in them (https://issues.apache.org/jira/browse/SPARK-1718). Lastly, if you have trouble getting it to work, you can follow the steps I have listed in a different thread to figure out what's wrong: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3ccamjob8mr1+ias-sldz_rfrke_na2uubnmhrac4nukqyqnun...@mail.gmail.com%3e Let me know if you can get it working, -Andrew 2014-09-03 5:03 GMT-07:00 Oleg Ruchovets oruchov...@gmail.com: [...]
Re: How to clear broadcast variable from driver memory?
Hi Kevin, there is currently no way to do this... Broadcast.unpersist() only unpersists it from the executors, but not from the driver. However, this is not that bad, because Spark automatically cleans up broadcasts that are no longer used, even on the driver. So as long as there is no memory pressure forcing Spark to clean things up, the broadcast will stick around. Thanks for pointing this out. We should probably expose a mechanism for doing this. Andrew 2014-09-03 4:56 GMT-07:00 Kevin Jung itsjb.j...@samsung.com: Hi, I tried Broadcast.unpersist() on Spark 1.0.1, but MemoryStore (driver memory) still held it. //LOGS //Block broadcast_0 stored as values to memory (estimated size 380.1 MB, free 5.7 GB) The free memory size was the same after calling unpersist. Can I clear this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-clear-broadcast-variable-from-driver-memory-tp13353.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
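For reference, a minimal sketch of the behavior being discussed (the data value is illustrative):

val data = (1 to 1000000).toArray
val bc = sc.broadcast(data)                        // one copy on the driver, shipped to executors on first use
val n = sc.parallelize(1 to 10).map(_ => bc.value.length).first()
bc.unpersist()                                     // frees the executor copies only; the driver-side copy
                                                   // is cleaned up later when the broadcast is no longer used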
Re: Spark Streaming into HBase
Adding back user@ I am not familiar with the NotSerializableException. Can you show the full stack trace? See SPARK-1297 for changes you need to make so that Spark works with HBase 0.98. Cheers On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng kpe...@gmail.com wrote: Ted, The hbase-site.xml is in the classpath (had worse issues before... until I figured out that it wasn't in the path). I get the following error in the spark-shell: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc ... I also double checked the hbase table, just in case, and nothing new is written in there. I am using hbase version 0.98.1-cdh5.1.0, the default one with the CDH5.1.0 distro. Thank you for the help. On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu yuzhih...@gmail.com wrote: Is hbase-site.xml in the classpath? Do you observe any exception from the code below or in the region server log? Which hbase release are you using? On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 kpe...@gmail.com wrote: ...
Re: Spark Streaming into HBase
This doesn't seem to have to do with HBase per se. Some function is getting the StreamingContext into the closure, and that won't work. Is this exactly the code? It doesn't reference a StreamingContext, but is there maybe a different version in reality that tries to use the StreamingContext inside a function? On Wed, Sep 3, 2014 at 10:36 PM, Ted Yu yuzhih...@gmail.com wrote: ...
RE: Accessing neighboring elements in an RDD
Thanks Xiangrui, that looks very helpful. Best regards, Ron -----Original Message----- From: Xiangrui Meng [mailto:men...@gmail.com] Sent: Wednesday, September 03, 2014 1:19 PM To: Daniel, Ronald (ELS-SDG) Cc: Victor Tso-Guillen; user@spark.apache.org Subject: Re: Accessing neighboring elements in an RDD There is a sliding method implemented in MLlib (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala), which is used in computing Area Under Curve: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala#L45 With it, you can process neighboring lines by

rdd.sliding(3).map { case Seq(l0, l1, l2) => ... }

-Xiangrui On Wed, Sep 3, 2014 at 11:30 AM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Thanks for the pointer to that thread. Looks like there is some demand for this capability, but not a lot yet. Also doesn't look like there is an easy answer right now. Thanks, Ron From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Wednesday, September 03, 2014 10:40 AM To: Daniel, Ronald (ELS-SDG) Cc: user@spark.apache.org Subject: Re: Accessing neighboring elements in an RDD Interestingly, there was an almost identical question posed on Aug 22 by cjwang. Here's the link to the archive: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-td12621.html#a12664 On Wed, Sep 3, 2014 at 10:33 AM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Hi all, Assume I have read the lines of a text file into an RDD: textFile = sc.textFile("SomeArticle.txt") Also assume that the sentence breaks in SomeArticle.txt were done by machine and have some errors, such as the break at "Fig." in the sample text below.

Index  Text
N      ... as shown in Fig.
N+1    1.
N+2    The figure shows...

What I want is an RDD with:

N      ... as shown in Fig. 1.
N+1    The figure shows...

Is there some way a filter() can look at neighboring elements in an RDD? That way I could look, in parallel, at neighboring elements in an RDD and come up with a new RDD that may have a different number of elements. Or do I just have to sequentially iterate through the RDD? Thanks, Ron
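For reference, a minimal sketch of the sliding call applied to Ron's use case. This assumes the mllib RDDFunctions implicit is accessible in your Spark version; the "ends with Fig." heuristic is purely illustrative, and a complete re-segmenter would also have to drop the consumed next line:

import org.apache.spark.mllib.rdd.RDDFunctions._  // adds sliding() to RDDs

val lines = sc.textFile("SomeArticle.txt")
// pair each line with the line that follows it
val repaired = lines.sliding(2).map { case Array(cur, next) =>
  if (cur.endsWith("Fig.")) cur + " " + next      // re-join a sentence broken at "Fig."
  else cur
}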
Re: Spark Streaming into HBase
Sean, I create a streaming context near the bottom of the code (ssc) and basically apply a foreachRDD on the resulting DStream so that I can get access to the underlying RDD, on which in turn I apply a foreach and pass in my function, which applies the storing logic. Is there a different approach I should be using? Thanks for the help. On Wed, Sep 3, 2014 at 2:43 PM, Sean Owen-2 [via Apache Spark User List] ml-node+s1001560n13385...@n3.nabble.com wrote: ...
If master is local, where are master and workers?
Hello, If launched with local as master, where are master and workers? Do they each have a web UI? How can they be monitored? Thanks! Best, Oliver Oliver Ruebenacker | Solutions Architect Altisource(tm) 290 Congress St, 7th Floor | Boston, Massachusetts 02210 P: (617) 728-5582 | ext: 275585 oliver.ruebenac...@altisource.com | www.Altisource.com
Re: If master is local, where are master and workers?
local means everything runs in the same process; that means there is no need for master and worker daemons to start processes. On Wed, Sep 3, 2014 at 3:12 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: ... -- Marcelo
RE: If master is local, where are master and workers?
How can that single process be monitored? Thanks! -----Original Message----- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Wednesday, September 03, 2014 6:32 PM To: Ruebenacker, Oliver A Cc: user@spark.apache.org Subject: Re: If master is local, where are master and workers? local means everything runs in the same process; that means there is no need for master and worker daemons to start processes. On Wed, Sep 3, 2014 at 3:12 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: ... -- Marcelo
Re: If master is local, where are master and workers?
The only monitoring available is the driver's Web UI, which will generally be available on port 4040. On Wed, Sep 3, 2014 at 3:43 PM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: How can that single process be monitored? Thanks! ... -- Marcelo
Re: Spark Streaming into HBase
Ted, Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Basically, on the terminal where I run nc -lk, I type in words separated by commas and hit enter, i.e. bill,ted. On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu yuzhih...@gmail.com wrote: ...
Re: Running Wordcount on large file stucks and throws OOM exception
In word count you don't need much driver memory, unless you do a collect, which is not recommended.

val file = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/data")
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount")

Thanks. Zhan Zhang On Aug 26, 2014, at 12:35 AM, motte1988 wir12...@studserv.uni-leipzig.de wrote: Hello, it's me again. Now I've got an explanation for the behaviour. It seems that the driver memory is not large enough to hold the whole result set of saveAsTextFile in memory, and then the OOM occurs. I tested it with a filter step that removes KV pairs with a word count smaller than 100,000, and the job then finished successfully. But is this the desired behaviour of Spark, that available driver memory limits the size of the result set? Or is my explanation wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Wordcount-on-large-file-stucks-and-throws-OOM-exception-tp12747p12809.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How can I start history-server with kerberos HDFS ?
Hi Zhanfeng, You will need to set these through SPARK_HISTORY_OPTS in conf/spark-env.sh. This is documented here: http://spark.apache.org/docs/latest/monitoring.html . Let me know if you have it working, -Andrew 2014-09-03 11:14 GMT-07:00 Marcelo Vanzin van...@cloudera.com: The history server (and other Spark daemons) do not read spark-defaults.conf. There's a bug open to implement that (SPARK-2098), and an open PR to fix it, but it's still not in Spark. On Wed, Sep 3, 2014 at 11:00 AM, Zhanfeng Huo huozhanf...@gmail.com wrote: Hi, I have set properties in conf/spark-defaults.conf and started with the command ./sbin/start-history-server.sh /tmp/spark-events. It gets errors, and it seems that the properties in the spark-defaults.conf file don't take effect. How can I solve this problem (enable properties in spark-defaults.conf when starting the Spark history server)?

14/09/04 01:44:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/04 01:44:05 WARN UserGroupInformation: PriviledgedActionException as:root (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
14/09/04 01:44:05 WARN Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
14/09/04 01:44:05 WARN UserGroupInformation: PriviledgedActionException as:root (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Exception in thread main java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details :

#history-server
spark.history.kerberos.enabled true
spark.history.kerberos.principal test/spark@test
spark.history.kerberos.keytab /home/test/test_spark.keytab
spark.eventLog.enabled true

Zhanfeng Huo -- Marcelo
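For reference, a sketch of the same properties expressed as SPARK_HISTORY_OPTS in conf/spark-env.sh, reusing the principal and keytab from the message above:

export SPARK_HISTORY_OPTS="-Dspark.history.kerberos.enabled=true \
  -Dspark.history.kerberos.principal=test/spark@test \
  -Dspark.history.kerberos.keytab=/home/test/test_spark.keytab"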
[MLib] How do you normalize features?
It seems like the next release will add a nice org.apache.spark.mllib.feature package, but what is the recommended way to normalize features in the current release (1.0.2)? I'm hoping for a general pointer here. At the moment I have an RDD[LabeledPoint] and I can get a MultivariateStatisticalSummary for mean/variance. Is that about the right way to proceed? I'm also not seeing an easy way to subtract vectors -- do I need to do this element-wise? thanks
Re: RDDs
Hello, On Wed, Sep 3, 2014 at 6:02 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Can someone tell me what kind of operations can be performed on a replicated RDD? What are the use cases of a replicated RDD? I suggest you read https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds as an introduction; it lists a lot of the transformations and output operations you can use. Personally, I also found it quite helpful to read the paper about RDDs: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf One basic doubt that has been bothering me for a long time: what is the difference between an application and a job in Spark parlance? I am confused because of the Hadoop jargon. OK, someone else might answer that. I am myself confused by application, job, task, stage etc. ;-) Tobias
Re: What is the appropriate privileges needed for writting files into checkpoint directory?
I found the answer. The checkpoint directory should be on a fault-tolerant file system like HDFS, so we should set it to an HDFS path, not a local file system path. 2014-09-03 10:28 GMT+08:00 Tao Xiao xiaotao.cs@gmail.com: I tried to run KafkaWordCount in a Spark standalone cluster. In this application, the checkpoint directory was set as follows:

val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

After submitting my application to the cluster, I could see the correct counting results on the console, but the running application kept complaining as follows:

14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171 (Permission denied)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
        at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
        at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
        at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)

On the node where I submitted the application, the checkpoint directory (/usr/games/SparkStreaming/checkpoint) was created and some files were created there, but no such directory existed on the other nodes of the Spark cluster. I guess that was because processes on other nodes of the cluster didn't have appropriate privileges to create the checkpoint directory. So I created that directory on each node manually and changed its mode to 777, which means any user can write to that directory. But the Spark Streaming application still kept throwing that exception. So what is the real reason? Thanks.
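For reference, a one-line sketch of the fix (the HDFS URI here is a placeholder for your own namenode):

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint")  // fault-tolerant path visible to all nodes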
Multi-tenancy for Spark (Streaming) Applications
Hi, I am not sure if multi-tenancy is the right word, but I am thinking about a Spark application where multiple users can, say, log into some web interface and specify a data processing pipeline with a streaming source, processing steps, and an output. Now as far as I know, there can be only one StreamingContext per JVM, and also I cannot add sources or processing steps once it has been started. Are there any ideas/suggestions for how to achieve dynamic adding and removing of input sources and processing pipelines? Do I need a separate 'java' process per user? Also, can I realize such a thing when using YARN for dynamic allocation? Thanks Tobias
Re: Multi-tenancy for Spark (Streaming) Applications
In the current state of Spark Streaming, creating separate Java processes, each having its own streaming context, is probably the best approach to dynamically adding and removing input sources. All of these should be able to use a YARN cluster for resource allocation. On Wed, Sep 3, 2014 at 6:30 PM, Tobias Pfeiffer t...@preferred.jp wrote: ...
Re: [MLib] How do you normalize features?
Maybe copy the implementation of StandardScaler from 1.1 and use it in v1.0.x. -Xiangrui On Wed, Sep 3, 2014 at 5:10 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: ...
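For reference, a minimal sketch of doing the standardization by hand on 1.0.x, using the column summary statistics mentioned in the question (the standardize name is illustrative, and zero-variance columns are simply zeroed out here):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def standardize(data: RDD[LabeledPoint]): RDD[LabeledPoint] = {
  // column-wise mean and variance over all feature vectors
  val summary = new RowMatrix(data.map(_.features)).computeColumnSummaryStatistics()
  val mean = summary.mean.toArray
  val std = summary.variance.toArray.map(math.sqrt)
  data.map { lp =>
    val scaled = lp.features.toArray.zipWithIndex.map { case (x, i) =>
      if (std(i) == 0.0) 0.0 else (x - mean(i)) / std(i)  // element-wise (x - mean) / stddev
    }
    LabeledPoint(lp.label, Vectors.dense(scaled))
  }
}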
RE: RDDs
Not sure what you are referring to by replicated RDD. If you actually mean RDD, then yes, read the API doc and paper as Tobias mentioned. If you actually focus on the word replicated, then that is for fault tolerance, and it is probably mostly used in the streaming case for receiver-created RDDs. For Spark, an application is your user program, and a job is an internal scheduling concept: a group of RDD operations. Your application might invoke several jobs. Best Regards, Raymond Liu From: rapelly kartheek [mailto:kartheek.m...@gmail.com] Sent: Wednesday, September 03, 2014 5:03 PM To: user@spark.apache.org Subject: RDDs Hi, Can someone tell me what kind of operations can be performed on a replicated RDD? What are the use cases of a replicated RDD? One basic doubt that has been bothering me for a long time: what is the difference between an application and a job in Spark parlance? I am confused because of the Hadoop jargon. Thank you
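For reference, a tiny sketch of the distinction (the file name is illustrative): the whole program below is one application, and each action submits one job:

val lengths = sc.textFile("data.txt").map(_.length)  // transformation: no job yet
val job1 = lengths.count()                           // first action, submits the first job
val job2 = lengths.sum()                             // second action, second job, same application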
Why spark on yarn applicationmaster cannot get a proper resourcemanager address from yarnconfiguration?
Hello, I tried to submit a Spark job to a YARN cluster, and an error occurred with these messages:

[root@saturn00 bin]# ./spark-submit --class SparkHiveJoin --master yarn-cluster --num-executors 10 --executor-memory 12g --executor-cores 1 spark.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Warning: Ignoring non-spark config property: yarn.resourcemanager.address=10.150.20.22:8032
Warning: Ignoring non-spark config property: yarn.resourcemanager.address=10.150.20.22:8032
14/09/04 11:01:27 INFO client.RMProxy: Connecting to ResourceManager at /10.150.20.22:8032
14/09/04 11:01:27 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 10
14/09/04 11:01:27 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/09/04 11:01:27 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 12288
14/09/04 11:01:27 INFO yarn.Client: Preparing Local resources
14/09/04 11:01:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/04 11:01:27 INFO yarn.Client: Uploading file:/opt/spark-1.0.2-bin-hadoop2/bin/spark.jar to hdfs://10.150.20.22:8020/user/root/.sparkStaging/application_1409759471992_0007/spark.jar
14/09/04 11:01:28 INFO yarn.Client: Uploading file:/opt/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.4.0.jar to hdfs://10.150.20.22:8020/user/root/.sparkStaging/application_1409759471992_0007/spark-assembly-1.0.2-hadoop2.4.0.jar
14/09/04 11:01:30 INFO yarn.Client: Setting up the launch environment
14/09/04 11:01:30 INFO yarn.Client: Setting up container launch context
14/09/04 11:01:30 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.local.dir=\"/hadoop/spark\", -Dspark.shuffle.consolidateFiles=\"true\", -Dspark.executor.memory=\"12g\", -Dspark.master=\"spark://10.150.20.22:8081\", -Dspark.app.name=\"SparkHiveJoin\", -Dspark.eventLog.enabled=\"true\", -Dspark.spill=\"true\", -Dspark.serializer=\"org.apache.spark.serializer.KryoSerializer\", -Dspark.eventLog.dir=\"hdfs://10.150.20.22:8020/sparkLog\", -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, SparkHiveJoin, --jar, file:/opt/spark-1.0.2-bin-hadoop2/bin/spark.jar, --executor-memory, 12288, --executor-cores, 1, --num-executors, 10, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
14/09/04 11:01:30 INFO yarn.Client: Submitting application to ASM
14/09/04 11:01:30 INFO impl.YarnClientImpl: Submitted application application_1409759471992_0007
14/09/04 11:01:31 INFO yarn.Client: Application report from ASM:
        application identifier: application_1409759471992_0007
        appId: 7
        clientToAMToken: null
        appDiagnostics:
        appMasterHost: N/A
        appQueue: default
        appMasterRpcPort: -1
        appStartTime: 1409796090132
        yarnAppState: ACCEPTED
        distributedFinalState: UNDEFINED
        appTrackingUrl: http://saturn00:8088/proxy/application_1409759471992_0007/
        appUser: root
14/09/04 11:01:32 INFO yarn.Client: Application report from ASM:
        application identifier: application_1409759471992_0007
        appId: 7
        clientToAMToken: null
        appDiagnostics:
        appMasterHost: N/A
        appQueue: default
        appMasterRpcPort: -1
        appStartTime: 1409796090132
        yarnAppState: ACCEPTED
        distributedFinalState: UNDEFINED
        appTrackingUrl: http://saturn00:8088/proxy/application_1409759471992_0007/
        appUser: root

So I looked at the log of the ApplicationMaster of the Spark-on-YARN job; here are the logs:

14/09/04 11:01:33 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1409759471992_0007_01
14/09/04 11:01:33 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8030
14/09/04 11:01:33 INFO SecurityManager: Changing view acls to: root
14/09/04 11:01:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/04 11:01:33 INFO ApplicationMaster: Starting the user JAR in a separate Thread
14/09/04 11:01:33 INFO ApplicationMaster: Waiting for Spark context initialization
14/09/04 11:01:33 INFO ApplicationMaster: Waiting for Spark context initialization ...
14/09/04 11:01:33 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
14/09/04 11:01:33 INFO SecurityManager: Changing view acls to: root
14/09/04 11:01:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/04 11:01:33 INFO Slf4jLogger: Slf4jLogger started
14/09/04 11:01:33 INFO Remoting: Starting remoting
14/09/04 11:01:33 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@saturn02:58444]
14/09/04 11:01:33 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@saturn02:58444]
14/09/04 11:01:33 INFO SparkEnv: Registering MapOutputTracker
14/09/04 11:01:33 INFO SparkEnv: Registering BlockManagerMaster
14/09/04 11:01:33 INFO DiskBlockManager: Created local directory at /home/hadoop/logs/hadoop_tmp/usercache/root/appcache/application_1409759471992_0007/spark-local-20140904110133-a55e
14/09/04 11:01:33 INFO MemoryStore: MemoryStore started with capacity 273.1 MB.
14/09/04 11:01:33 INFO ConnectionManager: Bound socket to port 43903 with id = ConnectionManagerId(saturn02,43903)
14/09/04 11:01:33 INFO BlockManagerMaster: Trying to register BlockManager
14/09/04 11:01:33 INFO BlockManagerInfo: Registering block manager
Re: Is there any way to control the parallelism in LogisticRegression
Hi Xiangrui, A side question about MLlib. It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only supports L2 regularization; the doc explains it: The L1 regularization by using L1Updater http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater will not work since the soft-thresholding logic in L1Updater is designed for gradient descent. Since the algorithm comes from Breeze, and I noticed Breeze actually supports L1 (OWLQN http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN), I am wondering if there are some special considerations why current MLlib doesn't support OWLQN. And is there any plan to add it? Thanks for your time! On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong dong...@gmail.com wrote: Update. I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which sounds like it could control the locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality) and used the #3 approach, and found a lot of improvement (20%~40%). Although the Web UI still shows the type of task as 'ANY' and the input is from shuffle read, the real performance is much better than before changing this parameter. I think the benefit includes: 1. This approach keeps the physical partition size small, but makes each task handle multiple partitions. So the memory requested for deserialization is reduced, which also reduces the GC time. That is exactly what we observed in our job. 2. This approach will not hit the 2G limitation, because it does not change the partition size. I also think that Spark could change this default value, or at least expose this parameter to users (*CoalescedRDD* is a private class, and *RDD*.*coalesce* doesn't have a parameter to control it either). On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng men...@gmail.com wrote: Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or the 2G limitation, at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS but still cheaper than calling RDD.repartition(). For coalesce without shuffle, I don't know how to set the right number of partitions either ... -Xiangrui On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi Xiangrui, Thanks for your reply! Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right? For CombineInputFormat, I haven't tried it, but it sounds like it will combine multiple partitions into a large partition if I cache it, so same issues as #1? For coalesce, could you share some best practice on how to set the right number of partitions to avoid locality problems? Thanks! On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote: Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you didn't use the right number of partitions. -Xiangrui On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote: I think this has the same effect and issue as #1, right?
On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote: How about increasing the HDFS file extent size? Like, the current value is 128M; we make it 512M or bigger. On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi all, We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks will incur large network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the ways below: 1. Coalesce partitions without shuffling, then cache:

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when data is increasing and numPartitions is fixed, the size of one partition will be large. This introduces two issues: first, the larger partition will need a larger object and more memory at runtime, and triggers GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by the size of one partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391). 2. Coalesce partitions with shuffling, then cache.
Re: Is there any way to control the parallelism in LogisticRegression
+DB David (They implemented OWLQN on Spark today.) On Sep 3, 2014 7:18 PM, Jiusheng Chen chenjiush...@gmail.com wrote: ...
Re: Why spark on yarn applicationmaster cannot get a proper resourcemanager address from yarnconfiguration?
Did you follow the exact steps on this page: https://spark.apache.org/docs/1.0.2/running-on-yarn.html ? Please be sure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. Guodong On Thu, Sep 4, 2014 at 10:15 AM, 남윤민 rony...@dgist.ac.kr wrote: ...
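For reference, a sketch of the environment setup being suggested, before running spark-submit (the path is a placeholder for wherever your cluster's client configs live):

export HADOOP_CONF_DIR=/etc/hadoop/conf   # must contain the yarn-site.xml with the real ResourceManager address
./spark-submit --class SparkHiveJoin --master yarn-cluster --num-executors 10 --executor-memory 12g --executor-cores 1 spark.jar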
yarn.Client: Application report from ASM: application identifier: application_1409759471992_0007 appId: 7 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: default appMasterRpcPort: -1 appStartTime: 1409796090132 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://saturn00:8088/proxy/application_1409759471992_0007/ appUser: root So, I found the log of applicationmaster of spark on yarn job, here is the logs: 14/09/04 11:01:33 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1409759471992_0007_01 14/09/04 11:01:33 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8030 14/09/04 11:01:33 INFO SecurityManager: Changing view acls to: root 14/09/04 11:01:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/04 11:01:33 INFO ApplicationMaster: Starting the user JAR in a separate Thread 14/09/04 11:01:33 INFO ApplicationMaster: Waiting for Spark context initialization 14/09/04 11:01:33 INFO ApplicationMaster: Waiting for Spark context initialization ... 0 14/09/04 11:01:33 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in
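For reference, a minimal sketch of the client-side yarn-site.xml entries involved (the RM host 10.150.20.22 is taken from the log above; the ports shown are the YARN defaults and may differ on your cluster). Note that the AM log above shows a connection to /0.0.0.0:8030, which is the default value of yarn.resourcemanager.scheduler.address, i.e. the ApplicationMaster is not seeing a configured scheduler address:

  <configuration>
    <property>
      <!-- address the client uses to submit applications -->
      <name>yarn.resourcemanager.address</name>
      <value>10.150.20.22:8032</value>
    </property>
    <property>
      <!-- address the ApplicationMaster uses to reach the scheduler;
           falls back to 0.0.0.0:8030 when unset -->
      <name>yarn.resourcemanager.scheduler.address</name>
      <value>10.150.20.22:8030</value>
    </property>
  </configuration>

Then make sure the directory containing this file is exported before running spark-submit, e.g. export HADOOP_CONF_DIR=/etc/hadoop/conf (the path here is a placeholder).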
Re: Spark Streaming into HBase
This is some issue with how Scala computes closures. Here, because of the function blah, it is trying to serialize the whole closure that this code is part of. Can you define the function blah outside the main function? In fact, you can try putting the function in a serializable object:

  object BlahFunction extends Serializable {
    def blah(row: Array[Byte]) { }
  }

On a related note, opening a connection for every record in the RDD is pretty inefficient. Use rdd.foreachPartition instead - open the connection, write the whole partition, and then close the connection. TD

On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng kpe...@gmail.com wrote: Ted, Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Basically, in the terminal where I run nc -lk, I type in words separated by commas and hit enter, e.g. "bill,ted".

On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu yuzhih...@gmail.com wrote: Adding back user@. I am not familiar with the NotSerializableException. Can you show the full stack trace? See SPARK-1297 for changes you need to make so that Spark works with hbase 0.98. Cheers

On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng kpe...@gmail.com wrote: Ted, The hbase-site.xml is in the classpath (had worse issues before... until I figured out that it wasn't in the path). I get the following error in the spark-shell:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc ...
I also double checked the hbase table, just in case, and nothing new is written to it. I am using hbase version 0.98.1-cdh5.1.0, the default one with the CDH5.1.0 distro. Thank you for the help.

On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu yuzhih...@gmail.com wrote: Is hbase-site.xml in the classpath? Do you observe any exception from the code below or in the region server log? Which hbase release are you using?

On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 kpe...@gmail.com wrote: I have been trying to understand how spark streaming and hbase connect, but have not been successful. What I am trying to do is: given a spark stream, process that stream and store the results in an hbase table. So far this is what I have:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.storage.StorageLevel
  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Get}
  import org.apache.hadoop.hbase.util.Bytes

  def blah(row: Array[String]) {
    val hConf = new HBaseConfiguration()
    val hTable = new HTable(hConf, "table")
    val thePut = new Put(Bytes.toBytes(row(0)))
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
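Following TD's suggestions above, a minimal sketch that combines both fixes - the function lives in a serializable object, and the connection is opened once per partition rather than once per record (the table name "table" and column family "cf" are taken from the snippet above; the surrounding stream setup is assumed):

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{HTable, Put}
  import org.apache.hadoop.hbase.util.Bytes

  object HBaseWriter extends Serializable {
    def writePartition(rows: Iterator[Array[String]]): Unit = {
      // One connection per partition, not per record
      val hTable = new HTable(HBaseConfiguration.create(), "table")
      rows.foreach { row =>
        val thePut = new Put(Bytes.toBytes(row(0)))
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
        hTable.put(thePut)
      }
      hTable.close()
    }
  }

  // In the streaming job:
  // stream.foreachRDD { rdd => rdd.foreachPartition(HBaseWriter.writePartition) }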
Re: Is there any way to control the parallelism in LogisticRegression
With David's help today, we were able to implement elastic net glm in Spark. It's surprisingly easy, and with just some modification in breeze's OWLQN code, it just works without further investigation. We did a benchmark, and the coefficients are within 0.5% of R's glmnet package. I guess this is the first truly distributed glmnet implementation. It still requires some effort to get it into mllib; mostly api cleanup work. 1) I'll submit a PR to breeze which implements weighted regularization in OWLQN. 2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505, for which we have an internal version requiring some cleanup for the open source project. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng men...@gmail.com wrote: +DB David (They implemented OWLQN on Spark today.)

On Sep 3, 2014 7:18 PM, Jiusheng Chen chenjiush...@gmail.com wrote: Hi Xiangrui, A side question about MLlib. It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only supports L2 regularization; the doc explains it: "The L1 regularization by using L1Updater (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater) will not work since the soft-thresholding logic in L1Updater is designed for gradient descent." Since the algorithm comes from Breeze, and I noticed Breeze actually supports L1 (OWLQN, http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN), I'm wondering if there are some special considerations why current MLlib doesn't support OWLQN? And any plan to add it? Thanks for your time!

On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong dong...@gmail.com wrote: Update. I just found a magic parameter balanceSlack in CoalescedRDD, which sounds like it could control the locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality) and used the #3 approach, then found a lot of improvement (20%~40%). Although the Web UI still shows the task type as 'ANY' and the input as shuffle read, the real performance is much better than before changing this parameter. I think the benefit includes: 1. This approach keeps the physical partition size small, but makes each task handle multiple partitions. So the memory requested for deserialization is reduced, which also reduces the GC time. That is exactly what we observed in our job. 2. This approach will not hit the 2G limitation, because it does not change the partition size. And I also think that Spark may change this default value, or at least expose this parameter to users (CoalescedRDD is a private class, and RDD.coalesce doesn't have a parameter to control it).

On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng men...@gmail.com wrote: Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or 2G limitation, at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but still cheaper than calling RDD.repartition(). For coalesce without shuffle, I don't know how to set the right number of partitions either ... -Xiangrui

On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi Xiangrui, Thanks for your reply! Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right? For CombineInputFormat, although I haven't tried it, it sounds like it will combine multiple partitions into a large partition if I cache it, so the same issue as #1? For coalesce, could you share some best practices on how to set the right number of partitions to avoid locality problems? Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote: Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you didn't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote: I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote: How about increasing the HDFS file extent size? Like, the current value is 128M; we make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi all, We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The
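For reference, a minimal sketch of the repartition/coalesce variants being compared in this thread (the input path and partition counts are made up for illustration; the mapping to the numbered approaches is defined in an earlier message not shown here):

  val data = sc.textFile("hdfs:///path/to/training/data")

  // Full shuffle; evens out partition sizes but adds shuffle I/O per pass.
  // repartition(n) is just coalesce(n, shuffle = true) internally.
  val reshuffled = data.repartition(5000)

  // No shuffle; merges existing partitions in place. Locality of the merged
  // partitions is governed by CoalescedRDD's internal balanceSlack parameter
  // (0.1 by default), which is not exposed through the public coalesce API.
  val merged = data.coalesce(500, shuffle = false)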
resize memory size for caching RDD
Dear all: Spark uses memory to cache RDDs, and the memory size is specified by spark.storage.memoryFraction. Once the Executor starts, does Spark support adjusting/resizing the memory size of this part dynamically? Thanks. -- Regards, Zhaojie
How to use memcached with spark
Tried to connect to memcached in a map with the xmemcached lib, and failed with: net.rubyeye.xmemcached.exception.MemcachedException: There is no available connection at this moment. Has anybody succeeded in using memcached? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-memcached-with-spark-tp13409.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
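One common cause of "no available connection" here (a guess, since the failing code isn't shown) is creating the client on the driver and letting it be serialized into the map closure; the connection state does not survive serialization. A sketch that builds the client inside each partition instead (the host, port, and key/value scheme are made up):

  import net.rubyeye.xmemcached.XMemcachedClientBuilder
  import net.rubyeye.xmemcached.utils.AddrUtil

  rdd.foreachPartition { records =>
    // Build the client on the executor, once per partition
    val client = new XMemcachedClientBuilder(
      AddrUtil.getAddresses("memcached-host:11211")).build()
    records.foreach { r =>
      client.set(r.toString, 0, r.toString)  // key, expiry in seconds (0 = never), value
    }
    client.shutdown()
  }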
Re: Support R in Spark
Thanks, Shivaram. No specific use case yet. We are trying to use R in our project, as our data scientists all know R. We had a concern about how R handles massive data. Spark does a better job in the big data area, and Spark ML is focusing on predictive analysis, so we are considering whether we can merge R and Spark together. We tried SparkR and it is pretty easy to use, but we haven't seen much feedback on this package from industry. It would be better if the Spark team had R support just like Scala/Java/Python. Another question: if MLlib re-implements all the famous data mining algorithms in Spark, then what is the purpose of using R? Another option for us is H2O, which supports R natively. H2O is more friendly to data scientists. I saw H2O can also work on Spark (Sparkling Water). Is it better than using SparkR? Thanks and Regards. Kui

On Sep 4, 2014, at 1:47 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Hi, Do you have a specific use-case where SparkR doesn't work well? We'd love to hear more about use-cases and features that can be improved with SparkR. Thanks, Shivaram

On Wed, Sep 3, 2014 at 3:19 AM, oppokui oppo...@gmail.com wrote: Does the Spark ML team have a plan to support R scripts natively? There is a SparkR project, but it's not from the Spark team. Spark ML uses netlib-java to talk to native Fortran routines, or uses NumPy; why not try to use R in some sense? R has a lot of useful packages. If the Spark ML team can include R support, it will be very powerful. Any comment? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: resize memory size for caching RDD
AFAIK, no. Best Regards, Raymond Liu

From: 牛兆捷 [mailto:nzjem...@gmail.com] Sent: Thursday, September 04, 2014 11:30 AM To: user@spark.apache.org Subject: resize memory size for caching RDD

Dear all: Spark uses memory to cache RDDs, and the memory size is specified by spark.storage.memoryFraction. Once the Executor starts, does Spark support adjusting/resizing the memory size of this part dynamically? Thanks. -- Regards, Zhaojie - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there any way to control the parallelism in LogisticRegression
Thanks DB and Xiangrui. Glad to know you guys are actively working on it. Another thing: did we evaluate the loss of using Float to store values? Currently it is Double. Using fewer bits has the benefit of reducing the memory footprint. According to Google, they even use 16 bits (a special encoding scheme called q2.13, http://jmlr.org/proceedings/papers/v28/golovin13.pdf) in their learner without measurable loss, and can save 75% of the memory.

On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai dbt...@dbtsai.com wrote: ...
Re: Re: How can I start history-server with kerberos HDFS ?
Thanks for your help. It works after setting SPARK_HISTORY_OPTS. Zhanfeng Huo

From: Andrew Or Date: 2014-09-04 07:52 To: Marcelo Vanzin CC: Zhanfeng Huo; user Subject: Re: How can I start history-server with kerberos HDFS ?

Hi Zhanfeng, You will need to set these through SPARK_HISTORY_OPTS in conf/spark-env.sh. This is documented here: http://spark.apache.org/docs/latest/monitoring.html. Let me know if you have it working, -Andrew

2014-09-03 11:14 GMT-07:00 Marcelo Vanzin van...@cloudera.com: The history server (and other Spark daemons) do not read spark-defaults.conf. There's a bug open to implement that (SPARK-2098), and an open PR to fix it, but it's still not in Spark.

On Wed, Sep 3, 2014 at 11:00 AM, Zhanfeng Huo huozhanf...@gmail.com wrote: Hi, I have set properties in conf/spark-defaults.conf and started with the command ./sbin/start-history-server.sh /tmp/spark-events. It gets errors, and it seems that the properties in the spark-defaults.conf file don't take effect. How can I solve this problem (enable the properties in spark-defaults.conf when starting the Spark history server)?

14/09/04 01:44:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/04 01:44:05 WARN UserGroupInformation: PriviledgedActionException as:root (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
14/09/04 01:44:05 WARN Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
14/09/04 01:44:05 WARN UserGroupInformation: PriviledgedActionException as:root (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Exception in thread main java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details :

#history-server
spark.history.kerberos.enabled true
park.history.kerberos.principal test/spark@test
spark.history.kerberos.keytab /home/test/test_spark.keytab
spark.eventLog.enabled true

Zhanfeng Huo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
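For reference, a sketch of the equivalent conf/spark-env.sh line, reusing the property values from the spark-defaults.conf listing above (the history server daemon reads these via SPARK_HISTORY_OPTS, not spark-defaults.conf):

  export SPARK_HISTORY_OPTS="-Dspark.history.kerberos.enabled=true -Dspark.history.kerberos.principal=test/spark@test -Dspark.history.kerberos.keytab=/home/test/test_spark.keytab"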
RE: RDDs
Thank you Raymond and Tobias. Yeah, I am very clear about what I was asking. I was talking about a replicated RDD only. Now that I've got my understanding about jobs and applications validated, I wanted to know if we can replicate an RDD and run two jobs (that need the same RDD) of an application in parallel. -Karthk -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-tp13343p13416.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
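For what it's worth, a sketch of one way to do this from a single application: cache the RDD once and submit the two actions from separate threads, since Spark's scheduler accepts concurrent job submissions (assumes an existing SparkContext sc; the input path and the two actions are made up for illustration):

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  // Computed once, reused by both jobs. For replicated cache blocks, use
  // persist(StorageLevel.MEMORY_ONLY_2) instead of cache().
  val shared = sc.textFile("hdfs:///path/to/input").cache()

  // Each action submitted from its own thread becomes a separate job; with
  // the FAIR scheduler they can share the cluster instead of queuing FIFO.
  val job1 = Future { shared.count() }
  val job2 = Future { shared.distinct().count() }

  val total  = Await.result(job1, Duration.Inf)
  val unique = Await.result(job2, Duration.Inf)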
Re: memory size for caching RDD
Changing this is not supported; it is immutable, similar to other Spark configuration settings.

On Wed, Sep 3, 2014 at 8:13 PM, 牛兆捷 nzjem...@gmail.com wrote: Dear all: Spark uses memory to cache RDDs, and the memory size is specified by spark.storage.memoryFraction. Once the Executor starts, does Spark support adjusting/resizing the memory size of this part dynamically? Thanks. -- Regards, Zhaojie - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
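In other words, the fraction has to be chosen before the SparkContext is created. A minimal sketch (0.6 is the default value of this setting in Spark 1.x; the app name is a placeholder):

  import org.apache.spark.{SparkConf, SparkContext}

  // spark.storage.memoryFraction is read once at executor startup and
  // cannot be resized while the application is running
  val conf = new SparkConf()
    .setAppName("cache-sizing-example")
    .set("spark.storage.memoryFraction", "0.6")
  val sc = new SparkContext(conf)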
Starting Thriftserver via hostname on Spark 1.1 RC4?
When I start the thrift server (on Spark 1.1 RC4) via: ./sbin/start-thriftserver.sh --master spark://hostname:7077 --driver-class-path $CLASSPATH It appears that the thrift server is starting off of localhost as opposed to hostname. I have set the spark-env.sh to use the hostname, modified the /etc/hosts for the hostname, and it appears to work properly. But when I start the thrift server, connectivity can only be via localhost:1 as opposed to hostname:1. Any ideas on what configurations I may be setting incorrectly here? Thanks! Denny
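One thing worth checking (a guess, since the Spark Thrift server reuses the HiveServer2 frontend): the bind host and port can be passed explicitly via --hiveconf when starting it; the hostname and port below are placeholders:

  ./sbin/start-thriftserver.sh \
    --master spark://hostname:7077 \
    --driver-class-path $CLASSPATH \
    --hiveconf hive.server2.thrift.bind.host=hostname \
    --hiveconf hive.server2.thrift.port=10000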