Re: Add StructType column to SchemaRDD
The types expected by applySchema are documented in the type reference section: http://spark.apache.org/docs/latest/sql-programming-guide.html#spark-sql-datatype-reference I'd certainly accept a PR to improve the docs and add a link to this from the applySchema section :) Can you explain why you are using mapPartitions and why UDFs don't work for you? SQL doesn't really have great support for partitions in general... We do have support for Hive TGFs though, and we could possibly add better Scala syntax for this concept or something else. On Mon, Jan 5, 2015 at 9:52 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I have a SchemaRDD where I want to add a column with a value that is computed from the rest of the row. As the computation involves a network operation and requires setup code, I can't use SELECT *, myUDF(*) FROM rdd, but I wanted to use a combination of:

- get the schema of the input SchemaRDD
- issue a mapPartitions call (including the setup code), obtaining a new RDD[Row]
- extend the schema manually
- create a new RDD by combining the RDD[Row] with the extended schema.

This works very well, but I run into trouble querying the resulting SchemaRDD with SQL if:

- the result of my computation is a case class
- and I want to use values in this case class in the SQL query.

In particular, while SELECT column FROM resultrdd works well, SELECT column.key_name FROM resultrdd gives a java.lang.ClassCastException: example.MyCaseClass cannot be cast to org.apache.spark.sql.catalyst.expressions.Row

Here is an example to illustrate that:

---
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

val sc = new SparkContext("local[3]", "Test")
val sqlc = new SQLContext(sc)
import sqlc._

// this is the case class that my operation is returning
case class Result(string_values: Map[String, String], num_values: Map[String, Double])
// dummy result data
val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
  Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
val rdd = sc.parallelize(data)
// simulate my computation by creating an RDD[Row] and creating
// a schema programmatically
val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
val progSchema = StructType(StructField("hello", IntegerType, false) ::
  StructField("newcol", rdd.schema, true) :: Nil)
val progRdd = sqlc.applySchema(rowRdd, progSchema)
progRdd.registerTempTable("progrdd")
// the following call will *fail* with a ClassCastException
sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)
// however, the schema I specified is correct. see how embedding
// my result in a proper case class works:
case class ResultContainer(hello: Int, newcol: Result)
val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
caseClassRdd.registerTempTable("caseclassrdd")
// the following call will *work*
sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)
// even though the schema for both RDDs is the same:
progRdd.schema == caseClassRdd.schema
---

It turns out that I cannot use the case class directly, but I have to convert it to a Row as well.
That is, instead of

  val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))

I have to use

  val rowRdd = rdd.map(dr => Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil))

and then, I can use

  SELECT newcol.string_values['team'] FROM progrdd

So now I found that out and I'm happy that it works, but it was quite hard to track it down, so I was wondering if this is the most intuitive way to add a column to a SchemaRDD using mapPartitions (as opposed to using a UDF, where the conversion case class -> Row seems to happen automatically). Or, even if there is no more intuitive way, just wanted to have this documented ;-) Thanks Tobias
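For readers hitting the same ClassCastException, here is a minimal sketch of the conversion Tobias describes. The helper name caseClassToRow is mine, not a Spark API, and it assumes the nested case class is flat (its fields are primitives or Maps); deeper nesting would need the same conversion applied recursively. It also assumes rdd is the RDD[Result] from the example above.

import org.apache.spark.sql._

// hypothetical helper: turn a flat case class into a Row so that applySchema
// can treat it as a struct column
def caseClassToRow(p: Product): Row = Row.fromSeq(p.productIterator.toSeq)

// instead of embedding the case class value directly in the outer Row...
val rowRdd = rdd.map(dr => Row.fromSeq(7 :: caseClassToRow(dr) :: Nil))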
Re: Add StructType column to SchemaRDD
Hi Michael, On Tue, Jan 6, 2015 at 3:43 PM, Michael Armbrust mich...@databricks.com wrote: Oh sorry, I'm rereading your email more carefully. It's only because you have some setup code that you want to amortize? Yes, exactly that. Concerning the docs, I'd be happy to contribute, but I don't really understand what is happening here and why ;-) Thanks Tobias
Re: Add StructType column to SchemaRDD
Oh sorry, I'm rereading your email more carefully. It's only because you have some setup code that you want to amortize? On Mon, Jan 5, 2015 at 10:40 PM, Michael Armbrust mich...@databricks.com wrote: [...]
Re: Spark for core business-logic? - Replacing: MongoDB?
PredictionIO comes with an event server that handles data collection: http://docs.prediction.io/datacollection/overview/ It's based on HBase, which works fine with Spark as the data store of the event/training data. You probably need a separate CRUD-supported database for your application. Your application will then communicate with PredictionIO after authentication is done, for example. I hope it helps. Regards, Simon On Mon, Jan 5, 2015 at 6:22 PM, Alec Taylor alec.tayl...@gmail.com wrote: Thanks Simon, that's a good way to train on incoming events (and related problems / and result computations). However, does it handle the actual data storage? - E.g.: CRUD documents On Tue, Jan 6, 2015 at 1:18 PM, Simon Chan simonc...@gmail.com wrote: Alec, If you are looking for a Machine Learning stack that supports business logic, you may take a look at PredictionIO: http://prediction.io/ It's based on Spark and HBase. Simon On Mon, Jan 5, 2015 at 6:14 PM, Alec Taylor alec.tayl...@gmail.com wrote: Thanks all. To answer your clarification questions:

- I'm writing this in Python
- A similar problem to my actual one is to find common 30 minute slots (over the next 12 months) [r] that k users have in common. Total users: n. Given n = 1,000 and r = 17,472, the [naïve] time-complexity is $\mathcal{O}(nr)$, i.e. n*r = 17,472,000. I may be able to get $\mathcal{O}(n \log r)$ if not $\log \log$ from reading the literature on sequence matching, however this is uncertain.

So assuming all the other business logic which needs to be built in, such as authentication and various other CRUD operations, as well as this more intensive sequence-searching operation, what stack would be best for me? Thanks for all suggestions On Mon, Jan 5, 2015 at 4:24 PM, Jörn Franke jornfra...@gmail.com wrote: Hello, It really depends on your requirements: what kind of machine learning algorithms, your budget, whether you are currently doing something really new or integrating it with an existing application, etc. You can run MongoDB as well as a cluster. I don't think this question can be answered generally; it depends on the details of your case. Best regards On 4 Jan 2015 at 01:44, Alec Taylor alec.tayl...@gmail.com wrote: In the middle of doing the architecture for a new project, which has various machine learning and related components, including: recommender systems, search engines and sequence [common intersection] matching. Usually I use: MongoDB (as db), Redis (as cache) and celery (as queue, backed by Redis). Though I don't have experience with Hadoop, I was thinking of using Hadoop for the machine learning (as this will become a Big Data problem quite quickly). To push the data into Hadoop, I would use a connector of some description, or push the MongoDB backups into HDFS at set intervals. However I was thinking that it might be better to put the whole thing in Hadoop, store all persistent data in Hadoop, and maybe do all the layers in Apache Spark (with caching remaining in Redis). Is that a viable option? - Most of what I see discusses Spark (and Hadoop in general) for analytics only. Apache Phoenix exposes a nice interface for read/write over HBase, so I might use that if Spark ends up being the wrong solution. Thanks for all suggestions, Alec Taylor PS: I need this for both Big and Small data. Note that I am using the Cloudera definition of Big Data, referring to processing/storage across more than 1 machine.
Re: Driver hangs on running mllib word2vec
Thanks Zhan, I'm also confused about the jstack output: why does the driver get stuck at org.apache.spark.SparkContext.clean? On Tue, Jan 6, 2015 at 2:10 PM, Zhan Zhang zzh...@hortonworks.com wrote: I think it is overflow. The training data is quite big. The algorithm's scalability highly depends on the vocabSize. Even without overflow, there are still other bottlenecks, for example, syn0Global and syn1Global; each of them has vocabSize * vectorSize elements. Thanks. Zhan Zhang On Jan 5, 2015, at 7:47 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi Xiangrui, Our dataset is about 80GB (10B lines). In the driver's log, we found this: *INFO Word2Vec: trainWordsCount = -1610413239* It seems that there is an integer overflow? On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng men...@gmail.com wrote: How big is your dataset, and what is the vocabulary size? -Xiangrui On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi, When we run mllib word2vec (spark-1.1.0), the driver gets stuck with 100% CPU usage. Here is the jstack output: main prio=10 tid=0x40112800 nid=0x46f2 runnable [0x4162e000] java.lang.Thread.State: RUNNABLE at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778) at java.io.DataOutputStream.writeInt(DataOutputStream.java:182) at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225) at java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610) at org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290) at com.baidu.inf.WordCount$.main(WordCount.scala:31) at com.baidu.inf.WordCount.main(WordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- Best Regards -- Best Regards
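A quick way to check the overflow hypothesis, assuming dataset is the RDD[Seq[String]] fed to Word2Vec (this is only a diagnostic sketch, not a fix):

// recompute the total token count with Long arithmetic; Word2Vec in Spark 1.1 appears to
// track trainWordsCount as an Int, so anything above Int.MaxValue wraps around to a
// negative value, matching the "trainWordsCount = -1610413239" seen in the driver log
val totalWords: Long = dataset.map(_.size.toLong).reduce(_ + _)
println(s"total words = $totalWords, overflows Int: ${totalWords > Int.MaxValue}")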
TF-IDF from spark-1.1.0 not working on cluster mode
Hi All, The Word2Vec and TF-IDF algorithms in spark mllib-1.1.0 are working only in local mode and not in distributed mode; a null pointer exception is thrown. Is this a bug in spark-1.1.0?

*Following is the code:*

def main(args: Array[String]) {
  val conf = new SparkConf
  val sc = new SparkContext(conf)
  val documents = sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext").map(_.split(" ").toSeq)
  val hashingTF = new HashingTF()
  val tf = hashingTF.transform(documents)
  tf.cache()
  val idf = new IDF().fit(tf)
  val tfidf = idf.transform(tf)
  val rdd = tfidf.map { vec =>
    println("vector is " + vec)
    (10)
  }
  rdd.saveAsTextFile("/home/padma/usecase")
}

*Exception thrown:*

15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167] with ID 0
15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM
15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection from [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888]
15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130]
15/01/06 12:36:12 INFO network.SendingConnection: Connected to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending
15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1 GB)
15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1 GB)
15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB)
15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB)
15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException: org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722)

Thanks, Padma Ch
Re: spark.akka.frameSize limit error
Hi, Thanks for the prompt reply. I checked the code. The main issue is the large number of mappers. If the number of mappers is set to some number around 1000, there will be no problem. I hope the bug gets fixed in the next releases. On Mon, Jan 5, 2015 at 1:26 AM, Josh Rosen rosenvi...@gmail.com wrote: Ah, so I guess this *is* still an issue since we needed to use a bitmap for tracking zero-sized blocks (see https://issues.apache.org/jira/browse/SPARK-3740; this isn't just a performance issue; it's necessary for correctness). This will require a bit more effort to fix, since we'll either have to find a way to use a fixed size / capped size encoding for MapOutputStatuses (which might require changes to let us fetch empty blocks safely) or figure out some other strategy for shipping these statues. I've filed https://issues.apache.org/jira/browse/SPARK-5077 to try to come up with a proper fix. In the meantime, I recommend that you increase your Akka frame size. On Sat, Jan 3, 2015 at 8:51 PM, Saeed Shahrivari saeed.shahriv...@gmail.com wrote: I use the 1.2 version. On Sun, Jan 4, 2015 at 3:01 AM, Josh Rosen rosenvi...@gmail.com wrote: Which version of Spark are you using? It seems like the issue here is that the map output statuses are too large to fit in the Akka frame size. This issue has been fixed in Spark 1.2 by using a different encoding for map outputs for jobs with many reducers (https://issues.apache.org/jira/browse/SPARK-3613). On earlier Spark versions, your options are either reducing the number of reducers (e.g. by explicitly specifying the number of reducers in the reduceByKey() call) or increasing the Akka frame size (via the spark.akka.frameSize configuration option). On Sat, Jan 3, 2015 at 10:40 AM, Saeed Shahrivari saeed.shahriv...@gmail.com wrote: Hi, I am trying to get the frequency of each Unicode char in a document collection using Spark. Here is the code snippet that does the job:

JavaPairRDD<LongWritable, Text> rows = sc.sequenceFile(args[0], LongWritable.class, Text.class);
rows = rows.coalesce(1);
JavaPairRDD<Character, Long> pairs = rows.flatMapToPair(t -> {
    String content = t._2.toString();
    Multiset<Character> chars = HashMultiset.create();
    for (int i = 0; i < content.length(); i++)
        chars.add(content.charAt(i));
    List<Tuple2<Character, Long>> list = new ArrayList<Tuple2<Character, Long>>();
    for (Character ch : chars.elementSet()) {
        list.add(new Tuple2<Character, Long>(ch, (long) chars.count(ch)));
    }
    return list;
});
JavaPairRDD<Character, Long> counts = pairs.reduceByKey((a, b) -> a + b);
System.out.printf("MapCount %,d\n", counts.count());

But, I get the following exception:

15/01/03 21:51:34 ERROR MapOutputTrackerMasterActor: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
org.apache.spark.SparkException: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
at org.apache.spark.MapOutputTrackerMasterActor$$anonfun$receiveWithLogging$1.applyOrElse(MapOutputTracker.scala:59) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.MapOutputTrackerMasterActor.aroundReceive(MapOutputTracker.scala:42) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Would you please tell me where is the fault? If I process fewer rows, there is no problem. However, when the number of rows is large I always get this exception. Thanks
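To make the two workarounds Josh mentions concrete, here is a sketch of the relevant settings; the numbers are examples only, and `rows` refers to the input RDD from the snippet above:

import org.apache.spark.SparkConf

// option 1: raise the Akka frame size (the value is in MB; the default is 10)
val conf = new SparkConf().set("spark.akka.frameSize", "128")

// option 2: keep the number of map-side partitions modest so the serialized
// map output statuses stay under the frame size, as noted in the thread
val fewerPartitions = rows.coalesce(1000)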
stopping streaming context
Hi experts! Please, is there any way to stop a Spark streaming context when 5 batches are completed? Thanks
Finding most occurrences in a JSON Nested Array
Hi, I'm pretty new to both Spark (and Scala), so I would like to seek some help here: I have this dataset in JSON: In short, I'm trying to find out the top 10 hobbies, sorted DESC by count. So basically I did: Prints... This is where I got stuck... I tried and got: What do I do with an Array[Any]... should I be casting it? How about ArrayBuffer, how can I iterate within it to do counting? One thing I tried unsuccessfully: Any advice or hints are much appreciated, thanks. Ads
spark 1.2: value toJSON is not a member of org.apache.spark.sql.SchemaRDD
Hi everyone, I have just switched to Spark 1.2.0 from 1.1.1, updating my sbt build to point to the 1.2.0 jars:

"org.apache.spark" %% "spark-core" % "1.2.0",
"org.apache.spark" %% "spark-sql" % "1.2.0",
"org.apache.spark" %% "spark-hive" % "1.2.0",

I was hoping to use the new SchemaRDD#toJSON among other features, but it seems that this method is not available in the 1.2.0 spark-sql jars. I did a 'javap SchemaRDD.class | grep toJSON' on the unarchived spark-sql_2.10-1.2.0.jar that I got from Maven, but it's not there. The SchemaRDD#toJSON method is present in the source https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala within the branch-1.2 branch though. Am I missing something? Or wasn't that feature shipped with 1.2?
Re: a vague question, but perhaps it might ring a bell
Greetings! Thank you very much for taking the time to respond. My apologies, but at the moment I don't have an example that I feel comfortable posting. Frankly, I've been struggling with variants of this for the last two weeks and probably won't be able to work on this particular issue for a few days. However, I am intrigued by your comment. You mention "when I close the fs object inside map/mapPartition etc." Where else can one close the object? If I don't close it, the output file is generally truncated. Again, the code seems to work for a few hundred files, then I get these weird errors. Is this something subtle related to the shipping of the closure that I'm not aware of? Can you give a general idea of how you handled this? Is it necessary to create a custom OutputFormat class? I was looking at the OutputFormat code and it looks like it also creates an fs object and starts writing, but perhaps there is some subtle difference in the context? Thank you. Sincerely, Mike From: Akhil Das ak...@sigmoidanalytics.com To: Michael Albert m_albert...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, January 5, 2015 1:21 AM Subject: Re: a vague question, but perhaps it might ring a bell What are you trying to do? Can you paste the whole code? I used to see this sort of Exception when I closed the fs object inside map/mapPartition etc. Thanks Best Regards On Mon, Jan 5, 2015 at 6:43 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! So, I think I have data saved so that each partition (part-r-0, etc.) is exactly what I want to translate into an output file of a format not related to hadoop. I believe I've figured out how to tell Spark to read the data set without re-partitioning (in another post I mentioned this -- I have a non-splittable InputFormat). I do something like:

mapPartitionsWithIndex { (partId, iter) =>
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val strm = fs.create(new Path(...))
  // write data to stream
  strm.close() // in finally block
}

This runs for a few hundred input files (so each executor sees 10's of files), and it chugs along nicely, then suddenly everything shuts down. I can restart (telling it to skip the partIds which it has already completed), and it chugs along again for a while (going past the previous stopping point) and again dies. I am at a loss. This works for the first 10's of files (so it runs for about 1 hr), then quits, and I see no useful error information (no Exceptions except the stuff below). I'm not shutting it down. Any idea what I might check? I've bumped up the memory multiple times (16G currently) and fiddled with increasing other parameters.
Thanks.

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
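In case it helps others reading this thread, here is a minimal sketch of the per-partition write pattern being discussed, with the output stream closed in a finally block but the FileSystem object left open (FileSystem.get returns a cached, per-JVM instance shared by other tasks, so closing it can break them). The output path and record formatting are placeholders, not taken from Michael's job.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

rdd.mapPartitionsWithIndex { (partId, iter) =>
  val conf = new Configuration()
  val fs = FileSystem.get(conf)                            // shared, cached instance; don't close it
  val out = fs.create(new Path(s"/output/part-$partId"))   // placeholder path
  try {
    iter.foreach(rec => out.writeBytes(rec.toString + "\n"))
  } finally {
    out.close()                                            // only close the stream
  }
  Iterator(partId)
}.count()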
Re: FlatMapValues
For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient. On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen so...@cloudera.com wrote: From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out. However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play. On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote: My understanding is as follows

STEP 1 (This would create a pair RDD)
===
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
})

STEP 2
===
Since the previous step created a pair RDD, I thought the flatMapValues method would be applicable. But the code does not even compile, saying that flatMapValues is not applicable to RDD :-(

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

SUMMARY
===
when a dataset looks like the following

1,red,blue,green
2,yellow,violet,pink

I want to output the following and I am asking how do I do that? Perhaps my code is 100% wrong. Please correct me and educate me :-)

1,red
1,blue
1,green
2,yellow
2,violet
2,pink
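For completeness, here is a sketch of the flatMapValues route Sanjay was originally after, following Kapil's point that the map step must return a key-value pair in every branch. The field indices and output path follow Sanjay's snippet, and outFile is assumed to be defined as in his code; this is a sketch, not tested against his data.

import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark versions

val pairs = reacRdd
  .map(_.split(','))
  .collect { case f if f.length >= 11 && !f(0).contains("VAERS_ID") =>
    (f(0), Seq(f(1), f(3), f(5), f(7), f(9)).mkString("\t"))
  }

pairs
  .flatMapValues(_.split('\t'))
  .map { case (id, value) => s"$id,$value" }
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)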
Re: stopping streaming context
Have a look at StreamingListener http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener. Here's an example http://stackoverflow.com/questions/20950268/how-to-stop-spark-streaming-context-when-the-network-connection-tcp-ip-is-clos on how to use it. Thanks Best Regards On Mon, Jan 5, 2015 at 3:15 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! Please, is there any way to stop a Spark streaming context when 5 batches are completed? Thanks
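Building on that pointer, here is a rough sketch of stopping after five completed batches. The class and variable names are mine, and depending on the Spark version it is safer to call stop() from a separate thread rather than from inside the listener callback, as done below.

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// assumes `ssc` is an already-constructed StreamingContext with its DStreams set up
class StopAfterNBatches(ssc: StreamingContext, n: Int) extends StreamingListener {
  private val completed = new AtomicInteger(0)
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    if (completed.incrementAndGet() >= n) {
      // stop from a separate thread so the listener bus is not blocked
      new Thread() {
        override def run(): Unit = ssc.stop(stopSparkContext = true)
      }.start()
    }
  }
}

ssc.addStreamingListener(new StopAfterNBatches(ssc, 5))
ssc.start()
ssc.awaitTermination()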
Implement customized Join for SparkSQL
Hi, All Suppose I want to join two tables A and B as follows:

Select * from A join B on A.id = B.id

A is a file, while B is a database which is indexed by id and which I wrapped with the Data Source API. The desired join flow is:

1. Generate A's RDD[Row]
2. Generate B's RDD[Row] from A, by using A's ids and B's data source API to fetch the matching rows from the database
3. Merge these two RDDs into the final RDD[Row]

However, it seems the existing join strategies don't support this. Is there any way to achieve it? Best Regards, Kevin.
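Until something like this exists as a join strategy, one way to approximate the flow above is an id lookup inside mapPartitions. This is only a rough sketch: BLookupClient and its lookup method are hypothetical stand-ins for whatever client the data source wraps, the id is assumed to be A's first column, and the result is a plain RDD[Row] that would still need a merged schema applied.

import org.apache.spark.sql._

val joined = aRdd.mapPartitions { iter =>
  val client = new BLookupClient()          // hypothetical: one connection per partition
  iter.map { aRow =>
    val id = aRow.getString(0)              // assumes id is the first column of A
    val bRow = client.lookup(id)            // fetch the matching row from B by its index
    Row.fromSeq(aRow.toSeq ++ bRow.toSeq)   // merge the two rows into one output row
  }
}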
Override hostname of Spark executors
Hi together, I'm currently setting up a dev environment for Spark. We are planning to use a commercial, hostname-dongled 3rd-party library in our Spark jobs. The question which arises now: a) Is it possible (maybe on job level) to tell the Spark executor which hostname it should report to the job application which runs on the executor? Maybe this is something that can be achieved with some specific executor implementation (Mesos, YARN)? I'm thinking of some job-specific option like docker's -h command line option. And to extend this question: b) Is there some executor which allows running jobs within Docker instances? Maybe this would enable me to set the hostname on job level.
Re: Mesos resource allocation
Hey Tim, sorry for the delayed reply, been on vacation for a couple weeks. The reason we want to control the number of executors is that running executors with JVM heaps over 30GB causes significant garbage collection problems. We have observed this through much trial-and-error for jobs that are a dozen-or-so stages, running for more than ~20m. For example, if we run 8 executors with 60GB heap each (for example, we have also other values larger than 30GB), even after much tuning of heap parameters (% for RDD cache, etc.) we run into GC problems. Effectively GC becomes so high that it takes over all compute time from the JVM. If we then halve the heap (30GB) but double the number of executors (16), all GC problems are relieved and we get to use the full memory resources of the cluster. We talked with some engineers from Databricks at Strata in Barcelona recently and received the same advice — do not run executors with more than 30GB heaps. Since our machines are 64GB machines and we are typically only running one or two jobs at a time on the cluster (for now), we can only use half the cluster memory with the current configuration options available in Mesos. Happy to hear your thoughts and actually very curious about how others are running Spark on Mesos with large heaps (as a result of large memory machines). Perhaps this is a non-issue when we have more multi-tenancy in the cluster, but for now, this is not the case. Thanks, Josh On 24 December 2014 at 06:22, Tim Chen t...@mesosphere.io wrote: Hi Josh, If you want to cap the amount of memory per executor in Coarse grain mode, then yes you only get 240GB of memory as you mentioned. What's the reason you don't want to raise the capacity of memory you use per executor? In coarse grain mode the Spark executor is long living and it internally will get tasks distributed by Spark internal Coarse grained scheduler. I think the assumption is that it already allocated the maximum available on that slave and don't really assume we need another one. I think it's worth considering having a configuration of number of cores per executor, especially when Mesos have inverse offers and optimistic offers so we can choose to launch more executors when resources becomes available even in coarse grain mode and then support giving the executors back but more higher priority tasks arrive. For fine grain mode, the spark executors are started by Mesos executors that is configured from Mesos scheduler backend. I believe the RDD is cached as long as the Mesos executor is running as the BlockManager is created on executor registration. Let me know if you need any more info. Tim -- Forwarded message -- From: Josh Devins j...@soundcloud.com Date: 22 December 2014 at 17:23 Subject: Mesos resource allocation To: user@spark.apache.org We are experimenting with running Spark on Mesos after running successfully in Standalone mode for a few months. With the Standalone resource manager (as well as YARN), you have the option to define the number of cores, number of executors and memory per executor. In Mesos, however, it appears as though you cannot specify the number of executors, even in coarse-grained mode. If this is the case, how do you define the number of executors to run with? Here's an example of why this matters (to us). 
Let's say we have the following cluster:

num nodes: 8
num cores: 256 (32 per node)
total memory: 512GB (64GB per node)

If I set my job to require 256 cores and per-executor-memory to 30GB, then Mesos will schedule a single executor per machine (8 executors total) and each executor will get 32 cores to work with. This means that we have 8 executors * 30GB each for a total of 240GB of cluster memory in use — less than half of what is available. If you want actually 16 executors in order to increase the amount of memory in use across the cluster, how can you do this with Mesos? It seems that a parameter is missing (or I haven't found it yet) which lets me tune this for Mesos:

* number of executors per n-cores OR
* number of executors total

Furthermore, in fine-grained mode in Mesos, how are the executors started/allocated? That is, since Spark tasks map to Mesos tasks, when and how are executors started? If they are transient and an executor per task is created, does this mean we cannot have cached RDDs? Thanks for any advice or pointers, Josh
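For reference, the workaround discussed in this thread expressed as configuration, using the values from the thread. Note that this only caps each executor's heap below the ~30GB GC threshold; it does not add a way to run more than one executor per slave, which is the open issue here.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.coarse", "true")       // coarse-grained mode, as in the discussion
  .set("spark.executor.memory", "30g")     // keep each executor's JVM heap under ~30GB
  .set("spark.cores.max", "256")           // total cores the job may take from the cluster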
RE: Implement customized Join for SparkSQL
Can you paste the error log? From: Dai, Kevin [mailto:yun...@ebay.com] Sent: Monday, January 5, 2015 6:29 PM To: user@spark.apache.org Subject: Implement customized Join for SparkSQL Hi, All Suppose I want to join two tables A and B as follows: Select * from A join B on A.id = B.id A is a file while B is a database which indexed by id and I wrapped it by Data source API. The desired join flow is: 1. Generate A's RDD[Row] 2. Generate B's RDD[Row] from A by using A's id and B's data source api to get row from the database 3. Merge these two RDDs to the final RDD[Row] However it seems existing join strategy doesn't support it? Any way to achieve it? Best Regards, Kevin.
Re: Is it possible to do incremental training using ALSModel (MLlib)?
In the first instance, I'm suggesting that ALS in Spark could perhaps expose a run() method that accepts a previous MatrixFactorizationModel, and uses the product factors from it as the initial state instead. If anybody seconds that idea, I'll make a PR. The second idea is just fold-in: http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14 Whether you do this or something like SGD, inside or outside Spark, depends on your requirements I think. On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey wouter.sam...@storefront.be wrote: Do you know a place where I could find a sample or tutorial for this? I'm still very new at this. And struggling a bit... Thanks in advance Wouter Sent from my iPhone. On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote: Yes, it is easy to simply start a new factorization from the current model solution. It works well. That's more like incremental *batch* rebuilding of the model. That is not in MLlib but fairly trivial to add. You can certainly 'fold in' new data to approximately update with one new datum too, which you can find online. This is not quite the same idea as streaming SGD. I'm not sure this fits the RDD model well since it entails updating one element at a time but mini batch could be reasonable. On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote: I was under the impression that ALS wasn't designed for it :- The famous ebay online recommender uses SGD However, you can try using the previous model as starting point, and gradually reduce the number of iteration after the model stablize. I never verify this idea, so you need to at least cross-validate it before putting into productio On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be wrote: Hi all, I'm curious about MLlib and if it is possible to do incremental training on the ALSModel. Usually training is run first, and then you can query. But in my case, data is collected in real-time and I want the predictions of my ALSModel to consider the latest data without complete re-training phase. I've checked out these resources, but could not find any info on how to solve this: https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html My question fits in a larger picture where I'm using Prediction IO, and this in turn is based on Spark. Thanks in advance for any advice! Wouter -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
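To make the fold-in idea a bit more concrete, here is a rough sketch, outside of MLlib and using Breeze, of approximating one user's factor from the existing product factors without re-running ALS. Y holds the factors of the items that user rated (one row per item, k columns), r the corresponding ratings, and lambda should match the regularization used in training. All names are illustrative, not an MLlib API.

import breeze.linalg.{DenseMatrix, DenseVector, inv}

def foldInUser(Y: DenseMatrix[Double], r: DenseVector[Double], lambda: Double): DenseVector[Double] = {
  val k = Y.cols
  // solve (Y'Y + lambda*I) x = Y'r for the user's factor x
  val gram = Y.t * Y + DenseMatrix.eye[Double](k) * lambda
  inv(gram) * (Y.t * r)
}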
Re: Is it possible to do incremental training using ALSModel (MLlib)?
One other idea was that I don’t need to re-train the model, but simply pass all the current user’s recent ratings (including one’s created after the training) to the existing model… Is this a valid option? Wouter Samaey Zaakvoerder Storefront BVBA Tel: +32 472 72 83 07 Web: http://storefront.be LinkedIn: http://www.linkedin.com/in/woutersamaey On 05 Jan 2015, at 13:13, Sean Owen so...@cloudera.com wrote: In the first instance, I'm suggesting that ALS in Spark could perhaps expose a run() method that accepts a previous MatrixFactorizationModel, and uses the product factors from it as the initial state instead. If anybody seconds that idea, I'll make a PR. The second idea is just fold-in: http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14 Whether you do this or something like SGD, inside or outside Spark, depends on your requirements I think. On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey wouter.sam...@storefront.be wrote: Do you know a place where I could find a sample or tutorial for this? I'm still very new at this. And struggling a bit... Thanks in advance Wouter Sent from my iPhone. On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote: Yes, it is easy to simply start a new factorization from the current model solution. It works well. That's more like incremental *batch* rebuilding of the model. That is not in MLlib but fairly trivial to add. You can certainly 'fold in' new data to approximately update with one new datum too, which you can find online. This is not quite the same idea as streaming SGD. I'm not sure this fits the RDD model well since it entails updating one element at a time but mini batch could be reasonable. On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote: I was under the impression that ALS wasn't designed for it :- The famous ebay online recommender uses SGD However, you can try using the previous model as starting point, and gradually reduce the number of iteration after the model stablize. I never verify this idea, so you need to at least cross-validate it before putting into productio On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be wrote: Hi all, I'm curious about MLlib and if it is possible to do incremental training on the ALSModel. Usually training is run first, and then you can query. But in my case, data is collected in real-time and I want the predictions of my ALSModel to consider the latest data without complete re-training phase. I've checked out these resources, but could not find any info on how to solve this: https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html My question fits in a larger picture where I'm using Prediction IO, and this in turn is based on Spark. Thanks in advance for any advice! Wouter -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Driver behind NAT
Thanks for the link! However, from reviewing the thread, it appears you cannot have a NAT/firewall between the cluster and the spark-driver/shell..is this correct? When the shell starts up, it binds to the internal IP (e.g. 192.168.x.y)..not the external floating IP..which is routable from the cluster. When i did set a static port for the spark.driver.port and set the spark.driver.host to the floating IP address...I get the same exception, (Caused by: java.net.BindException: Cannot assign requested address: bind), because of the use of the InetAddress.getHostAddress method call. Cheers, Aaron On Mon, Jan 5, 2015 at 8:28 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can have a look at this discussion http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html Thanks Best Regards On Mon, Jan 5, 2015 at 6:11 PM, Aaron aarongm...@gmail.com wrote: Hello there, I was wondering if there is a way to have the spark-shell (or pyspark) sit behind a NAT when talking to the cluster? Basically, we have OpenStack instances that run with internal IPs, and we assign floating IPs as needed. Since the workers make direct TCP connections back, the spark-shell is binding to the internal IP..not the floating. Our other use case is running Vagrant VMs on our local machines..but, we don't have those VMs' NICs setup in bridged mode..it too has an internal IP. I tried using the SPARK_LOCAL_IP, and the various --conf spark.driver.host parameters...but it still get's angry. Any thoughts/suggestions? Currently our work around is to VPNC connection from inside the vagrant VMs or Openstack instances...but, that doesn't seem like a long term plan. Thanks in advance! Cheers, Aaron
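For anyone following along, here are the knobs that were tried, collected in one place. The values are placeholders, and as noted above these alone did not get past the bind error, because the driver still tries to bind the external address locally.

// SPARK_LOCAL_IP=<internal ip> is set in the environment before launching the shell
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.host", "203.0.113.10")   // the floating/external IP the workers should use
  .set("spark.driver.port", "51000")          // a fixed port that can be forwarded through the NAT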
Re: Spark Driver behind NAT
You can have a look at this discussion http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html Thanks Best Regards On Mon, Jan 5, 2015 at 6:11 PM, Aaron aarongm...@gmail.com wrote: Hello there, I was wondering if there is a way to have the spark-shell (or pyspark) sit behind a NAT when talking to the cluster? Basically, we have OpenStack instances that run with internal IPs, and we assign floating IPs as needed. Since the workers make direct TCP connections back, the spark-shell is binding to the internal IP..not the floating. Our other use case is running Vagrant VMs on our local machines..but, we don't have those VMs' NICs setup in bridged mode..it too has an internal IP. I tried using the SPARK_LOCAL_IP, and the various --conf spark.driver.host parameters...but it still get's angry. Any thoughts/suggestions? Currently our work around is to VPNC connection from inside the vagrant VMs or Openstack instances...but, that doesn't seem like a long term plan. Thanks in advance! Cheers, Aaron
Spark Driver behind NAT
Hello there, I was wondering if there is a way to have the spark-shell (or pyspark) sit behind a NAT when talking to the cluster? Basically, we have OpenStack instances that run with internal IPs, and we assign floating IPs as needed. Since the workers make direct TCP connections back, the spark-shell is binding to the internal IP..not the floating. Our other use case is running Vagrant VMs on our local machines..but, we don't have those VMs' NICs setup in bridged mode..it too has an internal IP. I tried using the SPARK_LOCAL_IP, and the various --conf spark.driver.host parameters...but it still get's angry. Any thoughts/suggestions? Currently our work around is to VPNC connection from inside the vagrant VMs or Openstack instances...but, that doesn't seem like a long term plan. Thanks in advance! Cheers, Aaron
Re: different akka versions and spark
I haven't tried it with spark specifically, but I've definitely run into problems trying to depend on multiple versions of akka in one project. On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com wrote: hey Ted, i am aware of the upgrade efforts for akka. however if spark 1.2 forces me to upgrade all our usage of akka to 2.3.x while spark 1.0 and 1.1 force me to use akka 2.2.x then we cannot build one application that runs on all spark 1.x versions, which i would consider a major incompatibility. best, koert On Sat, Jan 3, 2015 at 12:11 AM, Ted Yu yuzhih...@gmail.com wrote: Please see http://akka.io/news/2014/05/22/akka-2.3.3-released.html which points to http://doc.akka.io/docs/akka/2.3.3/project/migration-guide-2.2.x-2.3.x.html?_ga=1.35212129.1385865413.1420220234 Cheers On Fri, Jan 2, 2015 at 9:11 AM, Koert Kuipers ko...@tresata.com wrote: i noticed spark 1.2.0 bumps the akka version. since spark uses it's own akka version, does this mean it can co-exist with another akka version in the same JVM? has anyone tried this? we have some spark apps that also use akka (2.2.3) and spray. if different akka versions causes conflicts then spark 1.2.0 would not be backwards compatible for us... thanks. koert
Re: Finding most occurrences in a JSON Nested Array
If you need more help let me know -Pankaj Linkedin https://www.linkedin.com/profile/view?id=171566646 Skype pankaj.narang
Re: Finding most occurrences in a JSON Nested Array
Try as below:

results.map(row => row(1)).collect

Then try:

var hobbies = results.flatMap(row => row(1))

It will create all the hobbies in a simple array. Now:

val hbmap = hobbies.map(hobby => (hobby, 1)).reduceByKey((hobcnt1, hobcnt2) => hobcnt1 + hobcnt2)

It will aggregate the hobbies as below: {swimming, 2}, {hiking, 1}. Now:

hbmap.map { case (hobby, count) => (count, hobby) }.sortByKey(ascending = false).collect

will give you the hobbies sorted in descending order by their count. This is pseudo code but should help you. Regards Pankaj
Add PredictionIO to Powered by Spark
Please can we add PredictionIO to https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark PredictionIO http://prediction.io/ PredictionIO is an open source machine learning server for software developers to easily build and deploy predictive applications on production. PredictionIO currently offers two engine templates for Apache Spark MLlib for recommendation (MLlib ALS) and classification (MLlib Naive Bayes). With these templates, you can create a custom predictive engine for production deployment efficiently. A standard PredictionIO stack is built on top of solid open source technology, such as Scala, Apache Spark, HBase and Elasticsearch. We are already featured on https://databricks.com/certified-on-spark Kind regards and Happy New Year! Thomas — This page tracks the users of Spark. To add yourself to the list, please email user@spark.apache.org with your organization name, URL, a list of which Spark components you are using, and a short description of your use case.
Re: different akka versions and spark
since spark shaded akka i wonder if it would work, but i doubt it On Mon, Jan 5, 2015 at 9:56 AM, Cody Koeninger c...@koeninger.org wrote: I haven't tried it with spark specifically, but I've definitely run into problems trying to depend on multiple versions of akka in one project. On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com wrote: hey Ted, i am aware of the upgrade efforts for akka. however if spark 1.2 forces me to upgrade all our usage of akka to 2.3.x while spark 1.0 and 1.1 force me to use akka 2.2.x then we cannot build one application that runs on all spark 1.x versions, which i would consider a major incompatibility. best, koert On Sat, Jan 3, 2015 at 12:11 AM, Ted Yu yuzhih...@gmail.com wrote: Please see http://akka.io/news/2014/05/22/akka-2.3.3-released.html which points to http://doc.akka.io/docs/akka/2.3.3/project/migration-guide-2.2.x-2.3.x.html?_ga=1.35212129.1385865413.1420220234 Cheers On Fri, Jan 2, 2015 at 9:11 AM, Koert Kuipers ko...@tresata.com wrote: i noticed spark 1.2.0 bumps the akka version. since spark uses it's own akka version, does this mean it can co-exist with another akka version in the same JVM? has anyone tried this? we have some spark apps that also use akka (2.2.3) and spray. if different akka versions causes conflicts then spark 1.2.0 would not be backwards compatible for us... thanks. koert
Re: Finding most occurrences in a JSON Nested Array
I did try this earlier, but I got an error that I couldn't comprehend:

scala> var hobbies = results.flatMap(row => row(1))
<console>:16: error: type mismatch;
 found   : Any
 required: TraversableOnce[?]
       var hobbies = results.flatMap(row => row(1))

I must be missing something, perhaps a cast. On 6 Jan, 2015, at 12:17 am, Pankaj Narang [via Apache Spark User List] ml-node+s1001560n2097...@n3.nabble.com wrote: [...]
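The type mismatch comes from row(1) being typed as Any. Here is a sketch of one way past it, assuming the hobbies column is the second field of each Row and comes back as a sequence of strings (as Spark SQL returns JSON string arrays); adjust the index and element type to the actual schema.

import org.apache.spark.SparkContext._   // pair/ordered RDD implicits on older Spark versions

// cast the Any returned by row(1) to a sequence before flatMapping over it
val hobbies = results.flatMap(row => row(1).asInstanceOf[Seq[String]])

val top10 = hobbies
  .map(h => (h, 1))
  .reduceByKey(_ + _)        // count each hobby
  .map(_.swap)               // (count, hobby) so we can sort by count
  .sortByKey(ascending = false)
  .take(10)

top10.foreach { case (count, hobby) => println(s"$hobby: $count") }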
Saving partial (top 10) DStream windows to hdfs
Hi, I am counting values in each window to find the top values, and I want to save only the top 10 most frequent values of each window to HDFS rather than all the values.

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
// sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

I can print the top 10 with the commented-out foreachRDD line above. I have also tried

sortedCounts.foreachRDD { rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }

but I get the following error:

15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

Regards, Laeeq
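The NotSerializableException usually comes from the closure capturing the StreamingContext: referencing ssc inside foreachRDD forces Spark to try to serialize it with the task. A minimal sketch of one way around it (not Laeeq's exact code; the per-batch output path and the use of the batch time in the directory name are illustrative), using the RDD's own SparkContext instead of ssc:

sortedCounts.foreachRDD { (rdd, time) =>
  // take(10) returns to the driver, so only the top 10 pairs get written out
  val top10 = rdd.take(10).map { case (value, count) => s"$value,$count" }
  rdd.sparkContext
    .parallelize(top10)
    .saveAsTextFile(s"hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/${time.milliseconds}")
}

Writing each window to its own directory also avoids saveAsTextFile failing because the previous window's output directory already exists.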
RE: python API for gradient boosting?
Awesome, thanks for creating this -- I wasn't sure of the process for requesting such a thing. I looked at what is required based on the scala code, and it seemed a bit beyond my capability. cheers chris -Original Message- From: Xiangrui Meng [mailto:men...@gmail.com] Sent: Tuesday, 6 January 2015 8:43 AM To: Christopher Thom Cc: user@spark.apache.org Subject: Re: python API for gradient boosting? I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-5094. Hopefully someone would work on it and make it available in the 1.3 release. -Xiangrui On Sun, Jan 4, 2015 at 6:58 PM, Christopher Thom christopher.t...@quantium.com.au wrote: Hi, I wonder if anyone knows when a python API will be added for Gradient Boosted Trees? I see that java and scala APIs were added for the 1.2 release, and would love to be able to build GBMs in pyspark too. cheers chris Christopher Thom QUANTIUM Level 25, 8 Chifley, 8-12 Chifley Square Sydney NSW 2000 T: +61 2 8222 3577 F: +61 2 9292 6444 W: quantium.com.au linkedin.com/company/quantium facebook.com/QuantiumAustralia twitter.com/QuantiumAU The contents of this email, including attachments, may be confidential information. If you are not the intended recipient, any use, disclosure or copying of the information is unauthorised. If you have received this email in error, we would be grateful if you would notify us immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from your system. Christopher Thom QUANTIUM Level 25, 8 Chifley, 8-12 Chifley Square Sydney NSW 2000 T: +61 2 8222 3577 F: +61 2 9292 6444 W: quantium.com.auwww.quantium.com.au linkedin.com/company/quantiumwww.linkedin.com/company/quantium facebook.com/QuantiumAustraliawww.facebook.com/QuantiumAustralia twitter.com/QuantiumAUwww.twitter.com/QuantiumAU The contents of this email, including attachments, may be confidential information. If you are not the intended recipient, any use, disclosure or copying of the information is unauthorised. If you have received this email in error, we would be grateful if you would notify us immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from your system.
Re: MLLIB and Openblas library in non-default dir
It might be hard to do that with spark-submit, because the executor JVMs may be already up and running before a user runs spark-submit. You can try to use `System.setProperty` to change the property at runtime, though it doesn't seem to be a good solution. -Xiangrui On Fri, Jan 2, 2015 at 6:28 AM, xhudik xhu...@gmail.com wrote: Hi I have compiled OpenBlas library into nonstandard directory and I want to inform Spark app about it via: -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so which is a standard option in netlib-java (https://github.com/fommil/netlib-java) I tried 2 ways: 1. via *--conf* parameter /bin/spark-submit -v --class org.apache.spark.examples.mllib.LinearRegression *--conf -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so* examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar data/mllib/sample_libsvm_data.txt/ 2. via *--driver-java-options* parameter /bin/spark-submit -v *--driver-java-options -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so* --class org.apache.spark.examples.mllib.LinearRegression examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar data/mllib/sample_libsvm_data.txt / How can I force spark-submit to propagate info about non-standard placement of openblas library to netlib-java lib? thanks, Tomas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-and-Openblas-library-in-non-default-dir-tp20943.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
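One thing worth noting about the two attempts above: --conf sets Spark properties in key=value form, so a raw -D flag passed there just becomes an unused property rather than a JVM option. A sketch of how the same flag could be routed to both the driver and the executors (assuming the netlib-java property name from the original post is correct for your build):

bin/spark-submit \
  --class org.apache.spark.examples.mllib.LinearRegression \
  --driver-java-options "-Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so" \
  --conf "spark.executor.extraJavaOptions=-Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so" \
  examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar \
  data/mllib/sample_libsvm_data.txt

As Xiangrui notes, this only helps JVMs that Spark launches after reading the property; executor JVMs that are already up will not pick it up.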
Re: python API for gradient boosting?
I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-5094. Hopefully someone would work on it and make it available in the 1.3 release. -Xiangrui On Sun, Jan 4, 2015 at 6:58 PM, Christopher Thom christopher.t...@quantium.com.au wrote: Hi, I wonder if anyone knows when a python API will be added for Gradient Boosted Trees? I see that java and scala APIs were added for the 1.2 release, and would love to be able to build GBMs in pyspark too. cheers chris Christopher Thom QUANTIUM Level 25, 8 Chifley, 8-12 Chifley Square Sydney NSW 2000 T: +61 2 8222 3577 F: +61 2 9292 6444 W: quantium.com.au linkedin.com/company/quantium facebook.com/QuantiumAustralia twitter.com/QuantiumAU The contents of this email, including attachments, may be confidential information. If you are not the intended recipient, any use, disclosure or copying of the information is unauthorised. If you have received this email in error, we would be grateful if you would notify us immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from your system. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Driver hangs on running mllib word2vec
How big is your dataset, and what is the vocabulary size? -Xiangrui On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi, When we run mllib word2vec(spark-1.1.0), driver get stuck with 100% cup usage. Here is the jstack output: main prio=10 tid=0x40112800 nid=0x46f2 runnable [0x4162e000] java.lang.Thread.State: RUNNABLE at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778) at java.io.DataOutputStream.writeInt(DataOutputStream.java:182) at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225) at java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610) at org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290) at com.baidu.inf.WordCount$.main(WordCount.scala:31) at com.baidu.inf.WordCount.main(WordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- Best Regards - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark response times for queries seem slow
I am running a pyspark job over 4GB of data that is split into 17 parquet files on an HDFS cluster. This is all in Cloudera Manager. Here is the query the job is running:

parquetFile.registerTempTable("parquetFileone")
results = sqlContext.sql("SELECT sum(total_impressions), sum(total_clicks) FROM parquetFileone group by hour")

I also ran it this way:

mapped = parquetFile.map(lambda row: (str(row.hour), (row.total_impressions, row.total_clicks)))
counts = mapped.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

My results were anywhere from 8 - 10 minutes. I am wondering if there is a configuration that needs to be tweaked or if this is the expected response time. Machines have 30g RAM and 4 cores. It seems the CPUs are just getting pegged and that is what is taking so long. Any help on this would be amazing. Thanks, -- MAGNE+IC Sam Flint | Lead Developer, Data Analytics
Re: different akka versions and spark
Spark doesn't really shade akka; it pulls a different build (kept under the org.spark-project.akka group and, I assume, with some build-time differences from upstream akka?), but all classes are still in the original location. The upgrade is a little more unfortunate than just changing akka, since it also changes some transitive dependencies which also have compatibility issues (e.g. the typesafe config library). But I believe it's needed to support Scala 2.11... On Mon, Jan 5, 2015 at 8:27 AM, Koert Kuipers ko...@tresata.com wrote: since spark shaded akka i wonder if it would work, but i doubt it On Mon, Jan 5, 2015 at 9:56 AM, Cody Koeninger c...@koeninger.org wrote: I haven't tried it with spark specifically, but I've definitely run into problems trying to depend on multiple versions of akka in one project. On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com wrote: hey Ted, i am aware of the upgrade efforts for akka. however if spark 1.2 forces me to upgrade all our usage of akka to 2.3.x while spark 1.0 and 1.1 force me to use akka 2.2.x then we cannot build one application that runs on all spark 1.x versions, which i would consider a major incompatibility. best, koert On Sat, Jan 3, 2015 at 12:11 AM, Ted Yu yuzhih...@gmail.com wrote: Please see http://akka.io/news/2014/05/22/akka-2.3.3-released.html which points to http://doc.akka.io/docs/akka/2.3.3/project/migration-guide-2.2.x-2.3.x.html?_ga=1.35212129.1385865413.1420220234 Cheers On Fri, Jan 2, 2015 at 9:11 AM, Koert Kuipers ko...@tresata.com wrote: i noticed spark 1.2.0 bumps the akka version. since spark uses it's own akka version, does this mean it can co-exist with another akka version in the same JVM? has anyone tried this? we have some spark apps that also use akka (2.2.3) and spray. if different akka versions causes conflicts then spark 1.2.0 would not be backwards compatible for us... thanks. koert -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle Problems in 1.2.0
Thanks for the input! I've managed to come up with a repro of the error with test data only (and without any of the custom code in the original script), please see here: https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md The Gist contains a data generator and the script reproducing the error (plus driver and executor logs). If I run using full cluster capacity (32 executors with 28GB), there are no issues. If I run on only two, the error appears again and the job fails: org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7) Any thoughts or any obvious problems you can spot by any chance? Thank you! -Sven On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com wrote: It doesn’t seem like there’s a whole lot of clues to go on here without seeing the job code. The original org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests that maybe there’s an issue with PySpark’s serialization / tracking of types, but it’s hard to say from this error trace alone. On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote: Hey Josh, I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720GB of data (the cluster's total RAM is around ~900GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues. Curiously, when I run it over slightly smaller data set of ~230GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause). Any idea on where to look in the interim for more hints? I'll continue to try to get to a minimal repro. 2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739 2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277 2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn application has already exited with state FINISHED! 2014-12-30 21:35:59,056 INFO [Yarn application state monitor] handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null} [...] 
2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI (Logging.scala:logInfo(59)) - Stopped Spark web UI at http://ip-10-20-80-37.us-west-1.compute.internal:4040 2014-12-30 21:35:59,130 INFO [Yarn application state monitor] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler 2014-12-30 21:35:59,131 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting down all executors 2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each executor to shut down 2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 1 failed: collect at /home/hadoop/test_scripts/test.py:63, took 980.751936 s Traceback (most recent call last): File /home/hadoop/test_scripts/test.py, line 63, in module result = j.collect() File /home/hadoop/spark/python/pyspark/rdd.py, line 676, in collect bytesInJava = self._jrdd.collect().iterator() File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError2014-12-30 21:35:59,140 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Stopped : An error occurred while calling o117.collect. : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
Re: Reading from a centralized stored
If you are not co-locating spark executor processes on the same machines where the data is stored, and using an rdd that knows about which node to prefer scheduling a task on, yes, the data will be pulled over the network. Of the options you listed, S3 and DynamoDB cannot have spark running on the same machines. Cassandra can be run on the same nodes as spark, and recent versions of the spark cassandra connector implement preferred locations. You can run an rdbms on the same nodes as spark, but JdbcRDD doesn't implement preferred locations. On Mon, Jan 5, 2015 at 6:25 PM, Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm trying to understand how a Spark Cluster behaves when the data it is processing resides on a centralized/remote store (S3, Cassandra, DynamoDB, RDBMS etc). Does every node in the cluster retrieve all the data from the central store ? thanks -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
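For anyone wondering what "an rdd that knows about which node to prefer scheduling a task on" looks like in code, here is a minimal, self-contained sketch of the hook the Cassandra connector implements and JdbcRDD does not. The class, partitioning scheme and host list are purely illustrative, not taken from any real connector:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

case class StorePartition(index: Int, host: String) extends Partition

class StoreScanRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  // One partition per storage host in this toy example.
  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => StorePartition(i, h) }.toArray

  // The scheduler consults this to place each task next to its data.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[StorePartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator.empty // a real connector would scan the local replica here
}

Without this hook, or without executors on the storage nodes at all, each partition's data travels over the network to whichever executor happens to run the task.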
Re: Reading from a centralized stored
Thanks, that's what I suspected. cheers On Tue, Jan 6, 2015 at 12:56 PM, Cody Koeninger c...@koeninger.org wrote: If you are not co-locating spark executor processes on the same machines where the data is stored, and using an rdd that knows about which node to prefer scheduling a task on, yes, the data will be pulled over the network. Of the options you listed, S3 and DynamoDB cannot have spark running on the same machines. Cassandra can be run on the same nodes as spark, and recent versions of the spark cassandra connector implement preferred locations. You can run an rdbms on the same nodes as spark, but JdbcRDD doesn't implement preferred locations. On Mon, Jan 5, 2015 at 6:25 PM, Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm trying to understand how a Spark Cluster behaves when the data it is processing resides on a centralized/remote store (S3, Cassandra, DynamoDB, RDBMS etc). Does every node in the cluster retrieve all the data from the central store ? thanks -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Re: Spark response times for queries seem slow
That sounds slow to me. It looks like your sql query is grouping by a column that isn't in the projections, I'm a little surprised that even works. But you're getting the same time reducing manually? Have you looked at the shuffle amounts in the UI for the job? Are you certain there aren't a disproportionate number of rows with the same hour (e.g. null hour)? On Mon, Jan 5, 2015 at 5:20 PM, Sam Flint sam.fl...@magnetic.com wrote: I am running pyspark job over 4GB of data that is split into 17 parquet files on HDFS cluster. This is all in cloudera manager. Here is the query the job is running : parquetFile.registerTempTable(parquetFileone) results = sqlContext.sql(SELECT sum(total_impressions), sum(total_clicks) FROM parquetFileone group by hour) I also ran this way : mapped = parquetFile.map(lambda row: (str(row.hour), (row.total_impressions, row.total_clicks))) counts = mapped.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) my results where anywhere from 8 - 10 minutes. I am wondering if there is a configuration that needs to be tweaked or if this is expected response time. Machines are 30g RAM and 4 cores. Seems the CPU's are just getting pegged and that is what is taking so long. Any help on this would be amazing. Thanks, -- *MAGNE**+**I**C* *Sam Flint* | *Lead Developer, Data Analytics*
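For reference, the grouped query would normally also project the grouping column (table and column names as in the original post):

SELECT hour, sum(total_impressions), sum(total_clicks) FROM parquetFileone GROUP BY hour

That does not by itself explain 8-10 minutes, but it makes the per-hour output unambiguous and easier to compare against the hand-rolled reduceByKey version.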
Re: SparkSQL support for reading Avro files
Did you follow the link on that page? THIS REPO HAS BEEN MOVED (https://github.com/marmbrus/sql-avro#please-go-to-the-version-hosted-by-databricks): please go to the version hosted by Databricks, https://github.com/databricks/spark-avro On Mon, Jan 5, 2015 at 1:12 PM, yanenli2 yane...@gmail.com wrote: Hi All, I want to use SparkSQL to manipulate data in Avro format. I found a solution at https://github.com/marmbrus/sql-avro . However, it doesn't compile successfully anymore with the latest code of Spark version 1.2.0 or 1.2.1. I then tried to pull a copy from github stated at http://mail-archives.apache.org/mod_mbox/spark-reviews/201409.mbox/%3cgit-pr-2475-sp...@git.apache.org%3E by command: git checkout 47d542cc0238fba04b6c4e4456393d812d559c4e But it failed to pull this commit. What can I do to make it work? Any plans for adding Avro support to SparkSQL?
Re: Spark for core business-logic? - Replacing: MongoDB?
Alec, If you are looking for a Machine Learning stack that supports business-logics, you may take a look at PredictionIO: http://prediction.io/ It's based on Spark and HBase. Simon On Mon, Jan 5, 2015 at 6:14 PM, Alec Taylor alec.tayl...@gmail.com wrote: Thanks all. To answer your clarification questions: - I'm writing this in Python - A similar problem to my actual one is to find common 30 minute slots (over the next 12 months) [r] that k users have in common. Total users: n. Given n=1 and r=17472 then the [naïve] time-complexity is $\mathcal{O}(nr)$. n*r=17,472,000. I may be able to get $\mathcal{O}(n \log r)$ if not $\log \log$ from reading the literature on sequence matching, however this is uncertain. So assuming all the other business-logic which needs to be built in, such as authentication and various other CRUD operations, as well as this more intensive sequence searching operation, what stack would be best for me? Thanks for all suggestions On Mon, Jan 5, 2015 at 4:24 PM, Jörn Franke jornfra...@gmail.com wrote: Hallo, It really depends on your requirements, what kind of machine learning algorithm your budget, if you do currently something really new or integrate it with an existing application, etc.. You can run MongoDB as well as a cluster. I don't think this question can be answered generally, but depends on details of your case. Best regards Le 4 janv. 2015 01:44, Alec Taylor alec.tayl...@gmail.com a écrit : In the middle of doing the architecture for a new project, which has various machine learning and related components, including: recommender systems, search engines and sequence [common intersection] matching. Usually I use: MongoDB (as db), Redis (as cache) and celery (as queue, backed by Redis). Though I don't have experience with Hadoop, I was thinking of using Hadoop for the machine-learning (as this will become a Big Data problem quite quickly). To push the data into Hadoop, I would use a connector of some description, or push the MongoDB backups into HDFS at set intervals. However I was thinking that it might be better to put the whole thing in Hadoop, store all persistent data in Hadoop, and maybe do all the layers in Apache Spark (with caching remaining in Redis). Is that a viable option? - Most of what I see discusses Spark (and Hadoop in general) for analytics only. Apache Phoenix exposes a nice interface for read/write over HBase, so I might use that if Spark ends up being the wrong solution. Thanks for all suggestions, Alec Taylor PS: I need this for both Big and Small data. Note that I am using the Cloudera definition of Big Data referring to processing/storage across more than 1 machine. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Parquet predicate pushdown
Hi all, I have a question regarding predicate pushdown for Parquet. My understanding was this would use the metadata in Parquet's blocks/pages to skip entire chunks that won't match without needing to decode the values and filter on every value in the table. I was testing a scenario where I had 100M rows in a Parquet file. Summing over a column took about 2-3 seconds. I also have a column (e.g. customer ID) with approximately 100 unique values. My assumption, though not exactly linear, would be that filtering on this would reduce the query time significantly due to it skipping entire segments based on the metadata. In fact, it took much longer - somewhere in the vicinity of 4-5 seconds, which suggested to me it's reading all the values for the key column (100M values), then filtering, then reading all the relevant segments/values for the measure column, hence the increase in time. In the logs, I could see it was successfully pushing down a Parquet predicate, so I'm not sure I'm understanding why this is taking longer. Could anyone shed some light on this or point out where I'm going wrong? Thanks!
Reading from a centralized stored
Hi, I'm trying to understand how a Spark Cluster behaves when the data it is processing resides on a centralized/remote store (S3, Cassandra, DynamoDB, RDBMS etc). Does every node in the cluster retrieve all the data from the central store ? thanks -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Fwd: Controlling number of executors on Mesos vs YARN
Forgot to hit reply-all. -- Forwarded message -- From: Tim Chen t...@mesosphere.io Date: Sun, Jan 4, 2015 at 10:46 PM Subject: Re: Controlling number of executors on Mesos vs YARN To: mvle m...@us.ibm.com Hi Mike, You're correct there is no such setting in for Mesos coarse grain mode, since the assumption is that each node is launched with one container and Spark is launching multiple tasks in that container. In fine-grain mode there isn't a setting like that, as it currently will launch an executor as long as it satisfies the minimum container resource requirement. I've created a JIRA earlier about capping the number of executors or better distribute the # of executors launched in each node. Since the decision of choosing what node to launch containers is all in the Spark scheduler side, it's very easy to modify it. Btw, what's the configuration to set the # of executors on YARN side? Thanks, Tim On Sun, Jan 4, 2015 at 9:37 PM, mvle m...@us.ibm.com wrote: I'm trying to compare the performance of Spark running on Mesos vs YARN. However, I am having problems being able to configure the Spark workload to run in a similar way on Mesos and YARN. When running Spark on YARN, you can specify the number of executors per node. So if I have a node with 4 CPUs, I can specify 6 executors on that node. When running Spark on Mesos, there doesn't seem to be an equivalent way to specify this. In Mesos, you can somewhat force this by specifying the number of CPU resources to be 6 when running the slave daemon. However, this seems to be a static configuration of the Mesos cluster rather something that can be configured in the Spark framework. So here is my question: For Spark on Mesos, am I correct that there is no way to control the number of executors per node (assuming an idle cluster)? For Spark on Mesos coarse-grained mode, there is a way to specify max_cores but that is still not equivalent to specifying the number of executors per node as when Spark is run on YARN. If I am correct, then it seems Spark might be at a disadvantage running on Mesos compared to YARN (since it lacks the fine tuning ability provided by YARN). Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
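For Tim's question about the YARN side: the executor count there is a per-application spark-submit setting (a total for the job, which YARN then places subject to each node's free memory and cores), together with per-executor sizing. A sketch with illustrative values:

bin/spark-submit --master yarn-client \
  --num-executors 16 \
  --executor-cores 2 \
  --executor-memory 8g \
  ...

It is this kind of per-application control, rather than the static per-slave resource configuration, that currently has no direct coarse-grained Mesos equivalent, which is what the JIRA Tim mentions is about.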
Re: Spark for core business-logic? - Replacing: MongoDB?
Thanks all. To answer your clarification questions: - I'm writing this in Python - A similar problem to my actual one is to find common 30 minute slots (over the next 12 months) [r] that k users have in common. Total users: n. Given n=1 and r=17472 then the [naïve] time-complexity is $\mathcal{O}(nr)$. n*r=17,472,000. I may be able to get $\mathcal{O}(n \log r)$ if not $\log \log$ from reading the literature on sequence matching, however this is uncertain. So assuming all the other business-logic which needs to be built in, such as authentication and various other CRUD operations, as well as this more intensive sequence searching operation, what stack would be best for me? Thanks for all suggestions On Mon, Jan 5, 2015 at 4:24 PM, Jörn Franke jornfra...@gmail.com wrote: Hallo, It really depends on your requirements, what kind of machine learning algorithm your budget, if you do currently something really new or integrate it with an existing application, etc.. You can run MongoDB as well as a cluster. I don't think this question can be answered generally, but depends on details of your case. Best regards Le 4 janv. 2015 01:44, Alec Taylor alec.tayl...@gmail.com a écrit : In the middle of doing the architecture for a new project, which has various machine learning and related components, including: recommender systems, search engines and sequence [common intersection] matching. Usually I use: MongoDB (as db), Redis (as cache) and celery (as queue, backed by Redis). Though I don't have experience with Hadoop, I was thinking of using Hadoop for the machine-learning (as this will become a Big Data problem quite quickly). To push the data into Hadoop, I would use a connector of some description, or push the MongoDB backups into HDFS at set intervals. However I was thinking that it might be better to put the whole thing in Hadoop, store all persistent data in Hadoop, and maybe do all the layers in Apache Spark (with caching remaining in Redis). Is that a viable option? - Most of what I see discusses Spark (and Hadoop in general) for analytics only. Apache Phoenix exposes a nice interface for read/write over HBase, so I might use that if Spark ends up being the wrong solution. Thanks for all suggestions, Alec Taylor PS: I need this for both Big and Small data. Note that I am using the Cloudera definition of Big Data referring to processing/storage across more than 1 machine. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Parquet predicate pushdown
Predicate push down into the input format is turned off by default because there is a bug in the current Parquet library that causes null pointer exceptions when there are row groups that are entirely null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration On Mon, Jan 5, 2015 at 3:38 PM, Adam Gilmore dragoncu...@gmail.com wrote: Hi all, I have a question regarding predicate pushdown for Parquet. My understanding was this would use the metadata in Parquet's blocks/pages to skip entire chunks that won't match without needing to decode the values and filter on every value in the table. I was testing a scenario where I had 100M rows in a Parquet file. Summing over a column took about 2-3 seconds. I also have a column (e.g. customer ID) with approximately 100 unique values. My assumption, though not exactly linear, would be that filtering on this would reduce the query time significantly due to it skipping entire segments based on the metadata. In fact, it took much longer - somewhere in the vicinity of 4-5 seconds, which suggested to me it's reading all the values for the key column (100M values), then filtering, then reading all the relevant segments/values for the measure column, hence the increase in time. In the logs, I could see it was successfully pushing down a Parquet predicate, so I'm not sure I'm understanding why this is taking longer. Could anyone shed some light on this or point out where I'm going wrong? Thanks!
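A sketch of turning it on, per the configuration section linked above (the property is spark.sql.parquet.filterPushdown, off by default in this release):

sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

or equivalently at submit time with --conf spark.sql.parquet.filterPushdown=true. Keep SPARK-4258 in mind before enabling it on data that can contain fully-null row groups.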
Re: Mesos resource allocation
Hi Josh, I see, I haven't heard folks using larger JVM heap size than you mentioned (30gb), but in your scenario what you're proposing does make sense. I've created SPARK-5095 and we can continue our discussion about how to address this. Tim On Mon, Jan 5, 2015 at 1:22 AM, Josh Devins j...@soundcloud.com wrote: Hey Tim, sorry for the delayed reply, been on vacation for a couple weeks. The reason we want to control the number of executors is that running executors with JVM heaps over 30GB causes significant garbage collection problems. We have observed this through much trial-and-error for jobs that are a dozen-or-so stages, running for more than ~20m. For example, if we run 8 executors with 60GB heap each (for example, we have also other values larger than 30GB), even after much tuning of heap parameters (% for RDD cache, etc.) we run into GC problems. Effectively GC becomes so high that it takes over all compute time from the JVM. If we then halve the heap (30GB) but double the number of executors (16), all GC problems are relieved and we get to use the full memory resources of the cluster. We talked with some engineers from Databricks at Strata in Barcelona recently and received the same advice — do not run executors with more than 30GB heaps. Since our machines are 64GB machines and we are typically only running one or two jobs at a time on the cluster (for now), we can only use half the cluster memory with the current configuration options available in Mesos. Happy to hear your thoughts and actually very curious about how others are running Spark on Mesos with large heaps (as a result of large memory machines). Perhaps this is a non-issue when we have more multi-tenancy in the cluster, but for now, this is not the case. Thanks, Josh On 24 December 2014 at 06:22, Tim Chen t...@mesosphere.io wrote: Hi Josh, If you want to cap the amount of memory per executor in Coarse grain mode, then yes you only get 240GB of memory as you mentioned. What's the reason you don't want to raise the capacity of memory you use per executor? In coarse grain mode the Spark executor is long living and it internally will get tasks distributed by Spark internal Coarse grained scheduler. I think the assumption is that it already allocated the maximum available on that slave and don't really assume we need another one. I think it's worth considering having a configuration of number of cores per executor, especially when Mesos have inverse offers and optimistic offers so we can choose to launch more executors when resources becomes available even in coarse grain mode and then support giving the executors back but more higher priority tasks arrive. For fine grain mode, the spark executors are started by Mesos executors that is configured from Mesos scheduler backend. I believe the RDD is cached as long as the Mesos executor is running as the BlockManager is created on executor registration. Let me know if you need any more info. Tim -- Forwarded message -- From: Josh Devins j...@soundcloud.com Date: 22 December 2014 at 17:23 Subject: Mesos resource allocation To: user@spark.apache.org We are experimenting with running Spark on Mesos after running successfully in Standalone mode for a few months. With the Standalone resource manager (as well as YARN), you have the option to define the number of cores, number of executors and memory per executor. In Mesos, however, it appears as though you cannot specify the number of executors, even in coarse-grained mode. 
If this is the case, how do you define the number of executors to run with? Here's an example of why this matters (to us). Let's say we have the following cluster: num nodes: 8 num cores: 256 (32 per node) total memory: 512GB (64GB per node) If I set my job to require 256 cores and per-executor-memory to 30GB, then Mesos will schedule a single executor per machine (8 executors total) and each executor will get 32 cores to work with. This means that we have 8 executors * 30GB each for a total of 240GB of cluster memory in use, less than half of what is available. If you actually want 16 executors in order to increase the amount of memory in use across the cluster, how can you do this with Mesos? It seems that a parameter is missing (or I haven't found it yet) which lets me tune this for Mesos: * number of executors per n-cores OR * number of executors total Furthermore, in fine-grained mode in Mesos, how are the executors started/allocated? That is, since Spark tasks map to Mesos tasks, when and how are executors started? If they are transient and an executor per task is created, does this mean we cannot have cached RDDs? Thanks for any advice or pointers, Josh
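For context, a sketch of the knobs that do exist for Mesos in this era of Spark (values are illustrative). Note that none of them expresses "n executors per node", which is exactly the gap discussed above and in SPARK-5095:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.coarse", "true")    // one long-lived executor per node from one big offer
  .set("spark.cores.max", "256")        // cap on total cores across the cluster
  .set("spark.executor.memory", "30g")  // heap per executor (kept at or below ~30GB for GC reasons)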
Re: Spark for core business-logic? - Replacing: MongoDB?
Thanks Simon, that's a good way to train on incoming events (and related problems / and result computations). However, does it handle the actual data storage? - E.g.: CRUD documents On Tue, Jan 6, 2015 at 1:18 PM, Simon Chan simonc...@gmail.com wrote: Alec, If you are looking for a Machine Learning stack that supports business-logics, you may take a look at PredictionIO: http://prediction.io/ It's based on Spark and HBase. Simon On Mon, Jan 5, 2015 at 6:14 PM, Alec Taylor alec.tayl...@gmail.com wrote: Thanks all. To answer your clarification questions: - I'm writing this in Python - A similar problem to my actual one is to find common 30 minute slots (over the next 12 months) [r] that k users have in common. Total users: n. Given n=1 and r=17472 then the [naïve] time-complexity is $\mathcal{O}(nr)$. n*r=17,472,000. I may be able to get $\mathcal{O}(n \log r)$ if not $\log \log$ from reading the literature on sequence matching, however this is uncertain. So assuming all the other business-logic which needs to be built in, such as authentication and various other CRUD operations, as well as this more intensive sequence searching operation, what stack would be best for me? Thanks for all suggestions On Mon, Jan 5, 2015 at 4:24 PM, Jörn Franke jornfra...@gmail.com wrote: Hallo, It really depends on your requirements, what kind of machine learning algorithm your budget, if you do currently something really new or integrate it with an existing application, etc.. You can run MongoDB as well as a cluster. I don't think this question can be answered generally, but depends on details of your case. Best regards Le 4 janv. 2015 01:44, Alec Taylor alec.tayl...@gmail.com a écrit : In the middle of doing the architecture for a new project, which has various machine learning and related components, including: recommender systems, search engines and sequence [common intersection] matching. Usually I use: MongoDB (as db), Redis (as cache) and celery (as queue, backed by Redis). Though I don't have experience with Hadoop, I was thinking of using Hadoop for the machine-learning (as this will become a Big Data problem quite quickly). To push the data into Hadoop, I would use a connector of some description, or push the MongoDB backups into HDFS at set intervals. However I was thinking that it might be better to put the whole thing in Hadoop, store all persistent data in Hadoop, and maybe do all the layers in Apache Spark (with caching remaining in Redis). Is that a viable option? - Most of what I see discusses Spark (and Hadoop in general) for analytics only. Apache Phoenix exposes a nice interface for read/write over HBase, so I might use that if Spark ends up being the wrong solution. Thanks for all suggestions, Alec Taylor PS: I need this for both Big and Small data. Note that I am using the Cloudera definition of Big Data referring to processing/storage across more than 1 machine. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Add StructType column to SchemaRDD
Hi, I have a SchemaRDD where I want to add a column with a value that is computed from the rest of the row. As the computation involves a network operation and requires setup code, I can't use SELECT *, myUDF(*) FROM rdd, but I wanted to use a combination of: - get schema of input SchemaRDD - issue a mapPartitions call (including the setup code), obtaining a new RDD[Row] - extend the schema manually - create a new RDD by combining the RDD[Row] with the extended schema. This works very well, but I run into trouble querying that resulting SchemaRDD with SQL if: - the result of my computation is a case class - and I want to use values in this case class in the SQL query. In particular, while SELECT column FROM resultrdd works well, SELECT column.key_name FROM resultrdd gives a java.lang.ClassCastException: example.MyCaseClass cannot be cast to org.apache.spark.sql.catalyst.expressions.Row Here is an example to illustrate that: --- import org.apache.spark._import org.apache.spark.sql._import org.apache.spark.sql.catalyst.types._ val sc = new SparkContext(local[3], Test) val sqlc = new SQLContext(sc)import sqlc._ // this is the case class that my operation is returningcase class Result(string_values: Map[String, String], num_values: Map[String, Double])// dummy result dataval data = (Result(Map(team - a), Map(score - 0.8)) :: Result(Map(team - b), Map(score - 0.5)) :: Nil)val rdd = sc.parallelize(data)// simulate my computation by creating an RDD[Row] and creating// a schema programmaticallyval rowRdd = rdd.map(dr = Row.fromSeq(7 :: dr :: Nil))val progSchema = StructType(StructField(hello, IntegerType, false) :: StructField(newcol, rdd.schema, true) :: Nil)val progRdd = sqlc.applySchema(rowRdd, progSchema)progRdd.registerTempTable(progrdd)// the following call will *fail* with a ClassCastExceptionsqlc.sql(SELECT newcol.string_values['team'] FROM progrdd).foreach(println)// however, the schema I specified is correct. see how embedding// my result in a proper case class works:case class ResultContainer(hello: Int, newcol: Result)val caseClassRdd = rdd.map(dr = ResultContainer(7, dr))caseClassRdd.registerTempTable(caseclassrdd)// the following call will *work*sqlc.sql(SELECT newcol.string_values['team'] FROM caseclassrdd).foreach(println)// even though the schema for both RDDs is the same:progRdd.schema == caseClassRdd.schema --- It turns out that I cannot use the case class directly, but I have to convert it to a Row as well. That is, instead of val rowRdd = rdd.map(dr = Row.fromSeq(7 :: dr :: Nil)) I have to use val rowRdd = rdd.map(dr = Row.fromSeq(7 :: Row.fromSeq(dr.productIterator.toSeq) :: Nil)) and then, I can use SELECT newcol.string_values['team'] FROM progrdd So now I found that out and I'm happy that it works, but it was quite hard to track it down, so I was wondering if this is the most intuitive way to add a column to a SchemaRDD using mapPartitions (as opposed to using a UDF, where the conversion case class - Row seems to happen automatically). Or, even if there is no more intuitive way, just wanted to have this documented ;-) Thanks Tobias
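Since the fix is easy to miss in the prose above, here is a compact sketch of the conversion in the mapPartitions setting the post describes. The helper name is made up and the mapPartitions body is only illustrative of where the setup code would go; it builds on the rdd and schema from the message:

import org.apache.spark.sql._

// Any case class nested inside a programmatically built StructType has to be
// flattened into a Row itself before applySchema.
def productToRow(p: Product): Row = Row.fromSeq(p.productIterator.toSeq)

val rowRdd = rdd.mapPartitions { iter =>
  // per-partition setup (e.g. opening the network connection) would go here
  iter.map(dr => Row.fromSeq(7 :: productToRow(dr) :: Nil))
}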
Re: Finding most occurrences in a JSON Nested Array
Yes, row(1).collect would be wrong, as it is not a transformation on an RDD; try getString(1) to fetch the value. I already said this is pseudo code. If it does not help, let me know and I will run the code and send it to you. get/getAs should work for you, for example: var hashTagsList = popularHashTags.flatMap(x => x.getAs[Seq[String]](0)) If you want, I can even take remote control of your machine to fix it. Regards Pankaj Linkedin https://www.linkedin.com/profile/view?id=171566646 Skype pankaj.narang
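Putting the thread together, a runnable sketch of the whole pipeline. It assumes the array of hobbies is the second column (index 1) of results, as in the earlier snippets; names are illustrative:

import org.apache.spark.SparkContext._  // pair-RDD methods, needed outside the shell in Spark 1.x

val hobbies = results.flatMap(row => row.getAs[Seq[String]](1))
val counted = hobbies.map(h => (h, 1)).reduceByKey(_ + _)
counted.map { case (hobby, n) => (n, hobby) }
  .sortByKey(ascending = false)
  .take(10)
  .foreach { case (n, hobby) => println(s"$hobby: $n") }

The original error came from row(1) returning Any; getAs[Seq[String]] (or an explicit asInstanceOf) gives flatMap something it can actually flatten.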
Re: Driver hangs on running mllib word2vec
Hi Xiangrui, Our dataset is about 80GB(10B lines). In the driver's log, we foud this: *INFO Word2Vec: trainWordsCount = -1610413239* it seems that there is a integer overflow? On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng men...@gmail.com wrote: How big is your dataset, and what is the vocabulary size? -Xiangrui On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi, When we run mllib word2vec(spark-1.1.0), driver get stuck with 100% cup usage. Here is the jstack output: main prio=10 tid=0x40112800 nid=0x46f2 runnable [0x4162e000] java.lang.Thread.State: RUNNABLE at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778) at java.io.DataOutputStream.writeInt(DataOutputStream.java:182) at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225) at java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610) at org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290) at com.baidu.inf.WordCount$.main(WordCount.scala:31) at com.baidu.inf.WordCount.main(WordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- Best Regards -- Best Regards
Re: JdbcRdd for Python
I'll add that there is a JDBC connector for the Spark SQL data sources API in the works, and this will work with python (though the standard SchemaRDD type conversions). On Mon, Jan 5, 2015 at 7:09 AM, Cody Koeninger c...@koeninger.org wrote: JavaDataBaseConnectivity is, as far as I know, JVM specific. The JdbcRDD is expecting to deal with Jdbc Connection and ResultSet objects. I haven't done any python development in over a decade, but if someone wants to work together on a python equivalent I'd be happy to help out. The original JdbcRDDimplementation only took a little bit of spare time across a couple of evenings, shouldn't be a big commitment. On Fri, Jan 2, 2015 at 3:22 PM, elliott cordo elliottco...@gmail.com wrote: yeah.. i went through the source, and unless i'm missing something it's not.. agreed, i'd love to see it implemented! On Fri, Jan 2, 2015 at 3:59 PM, Tim Schweichler tim.schweich...@healthination.com wrote: Doesn't look like it is at the moment. If that's the case I'd love to see it implemented. From: elliott cordo elliottco...@gmail.com Date: Friday, January 2, 2015 at 8:17 AM To: user@spark.apache.org user@spark.apache.org Subject: JdbcRdd for Python Hi All - Is JdbcRdd currently supported? Having trouble finding any info or examples?
Spark avro: Sample app fails to run in a spark standalone cluster
Hi, I have this simple spark app. public class AvroSparkTest { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf() .setMaster(spark://niranda-ThinkPad-T540p:7077) // (local[2]) .setAppName(avro-spark-test); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); JavaSQLContext sqlContext = new JavaSQLContext(sparkContext); JavaSchemaRDD episodes = AvroUtils.avroFile(sqlContext, /home/niranda/projects/avro-spark-test/src/test/resources/episodes.avro); episodes.printSchema(); episodes.registerTempTable(avroTable); ListRow result = sqlContext.sql(SELECT * FROM avroTable).collect(); for (Row row : result) { System.out.println(row.toString()); } } } It works well with master being set to local. But when I set master to a local spark server with 2 local workers, it gives the following error. 15/01/06 10:00:55 INFO MemoryStore: ensureFreeSpace(177166) called with curMem=0, maxMem=2004174766 15/01/06 10:00:55 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.0 KB, free 1911.2 MB) 15/01/06 10:00:55 INFO MemoryStore: ensureFreeSpace(25502) called with curMem=177166, maxMem=2004174766 15/01/06 10:00:55 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.9 KB, free 1911.1 MB) 15/01/06 10:00:55 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.100.5.109:41873 (size: 24.9 KB, free: 1911.3 MB) 15/01/06 10:00:55 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/01/06 10:00:55 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:45 15/01/06 10:00:55 INFO FileInputFormat: Total input paths to process : 1 15/01/06 10:00:55 INFO SparkContext: Starting job: collect at SparkPlan.scala:84 15/01/06 10:00:55 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:84) with 2 output partitions (allowLocal=false) 15/01/06 10:00:55 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:84) 15/01/06 10:00:55 INFO DAGScheduler: Parents of final stage: List() 15/01/06 10:00:55 INFO DAGScheduler: Missing parents: List() 15/01/06 10:00:55 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents 15/01/06 10:00:55 INFO MemoryStore: ensureFreeSpace(4864) called with curMem=202668, maxMem=2004174766 15/01/06 10:00:55 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 1911.1 MB) 15/01/06 10:00:55 INFO MemoryStore: ensureFreeSpace(3482) called with curMem=207532, maxMem=2004174766 15/01/06 10:00:55 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1911.1 MB) 15/01/06 10:00:55 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.100.5.109:41873 (size: 3.4 KB, free: 1911.3 MB) 15/01/06 10:00:55 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/01/06 10:00:55 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/01/06 10:00:55 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84) 15/01/06 10:00:55 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/06 10:00:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/06 10:00:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/06 10:00:55 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.100.5.109): java.io.EOFException 
at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at
Re: Driver hangs on running mllib word2vec
I think it is overflow. The training data is quite big, and the algorithm's scalability depends heavily on the vocabSize. Even without overflow there are other bottlenecks; for example, syn0Global and syn1Global each have vocabSize * vectorSize elements. Thanks. Zhan Zhang On Jan 5, 2015, at 7:47 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi Xiangrui, Our dataset is about 80GB (10B lines). In the driver's log we found this: INFO Word2Vec: trainWordsCount = -1610413239 It seems that there is an integer overflow? On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng men...@gmail.com wrote: How big is your dataset, and what is the vocabulary size? -Xiangrui On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi, When we run mllib word2vec (spark-1.1.0), the driver gets stuck with 100% CPU usage. Here is the jstack output:
main prio=10 tid=0x40112800 nid=0x46f2 runnable [0x4162e000]
java.lang.Thread.State: RUNNABLE
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847)
at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778)
at java.io.DataOutputStream.writeInt(DataOutputStream.java:182)
at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225)
at java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610)
at org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290)
at com.baidu.inf.WordCount$.main(WordCount.scala:31)
at com.baidu.inf.WordCount.main(WordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
-- Best Regards
-- Best Regards
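For anyone hitting the same wall, a rough back-of-the-envelope check of the two bottlenecks mentioned above (the Int word counter and the syn0Global/syn1Global arrays) can be done before launching the job. This is only a sketch; numWords, vocabSize and vectorSize are assumed values you would replace with your own:
---
// Sketch: sanity-check Word2Vec sizing before submitting the job.
// The numbers below are assumptions, not values taken from this thread.
val numWords   = 10000000000L  // ~10B tokens, roughly the reported dataset size
val vocabSize  = 1000000       // assumed vocabulary size after min-count filtering
val vectorSize = 100           // Word2Vec default vector size

// A word counter kept as an Int overflows once the corpus exceeds Int.MaxValue (~2.1B) tokens,
// which is consistent with the negative trainWordsCount seen in the driver log.
println(s"Token count fits in Int: ${numWords <= Int.MaxValue}")

// syn0Global and syn1Global are each vocabSize * vectorSize floats (4 bytes per float),
// and both live on the driver.
val bytesPerArray = vocabSize.toLong * vectorSize * 4L
println(f"Each syn array: ${bytesPerArray / 1e9}%.3f GB, both together: ${2 * bytesPerArray / 1e9}%.3f GB")
---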
Re: spark 1.2: value toJSON is not a member of org.apache.spark.sql.SchemaRDD
I think you are missing something:
$ javap -cp ~/Downloads/spark-sql_2.10-1.2.0.jar org.apache.spark.sql.SchemaRDD | grep toJSON
public org.apache.spark.rdd.RDD<java.lang.String> toJSON();
On Mon, Jan 5, 2015 at 3:11 AM, bchazalet bchaza...@companywatch.net wrote: Hi everyone, I have just switched to spark 1.2.0 from 1.1.1, updating my sbt to point to the 1.2.0 jars: "org.apache.spark" %% "spark-core" % "1.2.0", "org.apache.spark" %% "spark-sql" % "1.2.0", "org.apache.spark" %% "spark-hive" % "1.2.0". I was hoping to use the new SchemaRDD#toJSON among other features, but it seems that this method is not available in the 1.2.0 spark-sql jars. I did a 'javap SchemaRDD.class | grep toJSON' on the unarchived spark-sql_2.10-1.2.0.jar that I got from maven, but it's not there. The SchemaRDD#toJSON is present in the source https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala within the branch-1.2 branch though. Am I missing something? Or wasn't that feature shipped with 1.2?
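For anyone else checking whether their build has the method, a minimal spark-shell sketch of toJSON follows; the case class, names and data are made up, and on 1.2 the method returns RDD[String], as the javap output above shows:
---
import org.apache.spark.sql.SQLContext

// Sketch only: exercise SchemaRDD#toJSON from spark-shell on Spark 1.2.
// The case class, table contents and names are illustrative.
case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25)))
val schemaRdd = createSchemaRDD(people)   // implicit conversion from RDD[Person] also works
val json = schemaRdd.toJSON               // RDD[String], one JSON object per row
json.collect().foreach(println)
---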
Custom receiver runtime Kryo exception
Hello all, I am using Spark 1.0.2 and I have a custom receiver that works well. I tried adding Kryo serialization to SparkConf:
val spark = new SparkConf()
  …..
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
and I am getting a strange error that I am not sure how to solve:
Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
Serialization trace:
object (com.typesafe.config.impl.SimpleConfig)
atomFeedConf (com.twc.needle.cp.AtomFeedReceiver)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
……
Here is part of my custom receiver:
class AtomFeedReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {
  private val conf = ConfigFactory.load
  private val atomFeedConf = conf.getConfig("cp.spark.atomfeed")
  private val atomFeedUrl = atomFeedConf.getString("url")
  private val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")
  private val urlReadTimeout = atomFeedConf.getInt("url_read_timeout")
  private val username = atomFeedConf.getString("username")
  private val password = atomFeedConf.getString("password")
  private val keepFeedCriteria = atomFeedConf.getString("keep_feed_criteria")
  private val feedTrackerDir = atomFeedConf.getString("feed_tracker_dir")
  private val feedTrackerFileName = atomFeedConf.getString("feed_tracker_file_name")
  private val enableSampling = atomFeedConf.getBoolean("enable_sampling")
  …..
Here is how I am calling the receiver in the main method:
val logLineStreamRaw = ssc.receiverStream(new AtomFeedReceiver)
Any idea why Spark needs the Config object to be mutable only when Kryo serialization is enabled? Thanks.
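Not an answer to the "why", but one pattern that sidesteps this whole class of problem (with either serializer) is to keep the non-serializable Config out of the receiver's serialized state and rebuild it lazily on the executor. A minimal sketch under that assumption, reusing the config keys from the mail above:
---
import com.typesafe.config.ConfigFactory
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch: mark the Config-derived fields @transient lazy so the receiver instance
// serialized from the driver to the worker carries no Config object at all;
// each field is re-created from ConfigFactory.load on the executor instead.
class AtomFeedReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  @transient private lazy val atomFeedConf   = ConfigFactory.load.getConfig("cp.spark.atomfeed")
  @transient private lazy val atomFeedUrl    = atomFeedConf.getString("url")
  @transient private lazy val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")

  def onStart(): Unit = {
    // start a background thread here that polls atomFeedUrl and calls store(...)
  }

  def onStop(): Unit = {
    // stop the polling thread started in onStart
  }
}
---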
Question on Spark UI/accumulators
Hi, The Spark documentation states that "If accumulators are created with a name, they will be displayed in Spark's UI" http://spark.apache.org/docs/latest/programming-guide.html#accumulators Where exactly are they shown? I may be dense, but I can't find them on the UI from http://localhost:4040 :( FWIW, I'm using Spark 1.2.0, and I created the accumulators like this: val merges = sc.accumulator(0, "Merges") val vertices = sc.accumulator(0, "Vertices") Regards, Virgil.
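In case it helps someone searching later: as far as I can tell, named accumulators show up on the detail page of a stage that actually increments them, not on the front page of the UI. A small sketch (same accumulator names as above) that should make them visible at http://localhost:4040 under the Stages tab, on the detail page of the foreach stage:
---
// Sketch: named accumulators only appear in the UI for stages that use them,
// so run an action that increments them and open that stage's detail page.
val merges = sc.accumulator(0, "Merges")
val vertices = sc.accumulator(0, "Vertices")

sc.parallelize(1 to 1000, 4).foreach { i =>
  if (i % 2 == 0) merges += 1 else vertices += 1
}
---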
python API for gradient boosting?
Hi, I wonder if anyone knows when a python API will be added for Gradient Boosted Trees? I see that java and scala APIs were added for the 1.2 release, and would love to be able to build GBMs in pyspark too. I find the convenience of being able to use numpy + scipy + matplotlib pretty compelling. As an alternative, if it'll be a while before this API is implemented, does anyone have suggestions for scala replacements for the above python libraries? cheers chris Christopher Thom QUANTIUM Level 25, 8 Chifley, 8-12 Chifley Square Sydney NSW 2000 T: +61 2 8222 3577 F: +61 2 9292 6444 W: quantium.com.au
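Until a Python API exists, a hedged sketch of the Scala API that shipped in 1.2 looks like the following; the input path and all parameter values are placeholders, not recommendations:
---
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.util.MLUtils

// Sketch: train a gradient-boosted-trees regressor with the Scala API from Spark 1.2.
// "data/sample_libsvm_data.txt" and the parameter values below are placeholders.
val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
val Array(training, test) = data.randomSplit(Array(0.7, 0.3))

val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 10
boostingStrategy.treeStrategy.maxDepth = 5

val model = GradientBoostedTrees.train(training, boostingStrategy)

// Mean squared error on the held-out split
val testMSE = test.map { p =>
  val err = model.predict(p.features) - p.label
  err * err
}.mean()
println(s"Test MSE = $testMSE")
---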
Re: FlatMapValues
cool let me adapt that. thanks a ton regards sanjay From: Sean Owen so...@cloudera.com To: Sanjay Subramanian sanjaysubraman...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, January 5, 2015 3:19 AM Subject: Re: FlatMapValues For the record, the solution I was suggesting was about like this:
inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}
This is much more efficient. On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen so...@cloudera.com wrote: From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out. However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play. On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote: My understanding is as follows
STEP 1 (This would create a pair RDD)
===
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
})
STEP 2
===
Since the previous step created a pair RDD, I thought the flatMapValues method would be applicable. But the code does not even compile, saying that flatMapValues is not applicable to RDD :-(
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
  }
}).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
SUMMARY
===
when a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink
I want to output the following and I am asking how do I do that ? Perhaps my code is 100% wrong. Please correct me and educate me :-)
1,red
1,blue
1,green
2,yellow
2,violet
2,pink
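For completeness, here is a self-contained variant that can be pasted into spark-shell and produces exactly the output asked for in the summary above (pair the id with every remaining value); for the real VAERS rows you would keep Sean's grouped(2) step to pick out every other field:
---
// Sketch: id,v1,v2,v3 -> (id,v1) (id,v2) (id,v3), using the sample rows from the summary.
val inputRDD = sc.parallelize(Seq(
  "1,red,blue,green",
  "2,yellow,violet,pink"
))

val pairs = inputRDD.flatMap { line =>
  val tokens = line.split(',')
  val id = tokens(0)
  tokens.tail.map(value => (id, value))
}

pairs.collect().foreach { case (id, value) => println(s"$id,$value") }
// 1,red
// 1,blue
// 1,green
// 2,yellow
// 2,violet
// 2,pink
---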
problem while running code
the log is here
py4j.protocol.Py4JError: An error occurred while calling o22.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:695)
Timeout Exception in standalone cluster
Hi, I am getting the following exception in a Spark (1.1.0) job that is running on a standalone cluster. My cluster configuration is: 5 machines, Intel(R) 2.50GHz, 4 cores, 16 GB RAM each.
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:113)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:156)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
Re: Api to get the status of spark workers
You can use port 4040; it gives information for the currently running application, including a detailed summary of the currently running executors.
Fwd: ApacheCon North America 2015 Call For Papers
FYI, ApacheCon North America call for papers is up. Matei Begin forwarded message: Date: January 5, 2015 at 9:40:41 AM PST From: Rich Bowen rbo...@rcbowen.com Reply-To: dev d...@community.apache.org To: dev d...@community.apache.org Subject: ApacheCon North America 2015 Call For Papers Fellow ASF enthusiasts, We now have less than a month remaining in the Call For Papers for ApacheCon North America 2015, and so far the submissions are on the paltry side. Please consider submitting papers for consideration for this event. Details about the event are available at http://events.linuxfoundation.org/events/apachecon-north-america The call for papers is at http://events.linuxfoundation.org//events/apachecon-north-america/program/cfp Please help us out by getting this message out to your user@ and dev@ community on the projects that you're involved in, so that these projects can be represented in Austin. If you are interested in chairing a content track, and taking on the task of wrangling your community together to create a compelling story about your technology space, please join the comdev mailing list - dev-subscr...@community.apache.org - and speak up there. (Message is Bcc'ed committers@, and Reply-to set to dev@community, if you want to discuss this topic further there.) Thanks! -- Rich Bowen - rbo...@rcbowen.com - @rbowen http://apachecon.com/ - @apachecon
Re: JdbcRdd for Python
JavaDataBaseConnectivity is, as far as I know, JVM specific. The JdbcRDD is expecting to deal with Jdbc Connection and ResultSet objects. I haven't done any python development in over a decade, but if someone wants to work together on a python equivalent I'd be happy to help out. The original JdbcRDD implementation only took a little bit of spare time across a couple of evenings, so it shouldn't be a big commitment. On Fri, Jan 2, 2015 at 3:22 PM, elliott cordo elliottco...@gmail.com wrote: yeah.. i went through the source, and unless i'm missing something it's not.. agreed, i'd love to see it implemented! On Fri, Jan 2, 2015 at 3:59 PM, Tim Schweichler tim.schweich...@healthination.com wrote: Doesn't look like it is at the moment. If that's the case I'd love to see it implemented. From: elliott cordo elliottco...@gmail.com Date: Friday, January 2, 2015 at 8:17 AM To: user@spark.apache.org user@spark.apache.org Subject: JdbcRdd for Python Hi All - Is JdbcRdd currently supported? I'm having trouble finding any info or examples.
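For anyone picking this up, the existing Scala API that a Python port would presumably mirror looks roughly like this; the JDBC URL, table and bounds are invented values:
---
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Sketch of the existing Scala JdbcRDD. The connection string, query and bounds are
// placeholders. The query must contain exactly two '?' placeholders, which JdbcRDD
// fills in with each partition's lower and upper bound.
val jdbcRdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:postgresql://localhost/testdb?user=test"),
  "SELECT id, name FROM users WHERE ? <= id AND id <= ?",
  1L,    // lowerBound
  1000L, // upperBound
  4,     // numPartitions
  (r: ResultSet) => (r.getInt("id"), r.getString("name"))
)

jdbcRdd.collect().foreach(println)
---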
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: pyspark on yarn
Below is the code that I am running. I get an error for unresolved attributes. Can anyone point me in the right direction? Running from the pyspark shell using yarn: MASTER=yarn-client pyspark. The error is below the code:
# Import SQLContext and data types
from pyspark.sql import *
# sc is an existing SparkContext.
sqlContext = SQLContext(sc)
# The result of loading a parquet file is also a SchemaRDD.
# Try loading all data that you have
parquetFile = sqlContext.parquetFile("/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.0.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.1.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.10.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.11.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.2.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.3.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.4.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.5.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.6.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.7.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.8.parq,/user/hive/warehouse/impala_new_4/key=20141001/69446344000a3a17-c90aac1f33a0fbc_875501925_data.9.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.0.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.1.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.2.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.3.parq,/user/hive/warehouse/impala_new_4/key=20141001/f1448ca083a5e224-159572f61b50d7a3_854675293_data.4.parq")
# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFileone")
results = sqlContext.sql("SELECT * FROM parquetFileone where key=20141001")
#print results
for result in results.collect():
  print result
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/sql.py", line 1615, in collect
    rows = RDD.collect(self)
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/rdd.py", line 678, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/pyspark/sql.py", line 1527, in _jrdd
    self._lazy_jrdd = self._jschema_rdd.javaToPython()
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
Project [*]
 Filter ('key = 20141001)
  Subquery parquetFileone
   ParquetRelation
SparkSQL support for reading Avro files
Hi All, I want to use SparkSQL to manipulate data in the Avro format. I found a solution at https://github.com/marmbrus/sql-avro . However, it no longer compiles successfully against the latest code of Spark version 1.2.0 or 1.2.1. I then tried to pull a copy from github as stated at http://mail-archives.apache.org/mod_mbox/spark-reviews/201409.mbox/%3cgit-pr-2475-sp...@git.apache.org%3E with the command: git checkout 47d542cc0238fba04b6c4e4456393d812d559c4e But it failed to pull this commit. What can I do to make it work? Any plan for adding Avro support in SparkSQL?