RE: SparkSQL can not use SchemaRDD from Hive
In your code snippet, sample is actually a SchemaRDD, and a SchemaRDD is bound to a particular SQLContext at runtime; I don't think we can manipulate/share a SchemaRDD across SQLContext instances.

-Original Message-
From: Kevin Jung [mailto:itsjb.j...@samsung.com]
Sent: Tuesday, July 29, 2014 1:47 PM
To: u...@spark.incubator.apache.org
Subject: SparkSQL can not use SchemaRDD from Hive

Hi, I got an error message while using Hive and SparkSQL. This is the code snippet I used (in spark-shell, 1.0.0):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
var sample = hive.hql("select * from sample10") // This creates a SchemaRDD. I have table 'sample10' in Hive.
var countHive = sample.count() // It works
sqlContext.registerRDDAsTable(sample, "temp")
sqlContext.sql("select * from temp").count() // It gives me an error message

java.lang.RuntimeException: Table Not Found: sample10

I don't know why this happens. Does SparkSQL conflict with Hive? Thanks, Kevin
Re: SparkSQL can not use SchemaRDD from Hive
As Hao already mentioned, using 'hive' (the HiveContext) throughout would work.

On Monday, July 28, 2014, Cheng, Hao hao.ch...@intel.com wrote: In your code snippet, sample is actually a SchemaRDD, and a SchemaRDD is bound to a particular SQLContext at runtime; I don't think we can manipulate/share a SchemaRDD across SQLContext instances.
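For reference, a minimal sketch of what keeping everything inside the HiveContext could look like in the 1.0 spark-shell; the table name comes from the original post, everything else is illustrative:

// Build only a HiveContext; no separate SQLContext is involved.
val hive = new org.apache.spark.sql.hive.HiveContext(sc)

val sample = hive.hql("SELECT * FROM sample10")   // SchemaRDD bound to `hive`
sample.registerAsTable("temp")                    // registered in the same context
hive.hql("SELECT * FROM temp").count()            // no "Table Not Found" error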
Re: How true is this about spark streaming?
I'm not sure I understand this, maybe because the context is missing. An RDD is immutable, so there is no such thing as writing to an RDD. I'm not sure which aspect is being referred to as single-threaded. Is this the Spark Streaming driver? What is the difference between streaming into Spark and reading from the stream? Streaming data into Spark means Spark reads the stream. A mini batch of data is exposed as an RDD, but the stream processing continues while it is operated on. Saving the RDDs is one of the most basic operations exposed by streaming: http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations No, you do not stop the stream processing to persist it; in fact you couldn't. On that basis, no, this sounds fairly wrong.

On Tue, Jul 29, 2014 at 1:37 AM, Rohit Pujari rpuj...@hortonworks.com wrote: Hello folks: I came across a thread that said "A Spark RDD read/write access is driven by a context object and is single threaded. You cannot stream into Spark and read from the stream at the same time. You have to stop the stream processing, snapshot the RDD and continue." Can you please offer some insights? Thanks, Rohit Pujari Solutions Engineer, Hortonworks rpuj...@hortonworks.com 716-430-6899
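To make the last point concrete, a minimal sketch (host, port and output path are placeholders) of an output operation that persists each mini batch while ingestion keeps running:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)  // Spark reads the stream
lines.count().print()                                // process each mini batch...
lines.saveAsTextFiles("hdfs:///tmp/batches/part")    // ...and save it, without stopping the stream
ssc.start()
ssc.awaitTermination()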
SPARK OWLQN Exception: Iteration Stage is so slow
Hi all, there is a problem we can't resolve. We implemented the OWLQN algorithm in parallel with Spark, and we don't know why every iteration stage is so slow: the CPU and memory load on each executor is so low that it seems impossible for every step to be this slow. There are also many INFO log lines on stdout like this:

BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_xxx on SHXJ-Hx-HBxxx:44126 in memory

Thank you.
John Wu
晶赞广告(上海)有限公司 Zamplus Advertising (Shanghai) Co., Ltd.
Tel: +8621-6076 0818 Ext. 885 Fax: +8621-6076 0812 Mobile: +86-13817415695
Room 1105, Tower 3, No. 695, Lingshi Rd., Shanghai 200072, P. R. China
Re: UpdatestateByKey assumptions
Just an update on this. Looking into the Spark logs, it seems that some partitions are not found and are recomputed, which gives the impression that those are related to the delayed updateStateByKey calls. I'm seeing something like:

log line 1 - Partition rdd_132_1 not found, computing it
log line N - Found block rdd_132_1 locally
log line N+1 - Goes into the updateStateByKey; X times has many objects with delayed update
log line M - Done Checkpointing RDD 126 to hdfs://

This happens for Y partitions, as many as the number of seconds the updateStateByKey call is delayed. Thanks, Rod
Avro Schema + GenericRecord to HadoopRDD
Hi all, I can read Avro files into Spark with HadoopRDD and submit the schema in the jobConf, but with the guidance I've seen so far, I'm left with an Avro GenericRecord of Java Objects without types. How do I actually use the schema to have the types inferred? Example:

scala> AvroJob.setInputSchema(jobConf, schema)
scala> val rdd = sc.hadoopRDD(jobConf, classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]], classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], 10)
14/07/29 09:27:49 INFO storage.MemoryStore: ensureFreeSpace(134254) called with curMem=0, maxMem=308713881
14/07/29 09:27:49 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 131.1 KB, free 294.3 MB)
rdd: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroWrapper[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] = HadoopRDD[0] at hadoopRDD at console:50

scala> rdd.first._1.datum.get("amt")
14/07/29 09:31:34 INFO spark.SparkContext: Starting job: first at console:53
14/07/29 09:31:34 INFO scheduler.DAGScheduler: Got job 3 (first at console:53) with 1 output partitions (allowLocal=true)
14/07/29 09:31:34 INFO scheduler.DAGScheduler: Final stage: Stage 3(first at console:53)
14/07/29 09:31:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/29 09:31:34 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/29 09:31:34 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/07/29 09:31:34 INFO rdd.HadoopRDD: Input split: hdfs://nameservice1:8020/user/nylab/prod/persistent_tables/creditsetl_ref_etxns/201201/part-0.avro:0+34279385
14/07/29 09:31:34 INFO spark.SparkContext: Job finished: first at console:53, took 0.061220615 s
res11: Object = 24.0

Thanks! Ben
Unit Testing (JUnit) with Spark
Is there any example out there for unit testing a Spark application in Java? Even a trivial application like word count would be very helpful. I am very new to this and I am struggling to understand how I can use JavaSparkContext with JUnit.
Job using Spark for Machine Learning
I'm not sure if job adverts are allowed on here - please let me know if not. Otherwise, if you're interested in using Spark in an R&D machine learning project then please get in touch. We are a startup based in London. Our data sets are on a massive scale: we collect data on over a billion users per month and are second only to Google in the contextual advertising space (ok - a distant second!). Details here: http://grnh.se/rl8f25 -- Martin Goodson | VP Data Science (0)20 3397 1240
Re: Unit Testing (JUnit) with Spark
I've been working on building Spark blueprints, and recently tried to generalize one to make blueprinting Spark apps easy: https://github.com/jayunit100/SparkBlueprint.git It runs the Spark app's main method in a unit test, and builds in SBT. You can easily try it out and improve on it. Obviously, calling a main method is the wrong kind of coupling for a unit test, but it works pretty well in a simple CI environment. I'll improve it eventually by injecting the SparkContext and validating the RDD directly in a later iteration. Pull requests welcome :)

On Tue, Jul 29, 2014 at 11:29 AM, soumick86 sdasgu...@dstsystems.com wrote: Is there any example out there for unit testing a Spark application in Java? Even a trivial application like word count would be very helpful. I am very new to this and I am struggling to understand how I can use JavaSparkContext with JUnit.

-- jay vyas
Re: SPARK OWLQN Exception: Iteration Stage is so slow
Do you mind sharing more details, for example, specs of nodes and data size? -Xiangrui

2014-07-29 2:51 GMT-07:00 John Wu j...@zamplus.com: Hi all, there is a problem we can't resolve. We implemented the OWLQN algorithm in parallel with Spark, and we don't know why every iteration stage is so slow: the CPU and memory load on each executor is so low that it seems impossible for every step to be this slow. There are also many INFO log lines on stdout like this: BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_xxx on SHXJ-Hx-HBxxx:44126 in memory Thank you. John Wu
the pregel operator of graphx throws NullPointerException
Hi, I'm running a Spark standalone cluster to calculate single-source shortest paths. Here is the code; the vertex attribute is (String, Long), the String for the path and the Long for the distance. The code before these lines reads the graph data from a file and builds the graph.

val sssp = initialGraph.pregel(("", Long.MaxValue))(
  (id, mine, rece) => {
    if (mine._2 > rece._2)
      rece
    else
      mine },
  triplet => {
    if (triplet.srcAttr._2 + triplet.attr.toLong < triplet.dstAttr._2) {
      Iterator((triplet.dstId, (triplet.srcAttr._1 + " " + triplet.srcId.toString, triplet.srcAttr._2 + triplet.attr)))
    } else {
      Iterator.empty
    }
  },
  (a, b) => {
    if (a._2 < b._2)
      a
    else
      b }
)

It throws an exception and terminates:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:4 failed 4 times, most recent failure: Exception failure in TID 598 on host worker6.local: java.lang.NullPointerException
[error] scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[error] scala.collection.Iterator$class.foreach(Iterator.scala:727)
[error] scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
[error] scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
[error] scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
[error] org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
[error] org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:87)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
[error] org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
[error] org.apache.spark.scheduler.Task.run(Task.scala:51)
[error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
[error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error] java.lang.Thread.run(Thread.java:744)
[error] Driver stacktrace:
14/07/29 15:46:12 WARN scheduler.TaskSetManager: Task 574 was killed.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:4 failed 4 times, most recent failure: Exception failure in TID 598 on host worker6.local: java.lang.NullPointerException scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:87) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at
Re: Unit Testing (JUnit) with Spark
Hi, try this one: http://simpletoad.blogspot.com/2014/07/runing-spark-unit-test-on-windows-7.html It's more about fixing a Windows-specific issue, but the code snippet gives the general idea: just run the ETL and check the output with Assert(s).

On Jul 29, 2014, at 6:29 PM, soumick86 sdasgu...@dstsystems.com wrote: Is there any example out there for unit testing a Spark application in Java? Even a trivial application like word count would be very helpful. I am very new to this and I am struggling to understand how I can use JavaSparkContext with JUnit.
Re: Unit Testing (JUnit) with Spark
You can take a look at https://github.com/apache/spark/blob/master/core/src/test/java/org/apache/spark/JavaAPISuite.java and model your JUnit tests on it. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Tue, Jul 29, 2014 at 10:10 PM, Kostiantyn Kudriavtsev kudryavtsev.konstan...@gmail.com wrote: Hi, try this one: http://simpletoad.blogspot.com/2014/07/runing-spark-unit-test-on-windows-7.html It's more about fixing a Windows-specific issue, but the code snippet gives the general idea: just run the ETL and check the output with Assert(s). On Jul 29, 2014, at 6:29 PM, soumick86 sdasgu...@dstsystems.com wrote: Is there any example out there for unit testing a Spark application in Java? Even a trivial application like word count would be very helpful. I am very new to this and I am struggling to understand how I can use JavaSparkContext with JUnit.
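For completeness, a minimal self-contained sketch of the pattern these links illustrate: create a local context in the test, run the transformation, assert on the result, stop the context. It is written in Scala against JUnit 4 (assumed to be on the test classpath, along with spark-core); all names are illustrative, and the same shape applies from Java via JavaSparkContext.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals

class WordCountSuite {
  private var sc: SparkContext = _

  @Before def setUp(): Unit = {
    // a local[2] master keeps the test self-contained; no cluster is needed
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("WordCountSuite"))
  }

  @After def tearDown(): Unit = {
    sc.stop()
  }

  @Test def countsWords(): Unit = {
    val counts = sc.parallelize(Seq("a b", "b c"))
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assertEquals(2L, counts("b").toLong)
    assertEquals(1L, counts("a").toLong)
  }
}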
Re: Memory compute-intensive tasks
OK, I did figure this out. I was running the app (avocado) using spark-submit, when it was actually designed to take command line arguments to connect to a spark cluster. Since I didn't provide any such arguments, it started a nested local Spark cluster *inside* the YARN Spark executor and so of course everything ran on one node. If I spin up a Spark cluster manually and provide the spark master URI to avocado, it works fine. Now, I've tried running a reasonable-sized job through (400GB of data on 10 HDFS/Spark nodes), and the partitioning is strange. Eight nodes get almost nothing, and the other two nodes each get half the work. This happens whether I use coalesce with shuffle=true or false before the work stage. (Though if I use shuffle=true, it creates 3000 tasks to do the shuffle, and still ends up with this skewed distribution!) Any suggestions on how to figure out what's going on? Thanks, Ravi
GraphX Connected Components
Hey all, I'm currently trying to run connected components using GraphX on a large graph (~1.8b vertices and ~3b edges; most of them are self edges, where the only edge that exists for vertex v is v-v) on EMR using 50 m3.xlarge nodes. As the program runs I'm seeing each iteration take longer and longer to complete, which seems counterintuitive to me, especially since I am seeing the shuffle read/write amounts decrease with each iteration. I would think that as more and more vertices converged, the iterations should take a shorter amount of time. I can run on up to 150 of the 500 part files (stored on S3) and it finishes in about 12 minutes, but with all the data I've let it run up to 4 hours and it still doesn't complete. Does anyone have ideas for approaches to troubleshooting this, Spark parameters that might need to be tuned, etc? Best Regards, Jeffrey Picard
Re: KMeans: expensiveness of large vectors
Development is really rapid here; that's a great thing. Out of curiosity, how did communication work before torrent? Did everything have to go back to the master / driver first?
RE: HiveContext is creating metastore warehouse locally instead of in hdfs
Thanks for the response... hive-site.xml is in the classpath so that doesn't seem to be the issue.
Re: iScala or Scala-notebook
IScala itself seems to be a bit dead unfortunately. I did come across this today: https://github.com/tribbloid/ISpark

On Fri, Jul 18, 2014 at 4:59 AM, ericjohnston1989 ericjohnston1...@gmail.com wrote: Hey everyone, I know this was asked before but I'm wondering if there have since been any updates. Are there any plans to integrate iScala/Scala-notebook with spark in the near future? This seems like something a lot of people would find very useful, so I was just wondering if anyone has started working on it. Thanks, Eric
python project like spark-jobserver?
I'm looking for something like the ooyala spark-jobserver ( https://github.com/ooyala/spark-jobserver) that basically manages a SparkContext for use from a REST or web application environment, but for python jobs instead of scala. Has anyone written something like this? Looking for a project or pointers as to what would be the best way to do something like this with Python and Spark. -Chris
Re: HiveContext is creating metastore warehouse locally instead of in hdfs
The warehouse and the metastore directories are two different things. The metastore holds the schema information about the tables and will by default be a local directory. With javax.jdo.option.ConnectionURL you can configure it to be something like MySQL. The warehouse directory is the default location where the actual contents of the tables are stored. What directory are you seeing created locally?

On Tue, Jul 29, 2014 at 10:49 AM, nikroy16 nikro...@gmail.com wrote: Thanks for the response... hive-site.xml is in the classpath so that doesn't seem to be the issue.
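As a rough illustration of the two settings being contrasted here (not a drop-in config; the HDFS path and JDBC URL are placeholders), a hive-site.xml might separate them like this:

<configuration>
  <!-- where the table data (the warehouse) lives -->
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://namenode:8020/user/hive/warehouse</value>
  </property>
  <!-- where the table schemas (the metastore) live; defaults to a local Derby directory -->
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://metastore-host:3306/metastore?createDatabaseIfNotExist=true</value>
  </property>
</configuration>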
Re: iScala or Scala-notebook
Some people have started work on that topic using the notebook (the original or the n8han one, I cannot remember which)... Some issues have been created already ^^

On 29 Jul 2014 19:59, Nick Pentreath nick.pentre...@gmail.com wrote: IScala itself seems to be a bit dead unfortunately. I did come across this today: https://github.com/tribbloid/ISpark On Fri, Jul 18, 2014 at 4:59 AM, ericjohnston1989 ericjohnston1...@gmail.com wrote: Hey everyone, I know this was asked before but I'm wondering if there have since been any updates. Are there any plans to integrate iScala/Scala-notebook with spark in the near future? This seems like something a lot of people would find very useful, so I was just wondering if anyone has started working on it. Thanks, Eric
Using countApproxDistinct in pyspark
Heya, I would like to use countApproxDistinct in pyspark. I know that it's an experimental method and that it is not yet available in pyspark. I started by porting the countApproxDistinct unit test to Python, see https://gist.github.com/drdee/d68eaf0208184d72cbff. Surprisingly, the results are way off. Using Scala, I get the following two counts (using https://github.com/apache/spark/blob/4c7243e109c713bdfb87891748800109ffbaae07/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L78-87):

scala> simpleRdd.countApproxDistinct(4, 0)
res2: Long = 73
scala> simpleRdd.countApproxDistinct(8, 0)
res3: Long = 99

In Python, with the same RDD as you can see in the gist, I get the following results:

In [7]: rdd._jrdd.rdd().countApproxDistinct(4, 0)
Out[7]: 29L
In [8]: rdd._jrdd.rdd().countApproxDistinct(8, 0)
Out[8]: 26L

Clearly, I am doing something wrong here :) What is also weird is that when I set p to 8, I should get a more accurate number, but it's actually smaller. Any tips or pointers are much appreciated! Best, Diederik
Spark and Flume integration - do I understand this correctly?
Hi, I am trying to integrate Spark onto a Flume log sink and avro source. The sink is on one machine (the application), and the source is on another. Log events are being sent from the application server to the avro source server (a log directory sink on the arvo source prints to verify) The aim is to get Spark to also receive the same events that the avro source is getting. The steps, I believe, are: 1. install/start Spark master (on avro source machine). 2. write spark application, deploy (on avro source machine). 3. add spark application as a worker to the master. 4. have spark application configured to same port as avro source Test setup is using 2 ubuntu VMs on a Windows host. Flume configuration: # application ## ## Tail application log file # /var/lib/apache-flume-1.5.0-bin/bin/flume-ng agent -n cps -c conf -f conf/flume-conf.properties # http://flume.apache.org/FlumeUserGuide.html#exec-source source_agent.sources = tomcat source_agent.sources.tomcat.type = exec source_agent.sources.tomcat.command = tail -F /var/lib/tomcat/logs/application.log source_agent.sources.tomcat.batchSize = 1 source_agent.sources.tomcat.channels = memoryChannel # http://flume.apache.org/FlumeUserGuide.html#memory-channel source_agent.channels = memoryChannel source_agent.channels.memoryChannel.type = memory source_agent.channels.memoryChannel.capacity = 100 ## Send to Flume Collector on Analytics Node # http://flume.apache.org/FlumeUserGuide.html#avro-sink source_agent.sinks = avro_sink source_agent.sinks.avro_sink.type = avro source_agent.sinks.avro_sink.channel = memoryChannel source_agent.sinks.avro_sink.hostname = 10.0.2.2 source_agent.sinks.avro_sink.port = 41414 avro source ## ## Receive Flume events for Spark streaming # http://flume.apache.org/FlumeUserGuide.html#memory-channel agent1.channels = memoryChannel agent1.channels.memoryChannel.type = memory agent1.channels.memoryChannel.capacity = 100 ## Flume Collector on Analytics Node # http://flume.apache.org/FlumeUserGuide.html#avro-source agent1.sources = avroSource agent1.sources.avroSource.type = avro agent1.sources.avroSource.channels = memoryChannel agent1.sources.avroSource.bind = 0.0.0.0 agent1.sources.avroSource.port = 41414 #Sinks agent1.sinks = localout #http://flume.apache.org/FlumeUserGuide.html#file-roll-sink agent1.sinks.localout.type = file_roll agent1.sinks.localout.sink.directory = /home/vagrant/flume/logs agent1.sinks.localout.sink.rollInterval = 0 agent1.sinks.localout.channel = memoryChannel thank you in advance for any assistance, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Flume-integration-do-I-understand-this-correctly-tp10879.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [GraphX] How to access a vertex via vertexId?
Yifan LI iamyifa...@gmail.com writes:
> Maybe you could get the vertex, for instance, which id is 80, by using:
> graph.vertices.filter{case(id, _) => id==80}.collect
> but I am not sure this is the exactly efficient way. (it will scan the whole table? if it can not get benefit from index of VertexRDD table)

Until IndexedRDD is merged, a scan and collect is the best officially supported way. PairRDDFunctions.lookup does this under the hood as well. However, it's possible to use the VertexRDD's hash index to do a much more efficient lookup. Note that these APIs may change, since VertexPartitionBase and its subclasses are private[graphx]. You can access the partitions of a VertexRDD using VertexRDD#partitionsRDD, and each partition has VertexPartitionBase#isDefined and VertexPartitionBase#apply. Putting it all together:

val verts: VertexRDD[_] = ...
val targetVid: VertexId = 80L
val result = verts.partitionsRDD.flatMap { part =>
  if (part.isDefined(targetVid)) Some(part(targetVid)) else None
}.collect.head

Once IndexedRDD [1] is merged, it will provide this functionality using verts.get(targetVid). Its implementation of get also uses the hash partitioner to run only one task [2]. Ankur

[1] https://issues.apache.org/jira/browse/SPARK-2365
[2] https://github.com/ankurdave/spark/blob/IndexedRDD/core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala#L89
Re: how to publish spark inhouse?
i just looked at my dependencies in sbt, and when using cdh4.5.0 dependencies i see that hadoop clients pulls in jboss netty (via zookeeper) and asm 3.x (via jersey-server). so somehow these exclusion rules are not working anymore? i will look into sbt-pom-reader a bit to try to understand whats happening On Mon, Jul 28, 2014 at 8:45 PM, Patrick Wendell pwend...@gmail.com wrote: All of the scripts we use to publish Spark releases are in the Spark repo itself, so you could follow these as a guideline. The publishing process in Maven is similar to in SBT: https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L65 On Mon, Jul 28, 2014 at 12:39 PM, Koert Kuipers ko...@tresata.com wrote: ah ok thanks. guess i am gonna read up about maven-release-plugin then! On Mon, Jul 28, 2014 at 3:37 PM, Sean Owen so...@cloudera.com wrote: This is not something you edit yourself. The Maven release plugin manages setting all this. I think virtually everything you're worried about is done for you by this plugin. Maven requires artifacts to set a version and it can't inherit one. I feel like I understood the reason this is necessary at one point. On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers ko...@tresata.com wrote: and if i want to change the version, it seems i have to change it in all 23 pom files? mhhh. is it mandatory for these sub-project pom files to repeat that version info? useful? spark$ grep 1.1.0-SNAPSHOT * -r | wc -l 23 On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers ko...@tresata.com wrote: hey we used to publish spark inhouse by simply overriding the publishTo setting. but now that we are integrated in SBT with maven i cannot find it anymore. i tried looking into the pom file, but after reading 1144 lines of xml i 1) havent found anything that looks like publishing 2) i feel somewhat sick too 3) i am considering alternative careers to developing... where am i supposed to look? thanks for your help!
Re: the pregel operator of graphx throws NullPointerException
Denis RP qq378789...@gmail.com writes: [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:4 failed 4 times, most recent failure: Exception failure in TID 598 on host worker6.local: java.lang.NullPointerException [error] scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [error] scala.collection.Iterator$class.foreach(Iterator.scala:727) [error] scala.collection.AbstractIterator.foreach(Iterator.scala:1157) It looks like Iterator.scala:328 [1] is Iterator#map, and it's likely failing because the map function is null. I haven't seen this before, but I wonder if SPARK-2292 [2] is related. The stack trace is different there, but the problem of a function being null is the same. Based on the JIRA comments, it might be a problem with your build and launch process. How are you deploying your application? Ankur [1] https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/Iterator.scala#L328 [2] https://issues.apache.org/jira/browse/SPARK-2292
Example standalone app error!
I am trying to run an example Spark standalone app with the following code import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ object SparkGensimLDA extends App{ val ssc=new StreamingContext(local,testApp,Seconds(5)) val lines=ssc.textFileStream(/.../spark_example/) val words=lines.flatMap(_.split( )) val wordCounts=words.map(x = (x,1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } However I am getting the following error 15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark] java.lang.AbstractMethodError: null at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] 15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark] java.lang.AbstractMethodError: null at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] 15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating local directories at root dirs '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/' 15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark] java.lang.AbstractMethodError: org.apache.spark.storage.BlockManagerMasterActor.aroundPostStop()V at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) 
~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.terminate(ActorCell.scala:369) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] 15:35:40.177 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark] java.lang.AbstractMethodError:
Re: Job using Spark for Machine Learning
Hi Martin, Job ads are actually not allowed on the list, but thanks for asking. Just posting this for others' future reference. Matei On July 29, 2014 at 8:34:59 AM, Martin Goodson (mar...@skimlinks.com) wrote: I'm not sure if job adverts are allowed on here - please let me know if not. Otherwise, if you're interested in using Spark in an RD machine learning project then please get in touch. We are a startup based in London. Our data sets are on a massive scale- we collect data on over a billion users per month and are second only to Google in the contextual advertising space (ok - a distant second!). Details here: http://grnh.se/rl8f25 -- Martin Goodson | VP Data Science (0)20 3397 1240
Re: KMeans: expensiveness of large vectors
Before torrent, HTTP was the default way of broadcasting. The driver holds the data and the executors request the data via HTTP, making the driver the bottleneck if the data is large. -Xiangrui

On Tue, Jul 29, 2014 at 10:32 AM, durin m...@simon-schaefer.net wrote: Development is really rapid here; that's a great thing. Out of curiosity, how did communication work before torrent? Did everything have to go back to the master / driver first?
Re: java.io.StreamCorruptedException: invalid type code: 00
Just realized that I was missing the JavaSparkContext in the import and after adding it, the error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.lang.reflect.Method at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:771) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:714) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:718) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:717) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:717) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:718) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:717) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:717) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:698) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1198) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Alexis On Jul 29, 2014, at 2:53 PM, Alexis Roos alexis.r...@gmail.com wrote: Hello, I am porting a data process running in Spark from Scala to Java (8) using Lambdas to see how practical Java 8 is. The first few steps are working (parsing data, creating JavaRDDs) but then it fails while doing a cogroup between two JavaPairRDDString, String. I am getting a bunch of java.io.StreamCorruptedException: invalid type code: 00 and ultimately the stack trace below. It is running on Mac OS X local mode with Java SE 8. java version 1.8.0_11 Java(TM) SE Runtime Environment (build 1.8.0_11-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.11-b03, mixed mode) Any ideas on possible root cause ?? 
Thanks, Alexis — Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: java.io.StreamCorruptedException: invalid type code: 00 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379) java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
RE: Avro Schema + GenericRecord to HadoopRDD
Hi Benjamin, I think the best bet would be to use the Avro code generation stuff to generate a SpecificRecord for your schema and then change the reader to use your specific type rather than GenericRecord. Trying to read up the generic record and then do type inference and spit out a tuple is way more headache than it's worth if you already have the schema in hand (I've done it for Cascading/Scalding). - Chris

From: Laird, Benjamin [benjamin.la...@capitalone.com] Sent: Tuesday, July 29, 2014 8:00 AM To: user@spark.apache.org; u...@spark.incubator.apache.org Subject: Avro Schema + GenericRecord to HadoopRDD Hi all, I can read Avro files into Spark with HadoopRDD and submit the schema in the jobConf, but with the guidance I've seen so far, I'm left with an Avro GenericRecord of Java Objects without types. How do I actually use the schema to have the types inferred?
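A rough sketch of that suggestion, assuming the Avro code generator has produced a SpecificRecord class (called Transaction here purely for illustration, with a generated getAmt accessor for the "amt" field) and that the generated class is on the executor classpath:

import org.apache.avro.mapred.{AvroInputFormat, AvroJob, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf(sc.hadoopConfiguration)
AvroJob.setInputSchema(jobConf, Transaction.getClassSchema)  // schema of the generated class

val rdd = sc.hadoopRDD(
  jobConf,
  classOf[AvroInputFormat[Transaction]],
  classOf[AvroWrapper[Transaction]],
  classOf[NullWritable],
  10)

// the datum is now a typed record rather than a GenericRecord of untyped Objects
val amounts = rdd.map { case (wrapper, _) => wrapper.datum.getAmt }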
Re: How true is this about spark streaming?
Hi, that quoted statement doesn't make much sense to me either. Maybe if you had a link for us that shows the context (Google doesn't reveal anything but this conversation), we could evaluate that statement better. Tobias

On Tue, Jul 29, 2014 at 5:53 PM, Sean Owen so...@cloudera.com wrote: I'm not sure I understand this, maybe because the context is missing. An RDD is immutable, so there is no such thing as writing to an RDD. I'm not sure which aspect is being referred to as single-threaded. Is this the Spark Streaming driver?
How to submit Pyspark job in mesos?
Dear all, I have spark1.0.0 and mesos0.18.1. After setting in mesos and spark and starting the mesos cluster, I try to run the pyspark job by the command below: spark-submit /path/to/my_pyspark_job.py --master mesos://192.168.0.21:5050 It occurs error below: 14/07/29 18:40:49 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/29 18:40:49 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041 14/07/29 18:40:49 INFO ui.SparkUI: Started SparkUI at http://CentOS-19:4041 14/07/29 18:40:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/29 18:40:50 INFO scheduler.EventLoggingListener: Logging events to /tmp/spark-events/my_test.py-1406630449771 14/07/29 18:40:50 INFO util.Utils: Copying /home/daijia/deal_three_word/my_test.py to /tmp/spark-4365b01d-b57a-4abb-b39c-cb57b83a28ce/my_test.py 14/07/29 18:40:50 INFO spark.SparkContext: Added file file:/home/daijia/deal_three_word/my_test.py at http://192.168.3.91:51188/files/my_test.py with timestamp 1406630450333 I0729 18:40:50.440551 15033 sched.cpp:121] Version: 0.18.1 I0729 18:40:50.442450 15035 sched.cpp:217] New master detected at master@192.168.3.91:5050 I0729 18:40:50.442570 15035 sched.cpp:225] No credentials provided. Attempting to register without authentication I0729 18:40:50.443234 15036 sched.cpp:391] Framework registered with 20140729-174911-1526966464-5050-13758-0006 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Registered as framework ID 20140729-174911-1526966464-5050-13758-0006 14/07/29 18:40:50 INFO spark.SparkContext: Starting job: count at /home/daijia/deal_three_word/my_test.py:27 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_LOST 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 1 is now TASK_LOST 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 3 is now TASK_LOST 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Blacklisting Mesos slave value: 20140729-163345-1526966464-5050-10913-0 due to too many failures; is Spark installed on it? 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 2 is now TASK_LOST 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Blacklisting Mesos slave value: 20140729-163345-1526966464-5050-10913-2 due to too many failures; is Spark installed on it? 14/07/29 18:40:50 INFO scheduler.DAGScheduler: Got job 0 (count at /home/daijia/deal_three_word/my_test.py:27) with 2 output partitions (allowLocal=false) 14/07/29 18:40:50 INFO scheduler.DAGScheduler: Final stage: Stage 0(count at /home/daijia/deal_three_word/my_test.py:27) 14/07/29 18:40:50 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/07/29 18:40:50 INFO scheduler.DAGScheduler: Missing parents: List() 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 4 is now TASK_LOST 14/07/29 18:40:50 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37), which has no missing parents 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 5 is now TASK_LOST 14/07/29 18:40:50 INFO mesos.CoarseMesosSchedulerBackend: Blacklisting Mesos slave value: 20140729-163345-1526966464-5050-10913-1 due to too many failures; is Spark installed on it? 
14/07/29 18:40:50 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37)
14/07/29 18:40:50 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/07/29 18:41:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/07/29 18:41:20 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/07/29 18:41:20 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

It just repeats the last message. Here is my Python script:

#!/usr/bin/env python
#coding=utf-8
from pyspark import SparkContext

sc = SparkContext()
temp = []
for index in range(1000):
    temp.append(index)
sc.parallelize(temp).count()

So, is the running command right? Or is something else causing the problem? Thanks in advance, Daijia
How do you debug a PythonException?
I'm in the PySpark shell and I'm trying to do this:

a = sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json', minPartitions=sc.defaultParallelism * 3).cache()
a.map(lambda x: len(x)).max()

My job dies with the following:

14/07/30 01:46:28 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File /root/spark/python/pyspark/worker.py, line 73, in main
    command = pickleSer._read_with_length(infile)
  File /root/spark/python/pyspark/serializers.py, line 142, in _read_with_length
    length = read_int(stream)
  File /root/spark/python/pyspark/serializers.py, line 337, in read_int
    raise EOFError
EOFError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-190-171-217.ec2.internal: remote Akka client disassociated

How do I debug this? I'm using 1.0.2-rc1 deployed to EC2. Nick
How to specify the job to run on the specific nodes(machines) in the hadoop yarn cluster?
Hi all, as the title says: I want to run a job on two specific nodes in the Hadoop YARN cluster. How do I configure YARN to do this? Does the YARN queue help? Thanks
Is it possible to read file head in each partition?
Hi, all. We are migrating from MapReduce to Spark, and have encountered a problem. Our input files are IIS logs with file heads. It's easy to get the file head if we process only one file, e.g.

val lines = sc.textFile("hdfs://*/u_ex14073011.log")
val head = lines.take(4)

Then we can write our map method using this head. However, if we input multiple files, each of which may have a different file head, how can we get the file head for each partition? It seems we have two options:

1. Still use textFile() to get lines. Since each partition may have a different head, we have to write a mapPartitionsWithContext method. However, we can't find a way to get the head for each partition. In our former MapReduce program, we could simply use Path path = ((FileSplit) context.getInputSplit()).getPath(), but there seems to be no way to do this in Spark, since HadoopPartition, which wraps the InputSplit inside HadoopRDD, is a private class.

2. Use wholeTextFiles() to get the whole contents. It's easy to get the file head for each file, but according to the documentation, this API is better suited to small files.

Any suggestions on how to process these files with heads?
Re: Is it possible to read file head in each partition?
This is an interesting question. I’m curious to know as well how this problem can be approached. Is there a way, perhaps, to ensure that each input file matching the glob expression gets mapped to exactly one partition? Then you could probably get what you want using RDD.mapPartitions(). Nick On Wed, Jul 30, 2014 at 12:02 AM, Fengyun RAO raofeng...@gmail.com wrote: Hi, all We are migrating from mapreduce to spark, and encountered a problem. Our input files are IIS logs with file head. It's easy to get the file head if we process only one file, e.g. val lines = sc.textFile('hdfs://*/u_ex14073011.log') val head = lines.take(4) Then we can write our map method using this head. However, if we input multiple files, each of which may have a different file head, how can we get file head for each partition? It seems we have two options: 1. still use textFile() to get lines. Since each partition may have a different head, we have to write mapPartitionsWithContext method. However we can't find a way to get the head for each partition. In our former mapreduce program, we could simply use Path path = ((FileSplit) context.getInputSplit()).getPath() but there seems no way in spark, since HadoopPartition which wraps InputSplit inside HadoopRDD is a private class. 2. use wholeTextFile() to get whole contents. It's easy to get file head for each file, but according to the document, this API is better for small files. *Any suggestions on how to process these files with heads?*
Re: Using Spark Streaming with Kafka 0.7.2
Hi, For testing you could also just use the Kafka 0.7.2 console consumer and pipe its output to netcat (nc), then process that as in the NetworkWordCount example https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala That worked for me. Backporting to the older Kafka version seems tricky due to all the protocol changes. Andre
On 07/26/2014 12:56 AM, Tathagata Das wrote: Spark Streaming is built as part of the whole Spark repository, so follow Spark's building instructions http://spark.apache.org/docs/latest/building-with-maven.html to build Spark Streaming along with Spark. Spark Streaming 0.8.1 was built with Kafka 0.7.2, so you can take a look at that. If necessary, I recommend modifying the current Kafka Receiver based on the 0.8.1 Kafka Receiver https://github.com/apache/spark/blob/v0.8.1-incubating/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala TD
On Fri, Jul 25, 2014 at 10:16 AM, maddenpj madde...@gmail.com wrote: Hi all, Currently we have Kafka 0.7.2 running in production and can't upgrade for external reasons, but Spark Streaming (1.0.1) was built with Kafka 0.8.0. What is the best way to use Spark Streaming with older versions of Kafka? Currently I'm investigating building Spark Streaming myself, but I can't find any documentation specifically for building Spark Streaming. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-Streaming-with-Kafka-0-7-2-tp10674.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: why a machine learning application run slowly on the spark cluster
Could you share more details about the dataset and the algorithm? For example, if the dataset has 10M+ features, it may be slow for the driver to collect the weights from the executors (just a blind guess). -Xiangrui
On Tue, Jul 29, 2014 at 9:15 PM, Tan Tim unname...@gmail.com wrote:
Hi, all
[Setting]
Input data: the data is on HDFS, in 10 parts (text files), each part about 2.3 GB.
Spark cluster: runs on CentOS, 8 machines, 8 cores and 128 GB memory per machine.
The settings for the Spark context:
val conf = new SparkConf().setMaster("spark://xxx-xxx-xx001:12036").
  setAppName("OWLQN").setSparkHome("/var/bh/lib/spark-0.9.1-bin-hadoop1").
  setJars(List(jarFile))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "LRRegistrator")
conf.set("spark.executor.memory", "64g")
conf.set("spark.default.parallelism", "128")
conf.set("spark.akka.timeout", "60")
conf.set("spark.storage.memoryFraction", "0.7")
conf.set("spark.kryoserializer.buffer.mb", "1024")
conf.set("spark.cores.max", "64")
conf.set("spark.speculation", "true")
conf.set("spark.storage.blockManagerTimeoutIntervalMs", "6")
val sc = new SparkContext(conf)
[Trouble]
Executors do not start up concurrently. For every stage the executors do not start concurrently: some executors finish all their tasks while other executors have not yet begun any, as the web UI shows (some executors finished 10 tasks, and the other two are still not shown on the web UI). As Andrew Xia suggested, I added a sleep after creating the Spark context, but some stages still have this problem.
I/O and CPU are never fully used. When tasks start, the CPU is not fully used: usage goes above 100% for less than 2 seconds and then drops to 1%, yet the tasks have not finished. The same thing happens with I/O.
The attached file is the log for some stages; each stage averages 3.5 minutes, which is too slow compared to our other experiment (running the same task on the Ubuntu cluster rather than CentOS).
Re: Is it possible to read file head in each partition?
It will certainly cause bad performance, since it reads the whole content of a large file into one value, instead of splitting it into partitions. Typically one file is 1 GB. Suppose we have 3 large files, in this way, there would only be 3 key-value pairs, and thus 3 tasks at most. 2014-07-30 12:49 GMT+08:00 Hossein fal...@gmail.com: You can use SparkContext.wholeTextFile(). Please note that the documentation suggests: Small files are preferred, large file is also allowable, but may cause bad performance. --Hossein On Tue, Jul 29, 2014 at 9:21 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: This is an interesting question. I’m curious to know as well how this problem can be approached. Is there a way, perhaps, to ensure that each input file matching the glob expression gets mapped to exactly one partition? Then you could probably get what you want using RDD.mapPartitions(). Nick On Wed, Jul 30, 2014 at 12:02 AM, Fengyun RAO raofeng...@gmail.com wrote: Hi, all We are migrating from mapreduce to spark, and encountered a problem. Our input files are IIS logs with file head. It's easy to get the file head if we process only one file, e.g. val lines = sc.textFile('hdfs://*/u_ex14073011.log') val head = lines.take(4) Then we can write our map method using this head. However, if we input multiple files, each of which may have a different file head, how can we get file head for each partition? It seems we have two options: 1. still use textFile() to get lines. Since each partition may have a different head, we have to write mapPartitionsWithContext method. However we can't find a way to get the head for each partition. In our former mapreduce program, we could simply use Path path = ((FileSplit) context.getInputSplit()).getPath() but there seems no way in spark, since HadoopPartition which wraps InputSplit inside HadoopRDD is a private class. 2. use wholeTextFile() to get whole contents. It's easy to get file head for each file, but according to the document, this API is better for small files. *Any suggestions on how to process these files with heads?*
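One compromise that avoids loading whole files into memory (sketched here in PySpark syntax for brevity, though the same approach works from Scala; the paths and the parse_with_head helper are placeholders, not taken from this thread): build one RDD per input file, read that file's header on the driver with take(4), close over it in a mapPartitions call, and union the per-file results. Each file is still split into normal line-based partitions, so this should not hit the wholeTextFiles memory problem, at the cost of one small take() job per file.
# Hypothetical sketch; replace the paths and the parsing with the real ones.
paths = ['hdfs:///logs/u_ex14073011.log', 'hdfs:///logs/u_ex14073012.log']

def parse_with_head(lines, head):
    # Skip the header lines themselves (IIS headers start with '#') and
    # parse the remaining lines using the field names from 'head'.
    for line in lines:
        if line.startswith('#'):
            continue
        yield (head, line)  # placeholder for real parsing

per_file = []
for p in paths:
    lines = sc.textFile(p)
    head = lines.take(4)  # this file's header
    # the default argument pins the current 'head' for this file's closure
    per_file.append(lines.mapPartitions(lambda it, h=head: parse_with_head(it, h)))

parsed = sc.union(per_file)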
Re: How to submit Pyspark job in mesos?
Actually, it runs okay on my slaves when they are deployed in standalone mode. The error only occurs when I switch to Mesos. Anyway, thanks for your reply; any ideas will help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-submit-Pyspark-job-in-mesos-tp10905p10918.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: the pregel operator of graphx throws NullPointerException
I build it with sbt package and run it with sbt run, and I do use SparkConf.set for deployment options and external jars. It seems that spark-submit can't load the extra jars, which leads to a NoClassDefFoundError; should I pack all the jars into one giant jar and give it a try? I run it on a cluster of 8 machines; the test data consists of 1,000,000 vertices, and the edges are sparse. I use Graph.apply to build the graph. Before the build, I tested the vertices (RDD[(VertexId, VD)]) and edges (RDD[Edge[ED]]) with count and first, and the output looks fine. I'm using Ubuntu 12.04 and Spark 1.0.1 with the serializable bug fixed; Java was installed from openjdk-7-jdk. BTW, is there a chance that Bagel would work here? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/the-pregel-operator-of-graphx-throws-NullPointerException-tp10865p10920.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: why a machine learning application run slowly on the spark cluster
The weight vector is usually dense, and if you have many partitions the driver may slow down. You can also take a look at the driver memory in the Executors tab of the web UI. Another thing to check is the HDFS block size and whether the input data is evenly distributed across the executors. Are the hardware specs the same for the two clusters? -Xiangrui
On Tue, Jul 29, 2014 at 10:46 PM, Tan Tim unname...@gmail.com wrote: The application is logistic regression (OWLQN); we developed a sparse-vector version. The feature dimension is 1M+, but the data is very sparse. This application runs fine on another Spark cluster, where every stage takes about 50 seconds and every executor shows high CPU usage. The only difference is the OS (the faster one is Ubuntu, the slower one is CentOS).
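A quick way to check the data-distribution point (sketched in PySpark syntax for brevity, the same idea applies from Scala; the input path is a placeholder): count the records in each partition of the training input and compare the extremes. A large skew would mean a few executors do most of the work while the others sit idle, which matches the symptom of some executors finishing all tasks while others barely start.
# Hypothetical sketch; point this at the real training input.
counts = (sc.textFile('hdfs:///path/to/training/input')
            .mapPartitions(lambda it: [sum(1 for _ in it)])
            .collect())
print(len(counts), min(counts), max(counts))  # many partitions of similar size = balanced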
Re: zip two RDD in pyspark
On Mon, Jul 28, 2014 at 12:58 PM, l lishu...@gmail.com wrote:
I have a file in S3 and I want to map each line with an index. Here is my code:
input_data = sc.textFile('s3n:/myinput', minPartitions=6).cache()
N = input_data.count()
index = sc.parallelize(range(N), 6)
index.zip(input_data).collect()
I don't think you can do zipWithIndex() this way, because the number of lines in each partition of input_data will be different from index. You need to get the exact number of lines in each partition first, then generate the correct indices. That is easy to do with mapPartitions():
nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
starts = [sum(nums[:i]) for i in range(len(nums))]
zipped = input_data.mapPartitionsWithIndex(lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
...
14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1) finished in 0.031 s
14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1, took 0.03707 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 584, in collect
    return list(self._collect_iterator_through_file(bytesInJava))
  File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
    self.ctx._writeToFile(iterator, tempFile.name)
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.writeToFile.
: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
        at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
        at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
        at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:744)
As I see it, the job completed, but I don't understand what is happening with 'String cannot be cast to [B'. I tried to zip two ParallelCollectionRDDs and it works fine, but here I have a MappedRDD from textFile. Not sure what's going on here.
Could you provide a script and dataset to reproduce this error? Maybe there are some corner cases during serialization.
Also, why does Python not have zipWithIndex()?
PySpark still has fewer features than the Scala API; hopefully it will catch up in the next two releases.
Thanks for any help.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
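For what it's worth, here is a tiny self-contained check of the mapPartitions indexing workaround above, run on an in-memory RDD (purely illustrative; it does not touch the S3 input or reproduce the ClassCastException):
input_data = sc.parallelize(['a', 'b', 'c', 'd'], 2)
nums = input_data.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
starts = [sum(nums[:i]) for i in range(len(nums))]
zipped = input_data.mapPartitionsWithIndex(
    lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
print(zipped.collect())  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]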
Re: How to submit Pyspark job in mesos?
Maybe Mesos or Spark was not configured correctly. Could you check the log files on the Mesos slaves? They should log the reason when Mesos cannot launch the executor. On Tue, Jul 29, 2014 at 10:39 PM, daijia jia_...@intsig.com wrote: Actually, it runs okay on my slaves when they are deployed in standalone mode. The error only occurs when I switch to Mesos. Anyway, thanks for your reply; any ideas will help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-submit-Pyspark-job-in-mesos-tp10905p10918.html Sent from the Apache Spark User List mailing list archive at Nabble.com.