Eclipse Spark plugin and sample Scala projects
Hi -- I tried searching for Eclipse Spark plugin setup for developing with Spark, and there is some information to go on, but I have not seen a starter app or project to import into Eclipse and try out. Can anyone please point me to a Scala project to import into the Scala Eclipse IDE? Thanks for all the help!
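A minimal sketch of such a starter project, assuming sbt and Spark 1.0.0 (all names are illustrative, not an official sample; the sbteclipse plugin can generate the Eclipse project files from it):

    // build.sbt
    name := "spark-starter"

    version := "0.1"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

    // src/main/scala/SimpleApp.scala
    import org.apache.spark.{SparkConf, SparkContext}

    object SimpleApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("Starter")
        val sc = new SparkContext(conf)
        // Word count over the project's own build file as a smoke test.
        val counts = sc.textFile("build.sbt")
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty)
          .map((_, 1))
          .reduceByKey(_ + _)
        counts.collect().foreach(println)
        sc.stop()
      }
    }

Running sbt eclipse (with sbteclipse installed) should then produce a project importable via File > Import > Existing Projects into Workspace.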
Spark SQL throws ClassCastException on first try; works on second
I’m running this query against RDD[Tweet], where Tweet is a simple case class with 4 fields.

    sqlContext.sql("""
      SELECT user, COUNT(*) as num_tweets
      FROM tweets
      GROUP BY user
      ORDER BY num_tweets DESC, user ASC;
    """).take(5)

The first time I run this, it throws the following:

    14/07/15 06:11:51 ERROR TaskSetManager: Task 12.0:0 failed 4 times; aborting job
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 12.0:0 failed 4 times, most recent failure: Exception failure in TID 978 on host ip-10-144-204-254.ec2.internal: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
        scala.math.Ordering$String$.compare(Ordering.scala:329)
        org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:227)
        org.apache.spark.sql.catalyst.expressions.RowOrdering.compare(Row.scala:210)
        java.util.TimSort.mergeLo(TimSort.java:687)
        java.util.TimSort.mergeAt(TimSort.java:483)
        java.util.TimSort.mergeCollapse(TimSort.java:410)
        java.util.TimSort.sort(TimSort.java:214)
        java.util.TimSort.sort(TimSort.java:173)
        java.util.Arrays.sort(Arrays.java:659)
        scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
        scala.collection.mutable.ArrayOps$ofRef.sorted(ArrayOps.scala:108)
        org.apache.spark.sql.execution.Sort$$anonfun$execute$3$$anonfun$apply$4.apply(basicOperators.scala:154)
        org.apache.spark.sql.execution.Sort$$anonfun$execute$3$$anonfun$apply$4.apply(basicOperators.scala:154)
        org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
        org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

If I immediately re-run the query, it works fine. I’ve been able to reproduce this a few times. If I run other, simpler SELECT queries first and then this one, it also gets around the problem. Strange… I’m on 1.0.0 on EC2. Nick
Re: SparkR failed to connect to the master
You'll need to build SparkR to match the Spark version deployed on the cluster. You can do that by changing the Spark version in SparkR's build.sbt [1]. If you are using the Maven build, you'll need to edit pom.xml instead. Thanks, Shivaram [1] https://github.com/amplab-extras/SparkR-pkg/blob/master/pkg/src/build.sbt#L20 On Mon, Jul 14, 2014 at 6:19 PM, cjwang c...@cjwang.us wrote: I tried installing the latest Spark 1.0.1 and SparkR couldn't find the master either. I restarted with Spark 0.9.1 and SparkR was able to find the master. So, there seems to be something that changed after Spark 1.0.0.
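A sketch of the kind of one-line change this means, assuming the dependency declaration in pkg/src/build.sbt looks roughly like the following; pin the version to whatever is deployed on the cluster:

    // In SparkR's pkg/src/build.sbt -- change "1.0.1" to match the cluster:
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1"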
Re: RACK_LOCAL Tasks Failed to finish
I am running PageRank (included in GraphX) on a dataset with 55876487 edges. I submitted the application to YARN with the options --num-executors 30 --executor-memory 30g --driver-memory 10g --executor-cores 8. Thanks. From: Ankur Dave ankurd...@gmail.com Sent: Tuesday, July 15, 2014, 11:52 To: user user@spark.apache.org, 李奇平(洪奇) qiping@alibaba-inc.com Subject: Re: RACK_LOCAL Tasks Failed to finish What GraphX application are you running? If it's a custom application that calls RDD.unpersist, that might cause RDDs to be recomputed. It's tricky to do unpersisting correctly, so you might try not unpersisting and see if that helps. Ankur
Re: Spark SQL throws ClassCastException on first try; works on second
You might be hitting SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994, which is fixed in 1.0.1.
Re: hdfs replication on saving RDD
In general it would be nice to be able to configure replication on a per-job basis. Is there a way to do that without changing the config values in the Hadoop conf/ directory between jobs? Maybe by modifying OutputFormats or the JobConf? On Mon, Jul 14, 2014 at 11:12 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can change this setting through SparkContext.hadoopConfiguration, or put the conf/ directory of your Hadoop installation on the CLASSPATH when you launch your app so that it reads the config values from there. Matei On Jul 14, 2014, at 8:06 PM, valgrind_girl 124411...@qq.com wrote: Eager to know about this issue too; does anyone know how?
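A sketch of the per-job approach Matei describes, using SparkContext.hadoopConfiguration with the standard HDFS dfs.replication key (paths are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("ReplicationDemo"))

    // Lower the replication factor for this job's output only; the
    // cluster-wide default (usually 3) in hdfs-site.xml is untouched.
    sc.hadoopConfiguration.set("dfs.replication", "2")

    sc.textFile("hdfs:///input/data").saveAsTextFile("hdfs:///output/data")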
Re: Nested Query With Spark SQL(1.0.1)
I meant querying nested data such as JSON, not a nested query; sorry for the misunderstanding.
Re: Spark SQL throws ClassCastException on first try; works on second
Ah, good catch, that seems to be it. I'd use 1.0.1, except I've been hitting up against SPARK-2471 https://issues.apache.org/jira/browse/SPARK-2471 with that version, which doesn't let me access my data in S3. :( OK, at least I know this has probably already been fixed. Nick On Tue, Jul 15, 2014 at 2:20 AM, Michael Armbrust mich...@databricks.com wrote: You might be hitting SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994, which is fixed in 1.0.1.
Re: import org.apache.spark.streaming.twitter._ in Shell
Hmm, I'd like to clarify something from your comments, Tathagata. Going forward, is Twitter Streaming functionality not supported from the shell? What should users do if they'd like to process live Tweets from the shell? Nick On Mon, Jul 14, 2014 at 11:50 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: At some point, you were able to access TwitterUtils from spark shell using Spark 1.0.0+ ? Yep. If yes, then what change in Spark caused it to not work any more? It still works for me. I was just commenting on your remark that it doesn't work through the shell, which I now understand to apply to versions of Spark before 1.0.0. Nick
Re: Nested Query With Spark SQL(1.0.1)
In general this should be supported using [] to access array data and . to access nested fields. Is there something you are trying that isn't working?
Re: ALS on EC2
Could you share the code of RecommendationALS and the complete spark-submit command line options? Thanks! -Xiangrui On Mon, Jul 14, 2014 at 11:23 PM, Srikrishna S srikrishna...@gmail.com wrote:

    Using properties file: null
    Main class: RecommendationALS
    Arguments: _train.csv _validation.csv _test.csv
    System properties:
    SPARK_SUBMIT - true
    spark.app.name - RecommendationALS
    spark.jars - file:/root/projects/spark-recommendation-benchmark/benchmark_mf/target/scala-2.10/recommendation-benchmark_2.10-1.0.jar
    spark.master - local[8]
    Classpath elements:
    file:/root/projects/spark-recommendation-benchmark/benchmark_mf/target/scala-2.10/recommendation-benchmark_2.10-1.0.jar
    14/07/15 05:57:41 INFO Slf4jLogger: Slf4jLogger started
    14/07/15 05:57:41 INFO Remoting: Starting remoting
    14/07/15 05:57:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@ip-172-31-19-62.us-west-2.compute.internal:57349]
    14/07/15 05:57:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@ip-172-31-19-62.us-west-2.compute.internal:57349]
    14/07/15 05:57:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    --args is deprecated. Use --arg instead.
    14/07/15 05:57:43 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    14/07/15 05:57:44 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
    14/07/15 05:57:45 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)

...and it continues trying and sleeping. Any help?
Re: Repeated data item search with Spark SQL(1.0.1)
Thank you so much for the reply; here is my code.

    1.  val conf = new SparkConf().setAppName("Simple Application")
    2.  conf.setMaster("local")
    3.  val sc = new SparkContext(conf)
    4.  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    5.  import sqlContext.createSchemaRDD
    6.  val path1 = "./data/people.json"
    7.  val people = sqlContext.jsonFile(path1)
    8.  people.registerAsTable("people")
    9.  var sql = "SELECT name FROM people WHERE schools.time > 2"
    10. val result = sqlContext.sql(sql)
    11. result.collect().foreach(println)

The content of people.json is:

    {"name":"Michael", "schools":[{"name":"ABC","time":1994},{"name":"EFG","time":2000}]}
    {"name":"Andy", "age":30, "scores":{"eng":98,"phy":89}}
    {"name":"Justin", "age":19}

What I have tried is: *1. use HiveQL:* I have tried replacing line 4 with val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) and line 10 with val result = sqlContext.hql(sql) (I have recompiled the Spark jar with Hive support), but I seem to get the same error. *2. use [] and . for the access:* I have tried replacing line 9 with var sql = "SELECT name FROM people WHERE schools[0].time > 2", but got the error:

    14/07/15 14:37:49 INFO SparkContext: Job finished: reduce at JsonRDD.scala:40, took 0.98412 s
    Exception in thread "main" java.lang.RuntimeException: [1.41] failure: ``UNION'' expected but identifier .time found

    SELECT name FROM people WHERE schools[0].time>2
                                            ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:69)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:185)
        at SimpleApp$.main(SimpleApp.scala:32)
        at SimpleApp.main(SimpleApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

So it seems not to be supported.
Re: KMeansModel Construtor error
I don't think MLlib supports model serialization/deserialization yet. You got the error because the constructor is private. I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-2488 and we will try to make sure it is implemented in v1.1. For now, you can modify KMeansModel and remove private[mllib] from the constructor. Sorry for the inconvenience! -Xiangrui On Mon, Jul 14, 2014 at 10:41 PM, Rohit Pujari rpuj...@hortonworks.com wrote: Hello Folks: I have written a simple program to read an already saved model from HDFS and score it. But when I try to read the saved model, I get the following error. Any clues what might be going wrong here?

    val x = sc.objectFile[Vector]("/data/model").collect()
    val y = new KMeansModel(x)

    constructor KMeansModel in class KMeansModel cannot be accessed in object KMeansScore
    val y = new KMeansModel(x);
            ^

Thanks, Rohit
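If patching MLlib is not an option, here is a rough sketch that scores against the saved centers directly and sidesteps the private constructor (assumes the centers were saved as in Rohit's snippet; somePoint is a placeholder for a point to score):

    import org.apache.spark.mllib.linalg.Vector

    // Squared Euclidean distance between two vectors of the same length.
    def sqDist(a: Vector, b: Vector): Double = {
      val (x, y) = (a.toArray, b.toArray)
      x.zip(y).map { case (u, v) => (u - v) * (u - v) }.sum
    }

    // Index of the nearest cluster center, i.e. what predict() would return.
    def predict(centers: Array[Vector], point: Vector): Int =
      centers.indices.minBy(i => sqDist(centers(i), point))

    val centers = sc.objectFile[Vector]("/data/model").collect()
    val cluster = predict(centers, somePoint) // somePoint: Vector (placeholder)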
Re: Nested Query With Spark SQL(1.0.1)
Yes, as in my last post, using [] to access array data and . to access nested fields does not seem to work. BTW, I have dug into the code of the current master branch: spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala, from line 72 (in the function def resolve(name: String): Option[NamedExpression]):

    ...
    options.distinct match {
      case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
      // One match, but we also need to extract the requested nested field.
      case (a, nestedFields) :: Nil =>
        a.dataType match {
          case StructType(fields) =>
            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
          case _ => None // Don't know how to resolve these field references
        }
      case Nil => None // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
    }
    ...

It seems only StructType is handled and ArrayType is not. So is this a bug, or a feature that is not finished yet? Thanks!
Re: Repeated data item search with Spark SQL(1.0.1)
Sorry for the trouble. There are two issues here: - Parsing of repeated nested fields (i.e. something[0].field) is not supported in the plain SQL parser: SPARK-2096 https://issues.apache.org/jira/browse/SPARK-2096 - Resolution is broken in the HiveQL parser: SPARK-2483 https://issues.apache.org/jira/browse/SPARK-2483 The latter issue is fixed now: #1411 https://github.com/apache/spark/pull/1411 Michael
Re: jsonRDD: NoSuchMethodError
I am running this in standalone mode on a single machine. I built the Spark jar from scratch (sbt assembly) and then included that in my application (the same process I have followed for earlier versions). Thanks.
Re: truly bizarre behavior with local[n] on Spark 1.0.1
This is (obviously) Spark Streaming, by the way. On Mon, Jul 14, 2014 at 8:27 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I've got a socketTextStream through which I'm reading input. I have three DStreams, all of which are the same window operation over that socketTextStream. I have a four-core machine. As we've been covering lately, I have to give a cores parameter to my StreamingContext:

    ssc = new StreamingContext("local[4]" /** TODO change once a cluster is up **/, "AppName", Seconds(1))

Now, I have three DStreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own.

    dstream1 // 1 second window
    dstream2 // 2 second window
    dstream3 // 5 minute window

If I construct the ssc with local[8] and put these statements in this order, I get prints on the first one and zero counts on the second one:

    ssc(local[8]) // hyperthread dat sheezy
    dstream1.print // works
    dstream2.count.print // always prints 0

If I do this, this happens:

    ssc(local[4])
    dstream1.print // doesn't work, just gives me the Time: ... ms header
    dstream2.count.print // doesn't work, prints 0

    ssc(local[6])
    dstream1.print // doesn't work, just gives me the Time: ... ms header
    dstream2.count.print // works, prints 1

Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
Re: import org.apache.spark.streaming.twitter._ in Shell
If you want to make Twitter* classes available in your shell, I believe you could do the following: 1. Change the parent pom module ordering - move external/twitter before assembly. 2. In assembly/pom.xml, add an external/twitter dependency - this will package twitter* into the assembly jar. Now when spark-shell is launched, the assembly jar is on the classpath - hence twitter* too. I think this will work (I remember trying this some time back).
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Hi Michael, Good to know it is being handled. I tried the master branch (9fe693b5) and got another error:

    scala> sqlContext.parquetFile("/tmp/foo")
    java.lang.RuntimeException: Unsupported parquet datatype optional fixed_len_byte_array(4) b
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279)
        ..

The avro schema I used is something like:

    protocol Test {
      fixed Bytes4(4);

      record User {
        string name;
        int age;
        union { null, int } i;
        union { null, int } j;
        union { null, Bytes4 } b;
        union { null, bytes } c;
        union { null, int } d;
      }
    }

Is this case included in SPARK-2446 https://issues.apache.org/jira/browse/SPARK-2446? 2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com: This is not supported yet, but there is a PR open to fix it: https://issues.apache.org/jira/browse/SPARK-2446 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using spark-sql 1.0.1 to load parquet files generated from the method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up:

    14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100
    org.apache.spark.SparkDriverExecutionException: Execution error
        at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
    Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
        at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
        at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
        ... 1 more
    Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter
        at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
        at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:264)
        at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
        at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
        ... 24 more

Is fixed length byte array supposed to work in this version? I noticed that other array types like int or string already work. Thanks, -- Pei-Lun
Re: Repeated data item search with Spark SQL(1.0.1)
Thank you so much for the information. I have now merged the fix from #1411, and HiveQL works with SELECT name FROM people WHERE schools[0].time > 2. One more question: is it possible, or planned, to support the schools.time form to filter records where some element of the schools array satisfies time > 2? That would be more general than schools[0].time > 2, since we sometimes don't know which element of the array should satisfy the condition (we do not know whether to use 0, 1, or X in schools[X].time); we only care whether some element satisfies it. Thank you!
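Until such syntax exists, one possible HiveQL workaround is to flatten the array with LATERAL VIEW explode so the predicate can match any element -- a sketch, assuming a HiveContext and the people table from earlier in the thread:

    val result = hiveContext.hql("""
      SELECT name
      FROM people LATERAL VIEW explode(schools) s AS school
      WHERE school.time > 2
    """)

Note this returns one row per matching array element, so a DISTINCT may be needed if a name should appear only once.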
Re: jsonRDD: NoSuchMethodError
The problem is resolved. Thanks.
Re: import org.apache.spark.streaming.twitter._ in Shell
You could try the following: create a minimal project using sbt or Maven, add spark-streaming-twitter as a dependency, run sbt assembly (or mvn package) on that to create a fat jar (with Spark as a provided dependency), and add that to the shell classpath when starting up.
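A sketch of that build, assuming sbt with the sbt-assembly plugin configured and Spark 1.0.0 (the jar path below is illustrative):

    // build.sbt
    name := "twitter-shell-deps"

    version := "0.1"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      // Spark itself is already on the shell's classpath, hence "provided".
      "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
    )

After sbt assembly, launching with something like bin/spark-shell --jars target/scala-2.10/twitter-shell-deps-assembly-0.1.jar should make org.apache.spark.streaming.twitter._ importable.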
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Oh, maybe not. Please file another JIRA.
Spark Streaming w/ tshark exception problem on EC2
Hi, I’ve got a problem with Spark Streaming and tshark. While running locally I have no problems with this code, but when I run it on an EC2 cluster I get the exception shown just under the code.

    import scala.collection.mutable.MutableList
    import scala.sys.process.Process

    def dissection(s: String): Seq[String] = {
      try {
        // calls hadoop to copy a file from S3 locally
        Process("hadoop command to create ./localcopy.tmp").!
        // calls tshark to transform the local file into a sequence of strings
        val pb = Process("tshark ... localcopy.tmp")
        pb.lines_!.toSeq
      } catch {
        case e: Exception =>
          System.err.println("ERROR")
          new MutableList[String]()
      }
    }

(Line 2051 points to the function dissection.)

    WARN scheduler.TaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
    java.lang.ExceptionInInitializerError
        at Main$$anonfun$11.apply(Main.scala:2051)
        at Main$$anonfun$11.apply(Main.scala:2051)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

Has anyone got an idea why that may happen? I’m pretty sure the hadoop call works perfectly. Thanks, Gianluca
Kryo NoSuchMethodError on Spark 1.0.0 standalone
Hi there, I've been successfully using the precompiled Spark 1.0.0 Java API on a small cluster in standalone mode. However, when I try to use the Kryo serializer by adding

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

as suggested, Spark crashes out with the following error:

    Exception in thread "main" java.lang.NoSuchMethodError: com.esotericsoftware.kryo.Kryo.setInstantiatorStrategy(Lorg/objenesis/strategy/InstantiatorStrategy;)V
        at com.twitter.chill.KryoBase.setInstantiatorStrategy(KryoBase.scala:85)
        at com.twitter.chill.EmptyScalaKryoInstantiator.newKryo(ScalaKryoInstantiator.scala:57)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:56)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:130)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:92)
        at org.apache.spark.broadcast.HttpBroadcast$.write(HttpBroadcast.scala:172)
        at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:57)
        at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
        at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:776)
        at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:545)
        at org.apache.spark.SparkContext.textFile(SparkContext.scala:457)
        at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:171)

Yet Kryo is very much present in the spark-assembly jar. I'm very confused by this... Regards, Jari
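A NoSuchMethodError at runtime usually means a second, incompatible copy of the library shadows the expected one on the classpath. One way to check which jar the Kryo class was actually loaded from, e.g. from the Scala shell (a debugging sketch, not a fix):

    // Prints the code source of the Kryo class; if this is not the
    // spark-assembly jar (or a chill-compatible Kryo), there is a
    // classpath conflict with an older Kryo version.
    val src = classOf[com.esotericsoftware.kryo.Kryo]
      .getProtectionDomain.getCodeSource
    println(if (src != null) src.getLocation else "bootstrap classloader")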
Re: Catalyst dependency on Spark Core
Agree. You end up with a 'core' and a 'corer core' to distinguish between, and it ends up just being more complicated. This sounds like something that doesn't need a module. On Tue, Jul 15, 2014 at 5:59 AM, Patrick Wendell pwend...@gmail.com wrote: Adding new build modules is pretty high overhead, so if this is a case where a small amount of duplicated code could get rid of the dependency, that could also be a good short-term option. - Patrick On Mon, Jul 14, 2014 at 2:15 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, I'd just add a spark-util module that has these things. Matei On Jul 14, 2014, at 1:04 PM, Michael Armbrust mich...@databricks.com wrote: Yeah, sadly this dependency was introduced when someone consolidated the logging infrastructure. However, the dependency should be very small and thus easy to remove, and I would like Catalyst to be usable outside of Spark. A pull request to make this possible would be welcome. Ideally, we'd create some sort of spark-common package that has things like logging. That way Catalyst could depend on that without pulling in all of Hadoop, etc. Maybe others have opinions, though, so I'm cc-ing the dev list. On Mon, Jul 14, 2014 at 12:21 AM, Yanbo Liang yanboha...@gmail.com wrote: Making Catalyst independent of Spark is a goal of Catalyst, but it may need time and evolution. I noticed that the package org.apache.spark.sql.catalyst.util imports org.apache.spark.util.{Utils => SparkUtils}, so Catalyst has a dependency on Spark core. I'm not sure whether it will be replaced by a component independent of Spark in a later release. 2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar aniket.bhatna...@gmail.com: As per the recent presentation given at Scala Days (http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it was mentioned that Catalyst is independent of Spark. But on inspecting pom.xml of the sql/catalyst module, it seems it has a dependency on Spark core. Any particular reason for the dependency? I would love to use Catalyst outside Spark. (Reposted as the previous email bounced; sorry if this is a duplicate.)
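For concreteness, a hypothetical sketch of what such a shared logging helper might look like if it were pulled into its own package -- purely illustrative, not an actual Spark module:

    import org.slf4j.{Logger, LoggerFactory}

    trait Logging {
      // Lazily initialized so serialized closures don't drag a logger along.
      @transient private lazy val log: Logger =
        LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

      protected def logInfo(msg: => String): Unit =
        if (log.isInfoEnabled) log.info(msg)

      protected def logWarning(msg: => String): Unit =
        if (log.isWarnEnabled) log.warn(msg)
    }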
Re: the default GraphX graph-partition strategy on multicore machine?
Dear Ankur, Thanks so much! Btw, is there any possibility to customize the partition strategy as we expect? Best, Yifan On Jul 11, 2014, at 10:20 PM, Ankur Dave ankurd...@gmail.com wrote: Hi Yifan, When you run Spark on a single machine, it uses a local mode where one task per core can be executed at a time -- that is, the level of parallelism is the same as the number of cores. To take advantage of this, when you load a file using sc.textFile, you should set the minPartitions argument to the number of cores (available from sc.defaultParallelism) or a multiple thereof. This will split up your local edge file and allow you to take advantage of all the machine's cores. Once you've loaded the edge RDD with the appropriate number of partitions and constructed a graph from it, GraphX will leave the edge partitioning alone. During graph computation, each vertex will automatically be copied to the edge partitions where it is needed, and the computation will execute in parallel on each of the edge partitions (cores). If you later call Graph.partitionBy, it will by default preserve the number of edge partitions but shuffle the edges around according to the partition strategy. This won't change the level of parallelism, but it might decrease the amount of inter-core communication. Hope that helps! By the way, do continue to post your GraphX questions to the Spark user list if possible. I'll probably still be the one answering them, but that way others can benefit as well. Ankur On Fri, Jul 11, 2014 at 3:05 AM, Yifan LI iamyifa...@gmail.com wrote: Hi Ankur, I am doing graph computation using GraphX on a single multicore machine (not a cluster), but I couldn't find enough docs on how GraphX partitions a graph on a multicore machine. Could you give me some introduction or docs? For instance, I have a single edges file (not on HDFS, etc.) in the srcID, dstID, edgeProperties format, maybe 100MB or 500GB in size, and the latest Spark 1.0.0 (with GraphX) is installed on a 64-bit machine with 8 CPUs. I intend to implement my own algorithm on it. - By default, how is the edge data partitioned? To each CPU, or to each process? - If I later specify a partition strategy in partitionBy(), e.g. PartitionStrategy.EdgePartition2D, what will happen? Will it work? Thanks in advance! :) Best, Yifan LI Univ. Paris-Sud / Inria, Paris, France
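A sketch putting Ankur's advice together, assuming GraphX from Spark 1.0 and a comma-separated edge file (the path and parsing are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

    val sc = new SparkContext("local[8]", "GraphXPartitionDemo")

    // Load with one partition per core (or a multiple thereof).
    val numParts = sc.defaultParallelism
    val edges = sc.textFile("edges.txt", numParts).map { line =>
      val Array(src, dst, prop) = line.split(",")
      Edge(src.toLong, dst.toLong, prop)
    }

    // GraphX keeps this partitioning; partitionBy only reshuffles edges
    // among the same number of partitions.
    val graph = Graph.fromEdges(edges, defaultValue = 0)
      .partitionBy(PartitionStrategy.EdgePartition2D)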
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Filed SPARK-2446
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Sorry, should be SPARK-2489
Re: Need help on spark Hbase
Hi Rajesh, can you describe your spark cluster setup? I saw localhost:2181 for zookeeper. Best Regards, Jerry On Tue, Jul 15, 2014 at 9:47 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Could you please help me to resolve the issue. *Issue*: I'm not able to connect to HBase from spark-submit. Below is my code. When I execute the program standalone, I'm able to connect to HBase and do the operation. When I execute it using spark-submit (./bin/spark-submit), I'm not able to connect to HBase. Am I missing anything? import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; public class Test { public static void main(String[] args) throws Exception { JavaStreamingContext ssc = new JavaStreamingContext("local", "Test", new Duration(4), sparkHome, ""); JavaDStream<String> lines_2 = ssc.textFileStream(hdfsfolderpath); Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.property.clientPort", "2181"); configuration.set("hbase.zookeeper.quorum", "localhost"); configuration.set("hbase.master", "localhost:60"); HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration); if (hBaseAdmin.tableExists(HABSE_TABLE)) { System.out.println(" ANA_DATA table exists .."); } System.out.println(" HELLO HELLO HELLO "); ssc.start(); ssc.awaitTermination(); } } Thank you for your help and support. Regards, Rajesh
Re: Need help on spark Hbase
One vector to check is the HBase libraries in the --jars as in: spark-submit --class <your class> --master <master url> --jars hbase-client-0.98.3-hadoop2.jar,commons-csv-1.0-SNAPSHOT.jar,hbase-common-0.98.3-hadoop2.jar,hbase-hadoop2-compat-0.98.3-hadoop2.jar,hbase-it-0.98.3-hadoop2.jar,hbase-protocol-0.98.3-hadoop2.jar,hbase-server-0.98.3-hadoop2.jar,htrace-core-2.04.jar,spark-assembly-1.0.0-hadoop2.2.0.jar badwclient.jar This worked for us. Cheers k/ On Tue, Jul 15, 2014 at 6:47 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Could you please help me to resolve the issue. ...
Re: NotSerializableException in Spark Streaming
Hey Diana, Did you ever figure this out? I'm running into the same exception, except in my case the function I'm calling is a KMeans model.predict(). In regular Spark it works, and Spark Streaming without the call to model.predict() also works, but when put together I get this serialization exception. I'm on 1.0.0. Nick On Thu, May 8, 2014 at 6:37 AM, Diana Carroll dcarr...@cloudera.com wrote: Hey all, trying to set up a pretty simple streaming app and getting some weird behavior. First, a non-streaming job that works fine: I'm trying to pull out lines of a log file that match a regex, for which I've set up a function: def getRequestDoc(s: String): String = { "KBDOC-[0-9]*".r.findFirstIn(s).orNull } val logs = sc.textFile(logfiles) logs.map(getRequestDoc).take(10) That works, but I want to run that on the same data, but streaming, so I tried this: val logs = ssc.socketTextStream("localhost", ...) logs.map(getRequestDoc).print() ssc.start() From this code, I get: 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0 org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext But if I do the map function inline instead of calling a separate function, it works: logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print() So why is it able to serialize my little function in regular spark, but not in streaming? Thanks, Diana
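A common workaround for this class of error is to move the function into a standalone serializable object, so that the closure no longer captures the enclosing class (and, with it, the StreamingContext). A minimal sketch of that idea, using the same regex as above (the port 9999 is a placeholder, since the original message elides it):

  object RequestDocExtractor extends Serializable {
    val pattern = "KBDOC-[0-9]*".r  // compiled once, on the object
    def getRequestDoc(s: String): String = pattern.findFirstIn(s).orNull
  }

  val logs = ssc.socketTextStream("localhost", 9999)  // port is a placeholder
  logs.map(RequestDocExtractor.getRequestDoc).print()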
Driver cannot receive StatusUpdate message for FINISHED
Hi all, I've got a strange problem. I submit a reduce job (any one split), and it finishes normally on the Executor; the log is: 14/07/15 21:08:56 INFO Executor: Serialized size of result for 0 is 10476031 14/07/15 21:08:56 INFO Executor: Sending result for 0 directly to driver 14/07/15 21:08:56 INFO Executor: Finished task ID 0 but the driver seems not to have received this StatusUpdate message, and the status of this task is still marked as RUNNING; therefore, the job never quits. How did this happen? Any ideas? My version of spark is 0.9.1 yours sincerely, linwukang
Re: Spark-Streaming collect/take functionality.
It works perfectly, thanks! I feel like I should have figured that out; I'll chalk it up to inexperience with Scala. Thanks again. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670p9772.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Need help on spark Hbase
Hi Rajesh, I have a feeling that this is not directly related to spark, but I might be wrong. The reason why is that when you do: Configuration configuration = HBaseConfiguration.create(); by default, it reads the configuration file hbase-site.xml from your classpath and ... (I don't remember all the configuration files hbase has). I noticed that you overwrote some configuration settings in the code, but I'm not sure if you have other configurations that might have conflicted with those. Could you try the following: remove anything that is spark specific, leaving only the hbase related code. Uber-jar it and run it just like any other simple java program. If you still have connection issues, then at least you know the problem is from the configurations. HTH, Jerry On Tue, Jul 15, 2014 at 12:10 PM, Krishna Sankar ksanka...@gmail.com wrote: One vector to check is the HBase libraries in the --jars as in: ... This worked for us. Cheers k/ On Tue, Jul 15, 2014 at 6:47 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Could you please help me to resolve the issue. ...
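To make Jerry's suggestion concrete, a minimal Spark-free connectivity check might look like the sketch below (the table name and connection settings mirror the code above; adjust them for the actual cluster):

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.HBaseAdmin

  object HBaseCheck {
    def main(args: Array[String]): Unit = {
      // same settings as the Spark job, but with no Spark on the classpath
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "localhost")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val admin = new HBaseAdmin(conf)
      println("table exists: " + admin.tableExists("ANA_DATA"))
      admin.close()
    }
  }

If this fails in the same way, the problem is in the HBase configuration rather than in Spark.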
Re: Iteration question
Hi Nathan, I think there are two possible reasons for this. One is that even though you are caching RDDs, their lineage chain gets longer and longer, and thus serializing each RDD takes more time. You can cut off the chain by using RDD.checkpoint() periodically, say every 5-10 iterations. The second reason may just be garbage accumulating in the JVM and causing more collection time as you go ahead. Matei On Jul 11, 2014, at 6:54 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Hi, folks. We're having a problem with iteration that I don't understand. We have the following test code: org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN) org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN) def test (caching: Boolean, points: Int, iterations: Int) { var coords = sc.parallelize(Array.fill(points)((0.0, 0.0)).zipWithIndex.map(_.swap)) if (caching) coords.cache coords.count var iteration = 0 val times = new Array[Double](iterations) do { val start = System.currentTimeMillis val thisIteration = iteration val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random)) val newcoords = coords.zip(increments).map(p => { if (0 == p._1._1) println("Processing iteration " + thisIteration); (p._1._1, (p._1._2._1 + p._2._1, p._1._2._2 + p._2._2)) }) if (caching) newcoords.cache newcoords.count if (caching) coords.unpersist(false) coords = newcoords val end = System.currentTimeMillis times(iteration) = (end - start) / 1000.0 println("Done iteration " + iteration + " in " + times(iteration) + " seconds") iteration = iteration + 1 } while (iteration < iterations) for (i <- 0 until iterations) { println("Iteration " + i + ": " + times(i)) } } If you run this on a local server with caching on and off, it appears that the caching does what it is supposed to do - only the latest iteration is processed each time through the loop. However, despite this, the time for each iteration still gets slower and slower. For example, calling test(true, 5000, 100), I get the following times (weeding out a few for brevity): Iteration 0: 0.084 Iteration 10: 0.381 Iteration 20: 0.674 Iteration 30: 0.975 Iteration 40: 1.254 Iteration 50: 1.544 Iteration 60: 1.802 Iteration 70: 2.147 Iteration 80: 2.469 Iteration 90: 2.715 Iteration 99: 2.962 That's a 35x increase between the first and last iteration, when it should be doing the same thing each time! Without caching, the numbers are Iteration 0: 0.642 Iteration 10: 0.516 Iteration 20: 0.823 Iteration 30: 1.17 Iteration 40: 1.514 Iteration 50: 1.655 Iteration 60: 1.992 Iteration 70: 2.177 Iteration 80: 2.472 Iteration 90: 2.814 Iteration 99: 3.018 slightly slower - but not significantly. Does anyone know, if the caching is working, why iteration 100 is slower than iteration 1? And why is caching making so little difference? Thanks, -Nathan Kronenfeld -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
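For reference, Matei's first suggestion amounts to something like the following inside the loop (a sketch; the checkpoint directory and the every-10-iterations cadence are arbitrary choices):

  sc.setCheckpointDir("/tmp/spark-checkpoints")   // once, before the loop

  // inside the loop, after creating newcoords:
  if (caching) newcoords.cache
  if (iteration % 10 == 0) newcoords.checkpoint() // truncate the lineage periodically
  newcoords.count // the next action materializes both the cache and the checkpoint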
Re: Store one-to-many relationship in parquet file with spark sql
Make the Array a Seq. On Tue, Jul 15, 2014 at 7:12 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, How should I store a one-to-many relationship using spark sql and the parquet format? For example, I have the following case class: case class Person(key: String, name: String, friends: Array[String]) It gives an error when I try to insert the data into a parquet file. It doesn't like the Array[String]. Any suggestion will be helpful, Regards, Jao
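That is, keeping everything else the same, the case class becomes:

  case class Person(key: String, name: String, friends: Seq[String])

Spark SQL's reflection-based schema inference maps a Seq field to an ArrayType column, which the parquet support can handle.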
Error while running Spark SQL join when using Spark 1.0.1
Hi folks, I'm running into the following error when trying to perform a join in my code: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$ I see similar errors for StringType$ and also: scala.reflect.runtime.ReflectError: value apache is not a package. Strangely, if I just work with a single table, everything is fine. I can iterate through the records in both tables and print them out without a problem. Furthermore, this code worked without an exception in Spark 1.0.0 (though the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from a custom protocol buffer based format on hdfs that is being mapped into the individual record types without a problem. The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas? Keith P.S. If it's relevant, we're using the Kryo serializer.
How does Spark speculation prevent duplicated work?
Hi all, I was curious about the details of Spark speculation. So, my understanding is that, when "speculated" tasks are newly scheduled on other machines, the original tasks are still running until the entire stage completes. This seems to leave some room for duplicated work because some spark actions are not idempotent. For example, it may be counting a partition twice in case of RDD.count or may be writing a partition to HDFS twice in case of RDD.save*(). How does it prevent this kind of duplicated work? Mingyu
Re: How to kill running spark yarn application
Interesting, I run on my local one node cluster using apache hadoop. On Tue, Jul 15, 2014 at 7:55 AM, Jerry Lam chiling...@gmail.com wrote: For your information, the SparkSubmit runs at the host where you executed the spark-submit shell script (which in turn invokes the SparkSubmit program). Since you are running in yarn-cluster mode, the SparkSubmit program just reports the status of the job submitted to Yarn. So when you killed the ApplicationMaster, the SparkSubmit returned yarnAppState: KILLED and then terminated itself. This is what happens to me using cdh 5.0.2. Which distribution of hadoop are you using? On Tue, Jul 15, 2014 at 10:42 AM, Jerry Lam chiling...@gmail.com wrote: when I use yarn application -kill, both SparkSubmit and ApplicationMaster are killed. I also checked jps at the machine that has SparkSubmit running; it is terminated as well. Sorry, I cannot reproduce it. On Mon, Jul 14, 2014 at 7:36 PM, hsy...@gmail.com hsy...@gmail.com wrote: Before yarn application -kill, if you do jps you'll have a list of SparkSubmit and ApplicationMaster. After you use yarn application -kill you only kill the SparkSubmit. On Mon, Jul 14, 2014 at 4:29 PM, Jerry Lam chiling...@gmail.com wrote: Then yarn application -kill appid should work. This is what I did 2 hours ago. Sorry I cannot provide more help. Sent from my iPhone On 14 Jul, 2014, at 6:05 pm, hsy...@gmail.com hsy...@gmail.com wrote: yarn-cluster On Mon, Jul 14, 2014 at 2:44 PM, Jerry Lam chiling...@gmail.com wrote: Hi Siyuan, I wonder if you used --master yarn-cluster or yarn-client? Best Regards, Jerry On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi all, A newbie question: I start a spark yarn application through spark-submit. How do I kill this app? I can kill the yarn app by yarn application -kill appid but the application master is still running. What's the proper way to shut down the entire app? Best, Siyuan
Spark Performance Benchmark
Hello All, I am a newbie to Apache Spark and I would like to know the performance benchmark of Apache Spark. My current requirement is as follows: I have a few files in 2 s3 buckets. Each file may have a minimum of 1 million records. File data are tab separated. I have to compare a few columns and filter the records. Right now I am using Hive, and it is taking more than 2 days to filter the records. Please find the hive query below INSERT OVERWRITE TABLE cnv_algo3 SELECT * FROM table1 t1 JOIN table2 t2 WHERE unix_timestamp(t2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(t1.time, 'yyyy-MM-dd HH:mm:ss,SSS') and compare(t1.column1, t1.column2, t2.column1, t2.column4); Here compare is the UDF function. Assume table1 has 20 million records and table2 has 5 million records. Let me know how much time Spark will take to filter the records in a standard configuration. It is pretty urgent to take a decision to move the project to use Spark, hence help me. I highly appreciate your help. I am planning to use --instance-type m1.xlarge --instance-count 3 Thanks and Regards, Malligarjunan S.
MLLib - Regularized logistic regression in python
Hi All, I am trying to perform regularized logistic regression with mllib in python. I have seen that this is possible in the following scala example: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala But I do not see any way to set the regType and regParam when training logistic regression through python. Additionally, I would like to output the activations -- i.e. P(Y=1 | X). Currently, LogisticRegressionModel.predict() just thresholds at 0.5 and does not return the actual probability. Do I just have to do this by hand by grabbing the weights from the trained model, or is there a built in way to do this? Best, Francisco Gimenez -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Regularized-logistic-regression-in-python-tp9780.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
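Until the probabilities are exposed directly, grabbing the weights does work: the activation is the sigmoid of the margin, P(Y=1|X) = 1 / (1 + exp(-(w . x + intercept))). A sketch of that computation (written in Scala here, but the arithmetic is identical from the Python API's model.weights and model.intercept):

  import org.apache.spark.mllib.linalg.Vector

  // hedged sketch: compute P(Y=1|X) by hand from a trained model's
  // weights vector and intercept
  def probability(features: Vector, weights: Vector, intercept: Double): Double = {
    val margin = weights.toArray.zip(features.toArray).map { case (w, x) => w * x }.sum + intercept
    1.0 / (1.0 + math.exp(-margin))
  }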
Count distinct with groupBy usage
Hi -- New to Spark and trying to figure out how to generate unique counts per page by date given this raw data: timestamp,page,userId 1405377264,google,user1 1405378589,google,user2 1405380012,yahoo,user1 .. I can groupBy a field and get the count: val lines = sc.textFile("data.csv") val csv = lines.map(_.split(",")) // group by page csv.groupBy(_(1)).count But I am not able to see how to do a count distinct on userId and also apply another groupBy on the timestamp field. Please let me know how to handle such cases. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Count-distinct-with-groupBy-usage-tp9781.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error while running Spark SQL join when using Spark 1.0.1
Are you registering multiple RDDs of case classes as tables concurrently? You are possibly hitting SPARK-2178 https://issues.apache.org/jira/browse/SPARK-2178 which is caused by SI-6240 https://issues.scala-lang.org/browse/SI-6240. On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons keith.simm...@gmail.com wrote: Hi folks, I'm running into the following error when trying to perform a join in my code: ...
Re: Count distinct with groupBy usage
You can use .distinct.count on your user RDD. What are you trying to achieve with the time group by? — Sent from Mailbox On Tue, Jul 15, 2014 at 8:14 PM, buntu buntu...@gmail.com wrote: Hi -- New to Spark and trying to figure out how to generate unique counts per page by date given this raw data: ...
Re: Count distinct with groupBy usage
Sounds like a job for Spark SQL: http://spark.apache.org/docs/latest/sql-programming-guide.html ! On Tue, Jul 15, 2014 at 11:25 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You can use .distinct.count on your user RDD. What are you trying to achieve with the time group by? ...
Spark Performance issue
Hello all, I am a newbie to Spark, just analyzing the product. I am facing a performance problem with hive and trying to analyse whether Spark will solve it or not, but it seems that Spark is also taking a lot of time. Let me know if I miss anything. shark> select count(time) from table2; OK 6050 Time taken: 7.571 seconds shark> select count(time) from table1; OK 18770 Time taken: 1.802 seconds shark> select count(*) from table2 t2 JOIN table1 t1; OK 113558500 Time taken: 40.332 seconds shark> select count(*) from table2 t2 JOIN table1 t1 WHERE unix_timestamp(t2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(t1.time, 'yyyy-MM-dd HH:mm:ss,SSS') and testCompare(t1.column1, t1.column2, t2.column1, t2.column2); Note: testCompare is a Java UDF function which returns true or false. This query has been running for more than 1 hour. Is there any issue with this query? Or do I miss anything basic in spark? Thanks and Regards, Malligarjunan S.
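One observation from the numbers above: 18770 * 6050 = 113,558,500, which is exactly the count(*) returned by the unqualified JOIN, so that join is a full Cartesian product. Since the WHERE clause is a non-equality predicate plus a UDF, the engine cannot use an equi-join and has to evaluate unix_timestamp and testCompare on all 113.5 million pairs; the running time is dominated by the cross product rather than by anything Shark- or Spark-specific. The same reasoning applies to the 20-million-by-5-million join in the earlier message unless an equality condition can be added to the join.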
Re: Count distinct with groupBy usage
We have CDH 5.0.2, which doesn't include Spark SQL yet; it may only be available in CDH 5.1, which is yet to be released. If Spark SQL is the only option then I might need to hack around to add it into the current CDH deployment, if that's possible. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Count-distinct-with-groupBy-usage-tp9781p9787.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Count distinct with groupBy usage
Thanks Nick. All I'm attempting is to report the number of unique visitors per page by date. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Count-distinct-with-groupBy-usage-tp9781p9786.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Large Task Size?
Ah, I didn't realize this was non-MLLib code. Do you mean to be sending stochasticLossHistory in the closure as well? On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: It uses the standard SquaredL2Updater, and I also tried to broadcast it as well. The input is an RDD created by taking the union of several inputs, which have all been run against MLUtils.kFold to produce even more RDDs, e.g. if I run with 10 different inputs, each with 10 kFolds. I'm pretty certain that all of the input RDDs have clean closures. But I'm curious: is there a high overhead for running union? Could that create larger task sizes? Kyle On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote: I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater and is it possible it's dragging in a significant amount of extra state? On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working on a patch to MLLib that allows for multiplexing several different model optimizations using the same RDD ( SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372 ) In testing larger datasets, I've started to see some memory errors ( java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize errors ). My main clue is that Spark will start logging warnings on smaller systems like: 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB. Looking up stage '2862' leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156 I've looked over the code, I'm broadcasting the larger variables, and between the sampler and the combineByKey, I wouldn't think there's much data being moved over the network, much less a 10MB chunk. Any ideas of what this might be a symptom of? Kyle
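For reference, the pattern Aaron is pointing at, in a minimal sketch (data and computeLoss are hypothetical stand-ins; import org.apache.spark.SparkContext._ provides sum() on an RDD[Double]): any driver-side field referenced inside an RDD closure gets serialized into every task, so mutable bookkeeping such as a loss history should live and be updated on the driver only.

  val stochasticLossHistory = scala.collection.mutable.ArrayBuffer[Double]()

  // BAD: referencing the buffer inside the closure ships it with every task
  // (and updates made on executors would be lost anyway):
  // data.map { p => stochasticLossHistory += computeLoss(p); p }

  // BETTER: keep the closure free of driver-side state
  val loss = data.map(p => computeLoss(p)).sum()
  stochasticLossHistory += loss  // update on the driver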
Re: Count distinct with groupBy usage
If you are counting per time and per page, then you need to group by time and page, not just page. Something more like: csv.groupBy(csv => (csv(0), csv(1))) ... This gives a list of users per (time, page). As Nick suggests, you then count the distinct values for each key: ... .mapValues(_.toSet.size) If you can tolerate some approximation, then using countApproxDistinctByKey will be a lot faster: csv.map(f => ((f(0), f(1)), f(2))).countApproxDistinctByKey() On Tue, Jul 15, 2014 at 7:14 PM, buntu buntu...@gmail.com wrote: Hi -- New to Spark and trying to figure out how to generate unique counts per page by date given this raw data: ...
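Putting the pieces of this thread together, a hedged end-to-end sketch (assuming the timestamps are epoch seconds, as in the sample data, and that grouping on the formatted date string is what "by date" means):

  val lines = sc.textFile("data.csv")
  val csv = lines.map(_.split(","))
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
  // key by (date, page), then count distinct users per key
  val uniques = csv
    .map(f => ((fmt.format(new java.util.Date(f(0).toLong * 1000L)), f(1)), f(2)))
    .groupByKey()
    .mapValues(_.toSet.size)
  uniques.collect.foreach(println)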
Re: Repeated data item search with Spark SQL(1.0.1)
Hi guys, Sorry, I'm also interested in this nested json structure. I have a similar SQL in which I need to query a nested field in a json. Does the above query work if it is used with sql(sqlText), assuming the data is coming directly from hdfs via sqlContext.jsonFile? SPARK-2483 https://issues.apache.org/jira/browse/SPARK-2483 seems to address only HiveQL. Best Regards, Jerry On Tue, Jul 15, 2014 at 3:38 AM, anyweil wei...@gmail.com wrote: Thank you so much for the information. Now I have merged the fix of #1411 and it seems the HiveQL works with: SELECT name FROM people WHERE schools[0].time > 2. But one more question: is it possible or planned to support the schools.time format, to filter the records where there is an element inside the schools array satisfying time > 2? That requirement is more general than schools[0].time > 2, as we sometimes don't know which element in the array should satisfy the condition (we do not know if we should use 0 or 1 or X in schools[X].time); we only care whether there is one satisfying the condition. Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Query-the-nested-JSON-data-With-Spark-SQL-1-0-1-tp9544p9741.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
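For what it's worth, the non-Hive path Jerry describes would look like the sketch below (the hdfs path is a placeholder); whether the plain sql() parser accepts the schools[0].time syntax in 1.0.1 is exactly the open question, so treat this as the experiment to run rather than a confirmed answer:

  val people = sqlContext.jsonFile("hdfs:///data/people.json") // infers the nested schema
  people.registerAsTable("people")
  sqlContext.sql("SELECT name FROM people WHERE schools[0].time > 2").collect().foreach(println)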
Re: Error while running Spark SQL join when using Spark 1.0.1
FWIW, I am unable to reproduce this using the example program locally. On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com wrote: Nope. All of them are registered from the driver program. However, I think we've found the culprit. If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug. For example, this program fails: import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SchemaRDD import org.apache.spark.sql.catalyst.types._ case class Record(value: String, key: Int) case class Record2(key: Int, value: String) object TestJob { def main(args: Array[String]) { run() } private def run() { val sparkConf = new SparkConf() sparkConf.setAppName(TestJob) sparkConf.set(spark.cores.max, 8) sparkConf.set(spark.storage.memoryFraction, 0.1) sparkConf.set(spark.shuffle.memoryFracton, 0.2) sparkConf.set(spark.executor.memory, 2g) sparkConf.setJars(List(target/scala-2.10/spark-test-assembly-1.0.jar)) sparkConf.setMaster(sspark://dev1.dev.pulse.io:7077) sparkConf.setSparkHome(/home/pulseio/spark/current) val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val rdd1 = sc.parallelize((1 to 100).map(i = Record(sval_$i, i))) val rdd2 = sc.parallelize((1 to 100).map(i = Record2(i, srdd_$i))) rdd1.registerAsTable(rdd1) rdd2.registerAsTable(rdd2) sql(SELECT * FROM rdd1).collect.foreach { row = println(row) } sql(SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key).collect.foreach { row = println(row) } sc.stop() } } If you change the definition of Record and Record2 to the following, it succeeds: case class Record(key: Int, value: String) case class Record2(key: Int, value: String) as does: case class Record(value: String, key: Int) case class Record2(value: String, key: Int) Let me know if you need anymore details. On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust mich...@databricks.com wrote: Are you registering multiple RDDs of case classes as tables concurrently? You are possibly hitting SPARK-2178 which is caused by SI-6240. On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons keith.simm...@gmail.com wrote: HI folks, I'm running into the following error when trying to perform a join in my code: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$ I see similar errors for StringType$ and also: scala.reflect.runtime.ReflectError: value apache is not a package. Strangely, if I just work with a single table, everything is fine. I can iterate through the records in both tables and print them out without a problem. Furthermore, this code worked without an exception in Spark 1.0.0 (thought the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from a custom protocol buffer based format on hdfs that is being mapped into the individual record types without a problem. The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas? Keith P.S. If it's relevant, we're using the Kryo serializer.
count vs countByValue in for/yield
Hello, I am curious about something: val result = for { (dt, evrdd) <- evrdds val ct = evrdd.count } yield (dt -> ct) works. val result = for { (dt, evrdd) <- evrdds val ct = evrdd.countByValue } yield (dt -> ct) does not work. I get: 14/07/15 16:46:33 WARN TaskSetManager: Lost TID 0 (task 0.0:0) 14/07/15 16:46:33 WARN TaskSetManager: Loss was due to java.lang.NullPointerException java.lang.NullPointerException at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) What is the difference? Is it in the fact that countByValue passes back a Map and count passes back a Long? Thanks! Ognen
Re: the default GraphX graph-partition strategy on multicore machine?
On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote: Btw, is there any possibility to customise the partition strategy as we expect? I'm not sure I understand. Are you asking about defining a custom http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-how-to-specify-partition-strategy-td9309.html#a9338 partition strategy? On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote: when I load the file using sc.textFile (minPartitions = *16*, PartitionStrategy.RandomVertexCut) The easiest way to load the edge file would actually be to use GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut) . 1) how much data will be loaded into memory? The exact size of the graph (vertices + edges) in memory depends on the graph's structure, the partition function, and the average vertex degree, because each vertex must be replicated to all partitions where it is referenced. It's easier to estimate the size of just the edges, which I did on the mailing list http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-partition-problem-tp6248p6363.html a while ago. To summarize, during graph construction each edge takes 60 bytes, and once the graph is constructed it takes 20 bytes. 2) how many partitions will be stored in memory? Once you call cache() on the graph, all 16 partitions will be stored in memory. You can also tell GraphX to spill them to disk in case of memory pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to GraphLoader.edgeListFile. 3) If the thread/task on each core will read only *one* edge from memory and then compute it at every time? Yes, each task is single-threaded, so it will read the edges in its partition sequentially. 3.1) which edge on memory was determined to read into cache? In theory, each core should independently read the edges in its partition into its own cache. Cache issues should be much less of a concern than in most applications because different tasks (cores) operate on independent partitions; there is no shared-memory parallelism. The tradeoff is heavier reliance on shuffles. 3.2) how are those partitions being scheduled? Spark handles the scheduling. There are details in Section 5.1 of the Spark NSDI paper https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf; in short, tasks are scheduled to run on the same machine as their data partition, and by default each machine can accept at most one task per core. Ankur http://www.ankurdave.com/
Re: getting ClassCastException on collect()
Not sure I can help, but I ran into the same problem. Basically my use case is that I have a List of strings, which I then convert into an RDD using sc.parallelize(). This RDD is then operated on by the foreach() function. Same as you, I get a runtime exception: java.lang.ClassCastException: cannot assign instance of rickshaw.spark.transformations.AssetFunction to field org.apache.spark.api.java.JavaRDDLike$$anonfun$1.f$14 of type org.apache.spark.api.java.function.VoidFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-ClassCastException-on-collect-tp8054p9798.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How does Spark speculation prevent duplicated work?
Hi Nan, Great digging in -- that makes sense to me for when a job is producing some output handled by Spark like a .count or .distinct or similar. For the other part of the question, I'm also interested in side effects like an HDFS disk write. If one task is writing to an HDFS path and another task starts up, wouldn't it also attempt to write to the same path? How is that de-conflicted? On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Mingyuan, According to my understanding, Spark processes the result generated from each partition by passing it to resultHandler (SparkContext.scala L1056). This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size. This design effectively ensures that the actions are idempotent. E.g. count is implemented as def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Even if the task in the partition is executed twice, the result put in the array is the same. At the same time, I think the Spark implementation ensures that the operation applied on the return value of SparkContext.runJob will not be triggered again when the duplicate tasks finish. Because, when a task finishes, the code execution path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded. In taskEnded, it will trigger the CompletionEvent message handler, where DAGScheduler will check if (!job.finished(rt.outputId)), and rt.outputId is the partition id, so even if the duplicate task invokes a CompletionEvent message, it will find job.finished(rt.outputId) has already been set to true. Maybe I am wrong... I just went through the code roughly; welcome to correct me. Best, -- Nan Zhu On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote: Hi all, I was curious about the details of Spark speculation. ...
parallel stages?
Hi, I wonder: if I do wordcount on two different files, like this: val file1 = sc.textFile(/...) val file2 = sc.textFile(/...) val wc1 = file1.flatMap(..).reduceByKey(_ + _, 1) val wc2 = file2.flatMap(...).reduceByKey(_ + _, 1) wc1.saveAsTextFile("titles.out") wc2.saveAsTextFile("tables.out") would the two reduceByKey stages run in parallel given sufficient capacity? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
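For what it's worth, each action blocks the thread that calls it, so as written the two saves run one after the other; to get the two jobs submitted concurrently you can issue the actions from separate threads (SparkContext is thread-safe for job submission). A sketch with plain Scala futures, under the assumption that wc1 and wc2 are defined as above:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  val f1 = Future { wc1.saveAsTextFile("titles.out") } // job 1
  val f2 = Future { wc2.saveAsTextFile("tables.out") } // job 2, submitted concurrently
  Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)

Whether the stages then actually run in parallel depends on the cluster having free cores beyond what the first job consumes.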
Re: Error while running Spark SQL join when using Spark 1.0.1
To give a few more details of my environment, in case that helps you reproduce: * I'm running spark 1.0.1 downloaded as a tar ball, not built myself * I'm running in stand alone mode, with 1 master and 1 worker, both on the same machine (though the same error occurs with two workers on two machines) * I'm using spark-core and spark-sql 1.0.1 pulled via maven Here's my build.sbt: name := "spark-test" version := "1.0" scalaVersion := "2.10.4" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided" On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com wrote: FWIW, I am unable to reproduce this using the example program locally. On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com wrote: Nope. All of them are registered from the driver program. However, I think we've found the culprit. If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug. ...
Re: Count distinct with groupBy usage
Thanks Sean!! That's what I was looking for -- group by on multiple fields. I'm gonna play with it now. Thanks again! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Count-distinct-with-groupBy-usage-tp9781p9803.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Possible bug in Spark Streaming :: TextFileStream
On second thought, I am not entirely sure whether that bug is the issue. Are you continuously appending to the file that you have copied to the directory? Because fileStream works correctly when the files are atomically moved into the monitored directory. TD On Mon, Jul 14, 2014 at 9:08 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Is this issue present with the JavaStreamingContext.textFileStream(hdfsfolderpath) API also? Please confirm. If yes, could you please help me to fix this issue? I'm using the spark 1.0.0 version. Regards, Rajesh On Tue, Jul 15, 2014 at 5:42 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Oh yes, this was a bug and it has been fixed. Checkout from the master branch! https://issues.apache.org/jira/browse/SPARK-2362?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20created%20DESC%2C%20priority%20ASC TD On Mon, Jul 7, 2014 at 7:11 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I have a basic spark streaming job that is watching a folder, processing any new file and updating a column family in cassandra using the new cassandra-spark-driver. I think there is a problem with SparkStreamingContext.textFileStream... if I start my job in local mode with no files in the watched folder and then I copy a bunch of files, sometimes spark continually processes those files again and again. I have noticed that it usually happens when spark doesn't detect all new files in one go... i.e. I copied 6 files and spark detected 3 of them as new and processed them; then it detected the other 3 as new and processed them. After it finished processing all 6 files, it detected the first 3 files as new files again and processed them... then the other 3... and again... and again... and again. Should I raise a JIRA issue? Regards, Luis
Re: SQL + streaming
Could you run it locally first to make sure it works, and you see output? Also, I recommend going through the previous step-by-step approach to narrow down where the problem is. TD On Mon, Jul 14, 2014 at 9:15 PM, hsy...@gmail.com hsy...@gmail.com wrote: Actually, I deployed this on a yarn cluster (spark-submit) and I couldn't find any output in the yarn stdout logs On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you make sure you are running locally on more than 1 local core? You could set the master in the SparkConf as conf.setMaster("local[4]"). Then see if there are jobs running on every batch of data in the Spark web ui (running on localhost:4040). If you still don't get any output, try first simply printing recRDD.count() in the foreachRDD (that is, first test spark streaming). If you can get that to work, then I would test the Spark SQL stuff. TD On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com hsy...@gmail.com wrote: No errors but no output either... Thanks! On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Could you elaborate on what is the problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but SQL command throwing error? No errors but no output either? TD On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi All, A couple of days ago, I tried to integrate SQL and streaming together. My understanding is that I can transform an RDD from a DStream into a SchemaRDD and execute SQL on each RDD, but I got no luck. Would you guys help me take a look at my code? Thank you very much! object KafkaSpark { def main(args: Array[String]): Unit = { if (args.length < 4) { System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaSpark") val ssc = new StreamingContext(sparkConf, Seconds(10)) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc); //ssc.checkpoint("checkpoint") // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ val tt = Time(1) val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(t => getRecord(t._2.split("#"))) val result = recordsStream.foreachRDD((recRDD, tt) => { recRDD.registerAsTable("records") val result = sql("select * from records") println(result) result.foreach(println) }) ssc.start() ssc.awaitTermination() } def getRecord(l: Array[String]): Record = { println("Getting the record"); Record(l(0), l(1)) } }
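A minimal version of the first test TD suggests, to confirm data is flowing before involving SQL at all:

  recordsStream.foreachRDD { rdd =>
    println("records in this batch: " + rdd.count()) // printed on the driver for each batch
  }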
Re: Spark Streaming Json file groupby function
I see you have the code to convert to the Record class but commented it out. That is the right way to go. When you are converting it to a 4-tuple with (data("type"), data("name"), data("score"), data("school")) ... it's of type (Any, Any, Any, Any) as data(xyz) returns Any. And registerAsTable probably doesn't work well with Any as the columns. @michael any insights? TD On Mon, Jul 14, 2014 at 10:07 PM, srinivas kusamsrini...@gmail.com wrote: Hi TD, Thanks for your help... I am able to convert the map to records using a case class. I am left with doing some aggregations. I am trying to do some SQL type operations on my records set. My code looks like case class Record(ID: Int, name: String, score: Int, school: String) //val records = jsonf.map(m => Record(m(0), m(1), m(2), m(3))) val fields = jsonf.map(data => (data("type"), data("name"), data("score"), data("school"))) val results = fields.transform((rdd, time) => { rdd.registerAsTable("table1") sqlc.sql("select * from table1") }) When I try to compile my code it gives me jsonfile.scala:30: value registerAsTable is not a member of org.apache.spark.rdd.RDD[(Any, Any, Any, Any)] Please let me know if I am missing anything. And with Spark Streaming, can I really use SQL kind of operations on DStreams? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9714.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
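A sketch of the case-class route TD recommends, assuming jsonf is a DStream of Map[String, String] keyed by the field names used above (keeping the same field-to-column pairing as the tuple version; registerAsTable needs the implicits from import sqlc._ in scope):

  case class Record(ID: Int, name: String, score: Int, school: String)

  val records = jsonf.map(m =>
    Record(m("type").toInt, m("name"), m("score").toInt, m("school")))

  val results = records.transform { rdd =>
    rdd.registerAsTable("table1") // works now: the element type is a case class, not (Any, Any, Any, Any)
    sqlc.sql("select * from table1")
  }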
Help with Json array parsing
Hi, I have a json file where the object definition in each line includes an array component obj that contains 0 or more elements, as shown by the example below. {"name": "16287e9cdf", "obj": [{"min": 50, "max": 59}, {"min": 20, "max": 29}]}, {"name": "17087e9cdf", "obj": [{"min": 30, "max": 39}, {"min": 10, "max": 19}, {"min": 60, "max": 69}]}, {"name": "18287e0cdf"} I need to extract all the min values from the JSON definition in each line for further processing. I used the following Spark code to parse the file and extract the min fields, but I am getting a runtime error. I would like to know the right way to extract the 0 or more min values from the array above. val inp = sc.textFile(args(0)) val res = inp.map(line => { parse(line) }).map(json => { implicit lazy val formats = org.json4s.DefaultFormats val name = (json \ "name").extract[String] val min_vals = (json \ "obj" \ "min").extract[Array[Int]] (name, min_vals) }) Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-Json-array-parsing-tp9807.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
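One way to make the extraction tolerant of lines without an obj field (such as the third record above) is to match on JNothing before extracting; a sketch, relying on json4s's behavior that \ over a JArray collects the field from every element (with the implicit formats in scope as above):

  import org.json4s._

  val min_vals = (json \ "obj" \ "min") match {
    case JNothing => Array.empty[Int] // e.g. the "18287e0cdf" record has no obj
    case other    => other.extract[Array[Int]]
  }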
Re: truly bizarre behavior with local[n] on Spark 1.0.1
This sounds really really weird. Can you give me a piece of code that I can run to reproduce this issue myself? TD On Tue, Jul 15, 2014 at 12:02 AM, Walrus theCat walrusthe...@gmail.com wrote: This is (obviously) spark streaming, by the way. On Mon, Jul 14, 2014 at 8:27 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I've got a socketTextStream through which I'm reading input. I have three DStreams, all of which are the same window operation over that socketTextStream. I have a four core machine. As we've been covering lately, I have to give a cores parameter to my StreamingSparkContext: ssc = new StreamingContext("local[4]" /**TODO change once a cluster is up **/, "AppName", Seconds(1)) Now, I have three dstreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own. dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window If I construct the ssc with "local[8]" and put these statements in this order, I get prints on the first one, and zero counts on the second one: ssc("local[8]") // hyperthread dat sheezy dstream1.print // works dstream2.count.print // always prints 0 If I do this, this happens: ssc("local[4]") dstream1.print // doesn't work, just gives me the Time: ... ms message dstream2.count.print // doesn't work, prints 0 ssc("local[6]") dstream1.print // doesn't work, just gives me the Time: ... ms message dstream2.count.print // works, prints 1 Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
Re: Error while running Spark SQL join when using Spark 1.0.1
Can you print out the queryExecution? (i.e. println(sql().queryExecution)) On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons keith.simm...@gmail.com wrote: To give a few more details of my environment in case that helps you reproduce: * I'm running spark 1.0.1 downloaded as a tar ball, not built myself * I'm running in stand alone mode, with 1 master and 1 worker, both on the same machine (though the same error occurs with two workers on two machines) * I'm using spark-core and spark-sql 1.0.1 pulled via maven Here's my built.sbt: name := spark-test version := 1.0 scalaVersion := 2.10.4 resolvers += Akka Repository at http://repo.akka.io/releases/; resolvers += Cloudera Repository at https://repository.cloudera.com/artifactory/cloudera-repos/; libraryDependencies += org.apache.spark %% spark-sql % 1.0.1 % provided libraryDependencies += org.apache.spark %% spark-core % 1.0.1 % provided On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com wrote: FWIW, I am unable to reproduce this using the example program locally. On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com wrote: Nope. All of them are registered from the driver program. However, I think we've found the culprit. If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug. For example, this program fails: import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SchemaRDD import org.apache.spark.sql.catalyst.types._ case class Record(value: String, key: Int) case class Record2(key: Int, value: String) object TestJob { def main(args: Array[String]) { run() } private def run() { val sparkConf = new SparkConf() sparkConf.setAppName(TestJob) sparkConf.set(spark.cores.max, 8) sparkConf.set(spark.storage.memoryFraction, 0.1) sparkConf.set(spark.shuffle.memoryFracton, 0.2) sparkConf.set(spark.executor.memory, 2g) sparkConf.setJars(List(target/scala-2.10/spark-test-assembly-1.0.jar)) sparkConf.setMaster(sspark://dev1.dev.pulse.io:7077) sparkConf.setSparkHome(/home/pulseio/spark/current) val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val rdd1 = sc.parallelize((1 to 100).map(i = Record(sval_$i, i))) val rdd2 = sc.parallelize((1 to 100).map(i = Record2(i, srdd_$i))) rdd1.registerAsTable(rdd1) rdd2.registerAsTable(rdd2) sql(SELECT * FROM rdd1).collect.foreach { row = println(row) } sql(SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key).collect.foreach { row = println(row) } sc.stop() } } If you change the definition of Record and Record2 to the following, it succeeds: case class Record(key: Int, value: String) case class Record2(key: Int, value: String) as does: case class Record(value: String, key: Int) case class Record2(value: String, key: Int) Let me know if you need anymore details. On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust mich...@databricks.com wrote: Are you registering multiple RDDs of case classes as tables concurrently? You are possibly hitting SPARK-2178 which is caused by SI-6240. 
On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons keith.simm...@gmail.com wrote: Hi folks, I'm running into the following error when trying to perform a join in my code: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$ I see similar errors for StringType$ and also: scala.reflect.runtime.ReflectError: value apache is not a package. Strangely, if I just work with a single table, everything is fine. I can iterate through the records in both tables and print them out without a problem. Furthermore, this code worked without an exception in Spark 1.0.0 (though the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from a custom protocol buffer based format on HDFS that is being mapped into the individual record types without a problem. The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the Spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas? Keith P.S. If it's relevant, we're using the Kryo serializer.
Re: Ideal core count within a single JVM
What you said makes sense. But the problem persists even when I proportionately reduce the heap size. For instance, if I use a 160 GB heap for 48 cores and an 80 GB heap for 24 cores, the 24-core case still performs better than the 48-core case (although worse than 24 cores with a 160 GB heap). It is only when I use 40 GB with 24 cores that I see the 48-core case performing better than the 24-core case. Does it mean that there is no relation between thread count and heap size? If so, can you please suggest how I can calculate heap sizes for fair comparisons? My real trouble is comparing the performance of the application when run with (1) a single node with 48 cores and a 160 GB heap, and (2) 8 nodes with 6 cores and 20 GB each. In this comparison (2) performs far better than (1), and I don't understand the reason. Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9810.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: truly bizarre behavior with local[n] on Spark 1.0.1
Will do. On Tue, Jul 15, 2014 at 12:56 PM, Tathagata Das tathagata.das1...@gmail.com wrote: This sounds really really weird. Can you give me a piece of code that I can run to reproduce this issue myself? TD On Tue, Jul 15, 2014 at 12:02 AM, Walrus theCat walrusthe...@gmail.com wrote: This is (obviously) Spark Streaming, by the way. On Mon, Jul 14, 2014 at 8:27 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I've got a socketTextStream through which I'm reading input. I have three DStreams, all of which are the same window operation over that socketTextStream. I have a four-core machine. As we've been covering lately, I have to give a cores parameter to my StreamingSparkContext: ssc = new StreamingContext("local[4]" /** TODO change once a cluster is up **/, "AppName", Seconds(1)) Now, I have three DStreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own. dstream1 // 1 second window dstream2 // 2 second window dstream3 // 5 minute window If I construct the ssc with "local[8]", and put these statements in this order, I get prints on the first one, and zero counts on the second one: ssc("local[8]") // hyperthread dat sheezy dstream1.print // works dstream2.count.print // always prints 0 If I do this, this happens: ssc("local[4]") dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // doesn't work, prints 0 ssc("local[6]") dstream1.print // doesn't work, just gives me the Time: ms message dstream2.count.print // works, prints 1 Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
Re: How does Spark speculation prevent duplicated work?
I haven't looked at the implementation, but what you would do with any filesystem is write to a file inside the workspace directory of the task. Then only the attempt of the task that should be kept will perform a move to the final path. The other attempts are simply discarded. For most filesystems (and that's the case for HDFS), a 'move' is a very simple and fast action, because only the full path/name of the file changes, not its content or where this content is physically stored. Speculative execution happens in Hadoop MapReduce, and Spark has the same concept. As long as you apply functions with no side effects (i.e. the only impact is the returned results), then you just need to not take into account results from additional attempts of the same task/operator. Bertrand Dechoux On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash and...@andrewash.com wrote: Hi Nan, Great digging in -- that makes sense to me for when a job is producing some output handled by Spark like a .count or .distinct or similar. For the other part of the question, I'm also interested in side effects like an HDFS disk write. If one task is writing to an HDFS path and another task starts up, wouldn't it also attempt to write to the same path? How is that de-conflicted? On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Mingyu, According to my understanding, Spark processes the result generated from each partition by passing it to a resultHandler (SparkContext.scala L1056). This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size. This design effectively ensures that the actions are idempotent, e.g. count is implemented as def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Even if the task for a partition is executed twice, the result put in the array is the same. At the same time, I think the Spark implementation ensures that the operation applied on the return value of SparkContext.runJob will not be triggered when the duplicate tasks finish. When a task is finished, the code execution path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded. In taskEnded, it will trigger the CompletionEvent message handler, where DAGScheduler will check if (!job.finished(rt.outputId)), and rt.outputId is the partition id, so even if the duplicate task invokes a CompletionEvent message, it will find job.finished(rt.outputId) is already true eventually. Maybe I was wrong… I just went through the code roughly; feel free to correct me. Best, -- Nan Zhu On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote: Hi all, I was curious about the details of Spark speculation. So, my understanding is that, when “speculated” tasks are newly scheduled on other machines, the original tasks are still running until the entire stage completes. This seems to leave some room for duplicated work because some Spark actions are not idempotent. For example, it may be counting a partition twice in the case of RDD.count, or may be writing a partition to HDFS twice in the case of RDD.save*(). How does it prevent this kind of duplicated work? Mingyu Attachments: - smime.p7s
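[Editor's note] A minimal sketch of the write-to-workspace-then-rename commit pattern Bertrand describes, expressed directly against the Hadoop FileSystem API. The paths and attempt names are illustrative; in practice this logic lives inside Hadoop's output committer, not user code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

// Each attempt writes into its own workspace file...
val attemptFile = new Path("/output/_temporary/attempt_0001/part-00000")
val out = fs.create(attemptFile)
out.writeBytes("partition data")
out.close()

// ...and only the attempt that wins the commit promotes its file with a
// cheap, metadata-only rename; losing attempts are simply discarded.
val finalFile = new Path("/output/part-00000")
if (!fs.exists(finalFile)) {
  fs.rename(attemptFile, finalFile)
}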
Re: Error while running Spark SQL join when using Spark 1.0.1
Sure thing. Here you go:

== Logical Plan ==
Sort [key#0 ASC]
 Project [key#0,value#1,value#2]
  Join Inner, Some((key#0 = key#3))
   SparkLogicalPlan (ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176)
   SparkLogicalPlan (ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176)

== Optimized Logical Plan ==
Sort [key#0 ASC]
 Project [key#0,value#1,value#2]
  Join Inner, Some((key#0 = key#3))
   SparkLogicalPlan (ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176)
   SparkLogicalPlan (ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176)

== Physical Plan ==
Sort [key#0:0 ASC], true
 Exchange (RangePartitioning [key#0 ASC], 200)
  Project [key#0:0,value#1:1,value#2:2]
   HashJoin [key#0], [key#3], BuildRight
    Exchange (HashPartitioning [key#0], 200)
     ExistingRdd [key#0,value#1], MapPartitionsRDD[2] at mapPartitions at basicOperators.scala:176
    Exchange (HashPartitioning [key#3], 200)
     ExistingRdd [value#2,key#3], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176

Also, in case it's helpful, here's the entire stack trace: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.IntegerType$ at java.io.ObjectStreamClass.hasStaticInitializer(Native Method) at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1730) at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:225) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:222) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:588) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1516) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1001) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1892) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1914) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1797) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at
Re: parallel stages?
The last two lines are what trigger the operations, and they will each block until the result is computed and saved. So if you execute this code as-is, no. You could write a Scala program that invokes these two operations in parallel, like: Array((wc1, "titles.out"), (wc2, "tables.out")).par.foreach { case (wc, path) => wc.saveAsTextFile(path) } It worked for me, and I think it's OK to do this if you know you want to. On Tue, Jul 15, 2014 at 8:38 PM, Wei Tan w...@us.ibm.com wrote: Hi, I wonder if I do wordcount on two different files, like this: val file1 = sc.textFile("/...") val file2 = sc.textFile("/...") val wc1 = file1.flatMap(...).reduceByKey(_ + _, 1) val wc2 = file2.flatMap(...).reduceByKey(_ + _, 1) wc1.saveAsTextFile("titles.out") wc2.saveAsTextFile("tables.out") Would the two reduceByKey stages run in parallel given sufficient capacity? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
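[Editor's note] If parallel collections feel too implicit, here is an equivalent sketch with plain Scala futures, assuming wc1 and wc2 from Wei's example and the global execution context:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each save is submitted as its own Spark job from a separate thread,
// so the scheduler can run both jobs concurrently if capacity allows.
val jobs = Seq(
  Future(wc1.saveAsTextFile("titles.out")),
  Future(wc2.saveAsTextFile("tables.out"))
)
jobs.foreach(Await.result(_, Duration.Inf)) // block until both finish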
Re: Spark Streaming Json file groupby function
I am still getting the error... even if I convert it to a Record object:

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    // StreamingExamples.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")
    // import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    case class Record(ID: String, name: String, score: String, school: String)
    val fields = jsonf.map(data => Record(data("type").toString, data("name").toString, data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    // results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I am getting the error:

[error] /home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
[error] recrdd.registerAsTable("table1")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

Please look into this and let me know if I am missing anything. Thanks, -Srinivas. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9816.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
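[Editor's note] For context: in Spark 1.0, registerAsTable is only defined on SchemaRDD; it becomes available on an RDD of case classes through the implicit createSchemaRDD conversion, which the commented-out import sqlContext._ above would have brought into scope (and the case class generally needs to be defined at the top level rather than inside main, so reflection can see it). A minimal sketch of the usual non-streaming pattern, independent of the code above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int) // top-level, visible to reflection

object SqlSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD[Person] => SchemaRDD

    val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25)))
    people.registerAsTable("people") // compiles now: the implicit wraps the RDD
    sqlContext.sql("SELECT name FROM people WHERE age > 26").collect().foreach(println)
    sc.stop()
  }
}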
Re: Error while running Spark SQL join when using Spark 1.0.1
Just my few cents on this. I'm having the same problems with v1.0.1, but this bug is sporadic and looks like it is related to object initialization. Even more, I'm not using any SQL or anything. I just have a utility class like this:

object DataTypeDescriptor {
  type DataType = String

  val BOOLEAN = "BOOLEAN"
  val STRING = "STRING"
  val TIMESTAMP = "TIMESTAMP"
  val LONG = "LONG"
  val INT = "INT"
  val SHORT = "SHORT"
  val BYTE = "BYTE"
  val DECIMAL = "DECIMAL"
  val DOUBLE = "DOUBLE"
  val FLOAT = "FLOAT"

  def $$(name: String, format: Option[String] = None) = DataTypeDescriptor(name, format)

  private lazy val nativeTypes: Map[String, NativeType] = Map(
    BOOLEAN -> BooleanType, STRING -> StringType, TIMESTAMP -> TimestampType, LONG -> LongType,
    INT -> IntegerType, SHORT -> ShortType, BYTE -> ByteType, DECIMAL -> DecimalType,
    DOUBLE -> DoubleType, FLOAT -> FloatType
  )

  lazy val defaultValues: Map[String, Any] = Map(
    BOOLEAN -> false, STRING -> "", TIMESTAMP -> null, LONG -> 0L, INT -> 0, SHORT -> 0.toShort,
    BYTE -> 0.toByte, DECIMAL -> BigDecimal(0d), DOUBLE -> 0d, FLOAT -> 0f
  )

  def apply(dataType: String): DataTypeDescriptor = {
    DataTypeDescriptor(dataType.toUpperCase, None)
  }

  def apply(dataType: SparkDataType): DataTypeDescriptor = {
    nativeTypes
      .find { case (_, descriptor) => descriptor == dataType }
      .map { case (name, descriptor) => DataTypeDescriptor(name, None) }
      .get
  }
  ...
}

and some tests that check each of these methods. The problem is that these tests fail randomly with this error. P.S.: I did not have this problem in Spark 1.0.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-Spark-SQL-join-when-using-Spark-1-0-1-tp9776p9817.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Multiple streams at the same time
Hi everyone. I have some problems running multiple streams at the same time. What I am doing is:

object Test {

  import org.apache.spark.streaming._
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.api.java.function._
  import org.apache.spark.streaming.api._

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local[10]").setAppName("test"))

    val task1 = startListening(sc, Seconds(5))
    val task2 = startListening(sc, Seconds(5))
    val task3 = startListening(sc, Seconds(5))
    val task4 = startListening(sc, Seconds(5))

    Thread.sleep(1)

    println("Killing...")
    task1.stop()
    task2.stop()
    task3.stop()
    task4.stop()
  }

  private def startListening(sc: SparkContext, duration: Duration): StreamingTask = {
    val ssc = new StreamingContext(sc, duration)
    val lines = ssc.socketTextStream("localhost", )
    lines.foreachRDD { rdd => println(rdd.collect().mkString()) }
    ssc.start()
    StreamingTask(ssc)
  }
}

case class StreamingTask(ssc: StreamingContext) {
  def stop() {
    ssc.stop(stopSparkContext = false, stopGracefully = false)
  }
}

The idea is that I am sharing the same SparkContext between different streaming contexts. What I am getting is: Exception in thread "main" akka.actor.InvalidActorNameException: actor name [JobScheduler] is not unique! at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130) at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77) at akka.actor.ActorCell.reserveChild(ActorCell.scala:338) at akka.actor.dungeon.Children$class.makeChild(Children.scala:186) at akka.actor.dungeon.Children$class.attachChild(Children.scala:42) at akka.actor.ActorCell.attachChild(ActorCell.scala:338) at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:57) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:438) at io.ubix.spark.Test$.startListening(Test.scala:38) at io.ubix.spark.Test$.main(Test.scala:20) at io.ubix.spark.Test.main(Test.scala) And this is true, because we are trying to create 4 JobScheduler actors with the same name. On the other hand, the question is how to deal with this kind of situation? Am I doing something wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Help with Json array parsing
To add to my previous post, the error at runtime is the following: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.json4s.package$MappingException: Expected collection but got JInt(20) for root JInt(20) and mapping int[][int, int] thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-Json-array-parsing-tp9807p9820.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Retrieve dataset of Big Data Benchmark
Hi, I would like to use the dataset used in the Big Data Benchmark https://amplab.cs.berkeley.edu/benchmark/ on my own cluster, to run some tests between Hadoop and Spark. The dataset should be available at s3n://big-data-benchmark/pavlo/[text|text-deflate|sequence|sequence-snappy]/[suffix] in the Amazon cluster. Is there a way I can download this without being a user of the Amazon cluster? I tried bin/hadoop distcp s3n://123:456@big-data-benchmark/pavlo/text/tiny/* ./ but it asks for an AWS Access Key ID and Secret Access Key, which I do not have. Thanks in advance, Tom -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Retrieve dataset of Big Data Benchmark
Hi Tom, If you wish to load the file in Spark directly, you can use sc.textFile("s3n://big-data-benchmark/pavlo/...") where sc is your SparkContext. This can be done because the files should be publicly available, and you don't need AWS credentials to access them. If you want to download the file to your local drive, you can use the link http://s3.amazonaws.com/big-data-benchmark/pavlo/... One note though: the tiny dataset doesn't seem to exist anymore. You can look at http://s3.amazonaws.com/big-data-benchmark/ to see the available files; a Ctrl+F for "tiny" returned 0 matches. Best, Burak - Original Message - From: Tom thubregt...@gmail.com To: u...@spark.incubator.apache.org Sent: Tuesday, July 15, 2014 2:10:15 PM Subject: Retrieve dataset of Big Data Benchmark Hi, I would like to use the dataset used in the Big Data Benchmark https://amplab.cs.berkeley.edu/benchmark/ on my own cluster, to run some tests between Hadoop and Spark. The dataset should be available at s3n://big-data-benchmark/pavlo/[text|text-deflate|sequence|sequence-snappy]/[suffix] in the Amazon cluster. Is there a way I can download this without being a user of the Amazon cluster? I tried bin/hadoop distcp s3n://123:456@big-data-benchmark/pavlo/text/tiny/* ./ but it asks for an AWS Access Key ID and Secret Access Key, which I do not have. Thanks in advance, Tom -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
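[Editor's note] For example, a quick sanity check from the Spark shell might look like this — the path suffix is illustrative, so check the bucket listing above for directories that actually exist:

val rankings = sc.textFile("s3n://big-data-benchmark/pavlo/text/1node/rankings/")
println(rankings.count()) // should print the number of rows in the dataset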
Re: Recommended pipeline automation tool? Oozie?
If you're already using Scala for Spark programming and you hate Oozie XML as much as I do ;), you might check out Scoozie, a Scala DSL for Oozie: https://github.com/klout/scoozie On Thu, Jul 10, 2014 at 5:52 PM, Andrei faithlessfri...@gmail.com wrote: I used both - Oozie and Luigi - but found them inflexible and still overcomplicated, especially in the presence of Spark. Oozie has a fixed list of building blocks, which is pretty limiting. For example, you can launch a Hive query, but Impala, Shark/SparkSQL, etc. are out of scope (of course, you can always write a wrapper as a Java or Shell action, but does it really need to be so complicated?). Another issue with Oozie is passing variables between actions. There's an Oozie context that is suitable for passing key-value pairs (both strings) between actions, but for more complex objects (say, a FileInputStream that should be closed at the last step only) you have to do some advanced kung fu. Luigi, on the other hand, has its niche - complicated dataflows with many tasks that depend on each other. Basically, there are tasks (this is where you define computations) and targets (something that can exist - a file on disk, an entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a plan for achieving this. Luigi is really shiny when your workflow fits this model, but one step away and you are in trouble. For example, consider a simple pipeline: run an MR job and output temporary data, run another MR job and output final data, clean the temporary data. You can make a target Clean that depends on target MRJob2, which in turn depends on MRJob1, right? Not so easy. How do you check that the Clean task is achieved? If you just test whether the temporary directory is empty or not, you catch both cases - when all tasks are done and when they are not even started yet. Luigi allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single run() method, but that ruins the entire idea. And of course, both of these frameworks are optimized for standard MapReduce jobs, which is probably not what you want on a Spark mailing list :) Experience with these frameworks, however, gave me some insights about typical data pipelines. 1. Pipelines are mostly linear. Oozie, Luigi and a number of other frameworks allow branching, but most pipelines actually consist of moving data from source to destination with possibly some transformations in between (I'll be glad if somebody shares use cases where you really need branching). 2. Transactional logic is important. Either everything, or nothing. Otherwise it's really easy to get into an inconsistent state. 3. Extensibility is important. You never know what you will need in a week or two. So eventually I decided that it is much easier to create your own pipeline instead of trying to adapt your code to existing frameworks. My latest pipeline incarnation simply consists of a list of steps that are started sequentially. Each step is a class with at least these methods (see the sketch at the end of this thread): * run() - launch this step * fail() - what to do if the step fails * finalize() - (optional) what to do when all steps are done For example, if you want to add the possibility to run Spark jobs, you just create a SparkStep and configure it with the required code. If you want a Hive query - just create a HiveStep and configure it with Hive connection settings. I use a YAML file to configure the steps and a Context (basically, a Map[String, Any]) to pass variables between them. I also use a configurable Reporter, available to all steps, to report progress.
Hopefully, this will give you some insights about the best pipeline for your specific case. On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote: We use Luigi for this purpose. (Our pipelines are typically on AWS (no EMR) backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and Spark. We run Spark jobs by connecting drivers/clients to the master, and those are what is invoked from Luigi.) — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote: I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice. But I'd like some advice/suggestions. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
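[Editor's note] A minimal sketch of the step-list design Andrei describes, assuming sequential execution, a shared mutable context, and all-or-nothing failure handling; all names are illustrative, not his actual code:

// One pipeline step: run() launches it, fail() reacts to errors, and
// finalizeStep() is the optional cleanup hook (renamed from finalize()
// to avoid clashing with java.lang.Object.finalize).
trait Step {
  def run(ctx: scala.collection.mutable.Map[String, Any]): Unit
  def fail(e: Exception): Unit = ()
  def finalizeStep(): Unit = ()
}

class Pipeline(steps: Seq[Step]) {
  def execute(): Unit = {
    val ctx = scala.collection.mutable.Map.empty[String, Any] // variables passed between steps
    try {
      steps.foreach(_.run(ctx))       // steps are started sequentially
      steps.foreach(_.finalizeStep()) // runs only once all steps are done
    } catch {
      case e: Exception =>
        steps.foreach(_.fail(e)) // give every step a chance to undo its work
        throw e
    }
  }
}

A SparkStep or HiveStep would then just wrap the relevant job-launching code inside run().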
can't get jobs to run on cluster (enough memory and cpus are available on worker)
Hello spark folks, I have a simple spark cluster setup but I can't get jobs to run on it. I am using standalone mode. One master, one slave. Both machines have 32GB RAM and 8 cores. The slave is set up with one worker that has 8 cores and 24GB memory allocated. My application requires 2 cores and 5GB of memory. However, I'm getting the following error: WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory What else should I check for? This is a simplified setup (the real cluster has 20 nodes). In this simplified setup I am running the master and the slave manually. The master's web page shows the worker, and it shows the application, and the memory/core requirements match what I mentioned above. I also tried running the SparkPi example via bin/run-example and got the same result. It requires 8 cores and 512MB of memory, which is also clearly within the limits of the available worker. Any ideas would be greatly appreciated!! Matt
Re: Large Task Size?
Yes, this is a proposed patch to MLlib so that you can use 1 RDD to train multiple models at the same time. I am hoping that multiplexing several models in the same RDD will be more efficient than trying to get the Spark scheduler to manage a few hundred tasks simultaneously. I don't think I see stochasticLossHistory being included in the closure (please correct me if I'm wrong). It's used once on line 183 to capture the loss sums (a local operation on the results of a 'collect' call), and again on line 198 to update weightSet, but that's after the loop completes, and the memory blowup definitely happens before then. Kyle On Tue, Jul 15, 2014 at 12:00 PM, Aaron Davidson ilike...@gmail.com wrote: Ah, I didn't realize this was non-MLlib code. Do you mean to be sending stochasticLossHistory in the closure as well? On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: It uses the standard SquaredL2Updater, and I also tried to broadcast it as well. The input is an RDD created by taking the union of several inputs, which have all been run against MLUtils.kFold to produce even more RDDs, e.g. if I run with 10 different inputs, each with 10 kFolds. I'm pretty certain that all of the input RDDs have clean closures. But I'm curious: is there a high overhead for running union? Could that create larger task sizes? Kyle On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote: I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater, and is it possible it's dragging in a significant amount of extra state? On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working on a patch to MLlib that allows for multiplexing several different model optimizations using the same RDD ( SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372 ) In testing larger datasets, I've started to see some memory errors ( java.lang.OutOfMemoryError and "exceeds max allowed: spark.akka.frameSize" errors ). My main clue is that Spark will start logging warnings on smaller systems like: 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB. Looking up stage '2862' leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156 I've looked over the code, I'm broadcasting the larger variables, and between the sampler and the combineByKey, I wouldn't think there's much data being moved over the network, much less a 10MB chunk. Any ideas of what this might be a symptom of? Kyle
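[Editor's note] As background, the standard remedy for oversized task closures is to broadcast any large read-only state and capture only the broadcast handle inside closures. A generic sketch of the pattern, not Kyle's actual patch:

val weights = Array.fill(1000000)(0.01) // large read-only state (~8 MB)
val weightsBc = sc.broadcast(weights)   // shipped to each executor once, not per task

val data = sc.parallelize(1 to 1000)
val scores = data.map { x =>
  // The closure captures only the small Broadcast handle, not the array,
  // so the serialized task stays tiny.
  x * weightsBc.value(x % weightsBc.value.length)
}
println(scores.count())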
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
Have you looked at the slave machine to see if the process has actually launched? If it has, have you tried peeking into its log file? (That error is printed whenever the executors fail to report back to the driver. Insufficient resources to launch the executor is the most common cause of that, but not the only one.) On Tue, Jul 15, 2014 at 2:43 PM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: Hello spark folks, I have a simple spark cluster setup but I can't get jobs to run on it. I am using standalone mode. One master, one slave. Both machines have 32GB RAM and 8 cores. The slave is set up with one worker that has 8 cores and 24GB memory allocated. My application requires 2 cores and 5GB of memory. However, I'm getting the following error: WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory What else should I check for? This is a simplified setup (the real cluster has 20 nodes). In this simplified setup I am running the master and the slave manually. The master's web page shows the worker, and it shows the application, and the memory/core requirements match what I mentioned above. I also tried running the SparkPi example via bin/run-example and got the same result. It requires 8 cores and 512MB of memory, which is also clearly within the limits of the available worker. Any ideas would be greatly appreciated!! Matt -- Marcelo
Re: How does Spark speculation prevent duplicated work?
The way HDFS file writing works at a high level is that each attempt to write a partition to a file starts by writing to a unique temporary file (say, something like targetDirectory/_temp/part-X_attempt-). If the writing into the file completes successfully, then the temporary file is moved to the final location (say, targetDirectory/part-X). If, due to speculative execution, the file already exists in the final intended location, then the move is avoided, or it's overwritten; I forget the implementation. Either way, all attempts to write the same partition will always write the same data to the temp file (assuming the Spark transformation generating the data is deterministic and idempotent). And once one attempt is successful, the final file will have the same data. Hence, writing to HDFS / S3 is idempotent. Now, this logic is already implemented within Hadoop's MapReduce logic, and Spark just uses it directly. TD On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim m...@palantir.com wrote: Thanks for the explanation, guys. I looked into the saveAsHadoopFile implementation a little bit. If you see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala at line 843, the HDFS write happens at per-partition processing, not at the result handling, so I have a feeling that it might be writing multiple times. This may be fine if both tasks for the same partition complete, because it will simply overwrite the output partition with the same content, but this could be an issue if one of the tasks completes and the other is in the middle of writing the partition by the time the entire stage completes. Can someone explain this? Bertrand, I'm slightly confused about your comment. So, is it the case that HDFS will handle the writes as a temp file write followed by an atomic move, so the concern I had above is handled at the HDFS level? Mingyu From: Bertrand Dechoux decho...@gmail.com Reply-To: user@spark.apache.org user@spark.apache.org Date: Tuesday, July 15, 2014 at 1:22 PM To: user@spark.apache.org user@spark.apache.org Subject: Re: How does Spark speculation prevent duplicated work? I haven't looked at the implementation, but what you would do with any filesystem is write to a file inside the workspace directory of the task. Then only the attempt of the task that should be kept will perform a move to the final path. The other attempts are simply discarded. For most filesystems (and that's the case for HDFS), a 'move' is a very simple and fast action, because only the full path/name of the file changes, not its content or where this content is physically stored. Speculative execution happens in Hadoop MapReduce, and Spark has the same concept. As long as you apply functions with no side effects (i.e. the only impact is the returned results), then you just need to not take into account results from additional attempts of the same task/operator. Bertrand Dechoux On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash and...@andrewash.com wrote: Hi Nan, Great digging in -- that makes sense to me for when a job is producing some output handled by Spark like a .count or .distinct or similar. For the other part of the question, I'm also interested in side effects like an HDFS disk write. If one task is writing to an HDFS path and another task starts up, wouldn't it also attempt to write to the same path? How is that de-conflicted?
On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Mingyu, According to my understanding, Spark processes the result generated from each partition by passing it to a resultHandler (SparkContext.scala L1056). This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size. This design effectively ensures that the actions are idempotent, e.g. count is implemented as def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Even if the task for a partition is executed twice, the result put in the array is the same. At the same time, I think the Spark implementation ensures that the operation applied on the return value of SparkContext.runJob will not be triggered when the duplicate tasks finish. When a task is finished, the code execution path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded. In taskEnded, it will trigger the CompletionEvent message handler, where DAGScheduler will check if (!job.finished(rt.outputId)), and rt.outputId is the partition id, so even if the duplicate task invokes a CompletionEvent message, it will find job.finished(rt.outputId) is already true eventually. Maybe I was wrong… I just went through the code roughly; feel free to correct me. Best, -- Nan Zhu On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote: Hi all, I was curious about the details of Spark speculation. So, my understanding is that, when
Re: Error when testing with large sparse svm
I made a bit of progress. I think the problem is with BinaryClassificationMetrics: as long as I comment out all the prediction-related metrics, I can run the svm example with my data. So the problem should be there, I guess. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9832.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NotSerializableException in Spark Streaming
I am very curious though. Can you post a concise code example which we can run to reproduce this problem? TD On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am not entirely sure off the top of my head. But a possible (usually works) workaround is to define the function as a val instead of a def. For example def func(i: Int): Boolean = { true } can be written as val func = (i: Int) => { true } Hope this helps for now. TD On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hey Diana, Did you ever figure this out? I'm running into the same exception, except in my case the function I'm calling is a KMeans model.predict(). In regular Spark it works, and Spark Streaming without the call to model.predict() also works, but when put together I get this serialization exception. I'm on 1.0.0. Nick On Thu, May 8, 2014 at 6:37 AM, Diana Carroll dcarr...@cloudera.com wrote: Hey all, trying to set up a pretty simple streaming app and getting some weird behavior. First, a non-streaming job that works fine: I'm trying to pull out lines of a log file that match a regex, for which I've set up a function: def getRequestDoc(s: String): String = { "KBDOC-[0-9]*".r.findFirstIn(s).orNull } logs = sc.textFile("logfiles") logs.map(getRequestDoc).take(10) That works, but I want to run that on the same data, but streaming, so I tried this: val logs = ssc.socketTextStream("localhost", ) logs.map(getRequestDoc).print() ssc.start() From this code, I get: 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0 org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext But if I do the map function inline instead of calling a separate function, it works: logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print() So why is it able to serialize my little function in regular Spark, but not in streaming? Thanks, Diana
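[Editor's note] A sketch of why the val workaround often helps, assuming the usual cause: a def is a method on its enclosing instance, so a closure that calls it must capture that instance (and anything non-serializable it holds, such as a StreamingContext), whereas a val function is an object that serializes on its own:

class Driver {
  val ssc = new Object // stand-in for a non-serializable member like a StreamingContext

  // Method: a closure like rdd.map(viaDef) must capture `this`, dragging ssc along.
  def viaDef(i: Int): Boolean = true

  // Function value: rdd.map(viaVal) captures only the function object; `this` stays behind.
  val viaVal: Int => Boolean = (i: Int) => true
}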
Spark misconfigured? Small input split sizes in shark query
Got a spark/shark cluster up and running recently, and have been kicking the tires on it. However, I've been wrestling with an issue on it that I'm not quite sure how to solve. (Or, at least, not quite sure about the correct way to solve it.) I ran a simple Hive query (select count ...) against a dataset of .tsv files stored in S3, and then ran the same query on Shark for comparison. But the Shark query took 3x as long. After a bit of digging, I was able to find out what was happening: apparently with the Hive query each map task was reading an input split consisting of 2 entire files from the dataset (approximately 180MB each), while with Shark each task was reading an input split consisting of a 64MB chunk from one of the files. This made sense: since the Shark query had to open each S3 file 3 separate times (and had to run 3x as many tasks), it made sense that it took much longer. After much experimentation I was finally able to work around this issue by overriding the value of mapreduce.input.fileinputformat.split.minsize in my hive-site.xml file. (Bumping it up to 512MB.) However, I'm feeling like this isn't really the right way to solve the issue: a) That param is normally set to 1. It doesn't seem right that I should need to override it - or set it to a value as large as 512MB. b) We only seem to experience this issue on an existing Hadoop cluster that we've deployed spark/shark onto. When we run the same query on a new cluster launched via the Spark EC2 scripts, the number of splits gets calculated correctly - without the need for overriding that param. This leads me to believe we may just have something misconfigured on our existing cluster. c) This seems like an error-prone way to overcome this issue. 512MB is an arbitrary value, and should I happen to run a query against files that are larger than 512MB, I'll again run into the chunking issue. So my gut tells me there's a better way to solve this issue - i.e., somehow configuring Spark so that the input splits it generates won't chunk the input files. Anyone know how to accomplish this / what I might have misconfigured? Thanks, DR
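[Editor's note] For reference, this is the shape of the override described above in hive-site.xml, with 512MB expressed in bytes (on older Hadoop 1 APIs the equivalent knob is mapred.min.split.size):

<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <!-- 512 MB: large enough that each ~180 MB input file becomes a single split -->
  <value>536870912</value>
</property>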
Re: How does Spark speculation prevent duplicated work?
Yeah, this is handled by the commit call of the FileOutputFormat. In general, Hadoop OutputFormats have a concept called committing the output, which you should do only once per partition. In the file-based ones, it does an atomic rename to make sure that the final output is a complete file. Matei On Jul 15, 2014, at 2:49 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The way HDFS file writing works at a high level is that each attempt to write a partition to a file starts by writing to a unique temporary file (say, something like targetDirectory/_temp/part-X_attempt-). If the writing into the file completes successfully, then the temporary file is moved to the final location (say, targetDirectory/part-X). If, due to speculative execution, the file already exists in the final intended location, then the move is avoided, or it's overwritten; I forget the implementation. Either way, all attempts to write the same partition will always write the same data to the temp file (assuming the Spark transformation generating the data is deterministic and idempotent). And once one attempt is successful, the final file will have the same data. Hence, writing to HDFS / S3 is idempotent. Now, this logic is already implemented within Hadoop's MapReduce logic, and Spark just uses it directly. TD On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim m...@palantir.com wrote: Thanks for the explanation, guys. I looked into the saveAsHadoopFile implementation a little bit. If you see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala at line 843, the HDFS write happens at per-partition processing, not at the result handling, so I have a feeling that it might be writing multiple times. This may be fine if both tasks for the same partition complete, because it will simply overwrite the output partition with the same content, but this could be an issue if one of the tasks completes and the other is in the middle of writing the partition by the time the entire stage completes. Can someone explain this? Bertrand, I'm slightly confused about your comment. So, is it the case that HDFS will handle the writes as a temp file write followed by an atomic move, so the concern I had above is handled at the HDFS level? Mingyu From: Bertrand Dechoux decho...@gmail.com Reply-To: user@spark.apache.org user@spark.apache.org Date: Tuesday, July 15, 2014 at 1:22 PM To: user@spark.apache.org user@spark.apache.org Subject: Re: How does Spark speculation prevent duplicated work? I haven't looked at the implementation, but what you would do with any filesystem is write to a file inside the workspace directory of the task. Then only the attempt of the task that should be kept will perform a move to the final path. The other attempts are simply discarded. For most filesystems (and that's the case for HDFS), a 'move' is a very simple and fast action, because only the full path/name of the file changes, not its content or where this content is physically stored. Speculative execution happens in Hadoop MapReduce, and Spark has the same concept. As long as you apply functions with no side effects (i.e. the only impact is the returned results), then you just need to not take into account results from additional attempts of the same task/operator. Bertrand Dechoux On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash and...@andrewash.com wrote: Hi Nan, Great digging in -- that makes sense to me for when a job is producing some output handled by Spark like a .count or .distinct or similar.
For the other part of the question, I'm also interested in side effects like an HDFS disk write. If one task is writing to an HDFS path and another task starts up, wouldn't it also attempt to write to the same path? How is that de-conflicted? On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Mingyu, According to my understanding, Spark processes the result generated from each partition by passing it to a resultHandler (SparkContext.scala L1056). This resultHandler usually just puts the result in a driver-side array, the length of which is always partitions.size. This design effectively ensures that the actions are idempotent, e.g. count is implemented as def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Even if the task for a partition is executed twice, the result put in the array is the same. At the same time, I think the Spark implementation ensures that the operation applied on the return value of SparkContext.runJob will not be triggered when the duplicate tasks finish. When a task is finished, the code execution path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded. In taskEnded, it will trigger the CompletionEvent message handler, where DAGScheduler will check
Re: Need help on spark Hbase
Also, it helps if you post us logs, stacktraces, exceptions, etc. TD On Tue, Jul 15, 2014 at 10:07 AM, Jerry Lam chiling...@gmail.com wrote: Hi Rajesh, I have a feeling that this is not directly related to Spark, but I might be wrong. The reason why is that when you do: Configuration configuration = HBaseConfiguration.create(); by default, it reads the configuration file hbase-site.xml in your classpath and ... (I don't remember all the configuration files HBase has). I noticed that you overwrote some configuration settings in the code, but I'm not sure if you have other configurations that might have conflicted with those. Could you try the following: remove anything that is Spark specific, leaving only the HBase related code; uber-jar it and run it just like any other simple Java program. If you still have connection issues, then at least you know the problem is in the configurations. HTH, Jerry On Tue, Jul 15, 2014 at 12:10 PM, Krishna Sankar ksanka...@gmail.com wrote: One vector to check is the HBase libraries in the --jars, as in: spark-submit --class <your class> --master <master url> --jars hbase-client-0.98.3-hadoop2.jar,commons-csv-1.0-SNAPSHOT.jar,hbase-common-0.98.3-hadoop2.jar,hbase-hadoop2-compat-0.98.3-hadoop2.jar,hbase-it-0.98.3-hadoop2.jar,hbase-protocol-0.98.3-hadoop2.jar,hbase-server-0.98.3-hadoop2.jar,htrace-core-2.04.jar,spark-assembly-1.0.0-hadoop2.2.0.jar badwclient.jar This worked for us. Cheers k/ On Tue, Jul 15, 2014 at 6:47 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Could you please help me resolve the issue. *Issue*: I'm not able to connect to HBase from spark-submit. Below is my code. When I execute the program standalone, I'm able to connect to HBase and do the operation. When I execute it using the spark-submit ( ./bin/spark-submit ) command, I'm not able to connect to HBase. Am I missing anything?

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class Test {
  public static void main(String[] args) throws Exception {
    JavaStreamingContext ssc = new JavaStreamingContext("local", "Test", new Duration(4), sparkHome, "");
    JavaDStream<String> lines_2 = ssc.textFileStream(hdfsfolderpath);
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.property.clientPort", "2181");
    configuration.set("hbase.zookeeper.quorum", "localhost");
    configuration.set("hbase.master", "localhost:60");
    HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
    if (hBaseAdmin.tableExists("HABSE_TABLE")) {
      System.out.println(" ANA_DATA table exists ..");
    }
    System.out.println(" HELLO HELLO HELLO ");
    ssc.start();
    ssc.awaitTermination();
  }
}

Thank you for your help and support. Regards, Rajesh
Re: Can we get a spark context inside a mapper
Thanks a lot Sean, Daniel, Matei and Jerry. I really appreciate your replies. And I also understand that I should be a little more patient. When I myself was not able to reply within the next 5 hours, how can I expect the question to be answered in that time? And yes, the idea of using a separate clustering library sounds correct. Although I am using Python, so I will be using scikit-learn instead of Weka. Thanks, On Tue, Jul 15, 2014 at 12:51 AM, Jerry Lam chiling...@gmail.com wrote: Hi there, I think the question is interesting; a spark of sparks = spark I wonder if you can use the Spark job server ( https://github.com/ooyala/spark-jobserver )? So in the Spark task that requires a new Spark context, instead of creating it in the task, contact the job server to create one, and use the data in the task as the data source, either via HDFS/Tachyon/S3. Wait until the sub-task is done, then continue. Since the job server has the notion of a job id, you might use it as a reference to the sub-task. I don't know if this is a good idea or a bad one. Maybe this is an anti-pattern of Spark, but maybe not. HTH, Jerry On Mon, Jul 14, 2014 at 3:09 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You currently can't use SparkContext inside a Spark task, so in this case you'd have to call some kind of local K-means library. One example you can try to use is Weka (http://www.cs.waikato.ac.nz/ml/weka/). You can then load your text files as an RDD of strings with SparkContext.wholeTextFiles and call Weka on each one. Matei On Jul 14, 2014, at 11:30 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: I understand that the question is very unprofessional, but I am a newbie. If you could share some link where I can ask such questions, if not here. But please answer. On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hey, My question is for this situation: suppose we have 10 files, each containing a list of features in each row. The task is: for each file, cluster the features in that file and write the corresponding cluster along with it in a new file. So we have to generate 10 more files by applying clustering to each file individually. So can I do it this way: get an RDD of the list of files and apply map; inside the mapper function, which will be handling each file, get another Spark context and use MLlib k-means to get the clustered output file? Please suggest the appropriate method to tackle this problem. Thanks, Rahul Kumar Bhojwani 3rd year, B.Tech Computer Science Engineering National Institute Of Technology, Karnataka 9945197359 -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
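[Editor's note] A minimal sketch of Matei's suggestion in Scala terms: wholeTextFiles yields one (path, contents) pair per file, so each file can be clustered independently inside a single map. runLocalKMeans is a hypothetical stand-in for the call into Weka, scikit-learn (via PySpark), or any other local library:

// Hypothetical local clustering routine for one file's worth of features.
def runLocalKMeans(contents: String): String =
  contents.lines.map(line => "cluster-0\t" + line).mkString("\n") // placeholder logic

val files = sc.wholeTextFiles("hdfs:///input/features") // RDD[(path, fileContents)]
val clustered = files.map { case (path, contents) =>
  (path, runLocalKMeans(contents)) // each file is handled by exactly one task
}
clustered.saveAsTextFile("hdfs:///output/clusters")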
Re: Error while running Spark SQL join when using Spark 1.0.1
- user@incubator Hi Keith, I did reproduce this using local-cluster[2,2,1024], and the errors look almost the same. Just wondering: despite the errors, did your program output any result for the join? On my machine, I could see the correct output. Zongheng On Tue, Jul 15, 2014 at 1:46 PM, Michael Armbrust mich...@databricks.com wrote: Thanks for the extra info. At a quick glance the query plan looks fine to me. The class IntegerType does build a type tag; I wonder if you are seeing the Scala issue manifest in some new way. We will attempt to reproduce locally. On Tue, Jul 15, 2014 at 1:41 PM, gorenuru goren...@gmail.com wrote: Just my few cents on this. I'm having the same problems with v1.0.1, but this bug is sporadic and looks like it is related to object initialization. Even more, I'm not using any SQL or anything. I just have a utility class like this:

object DataTypeDescriptor {
  type DataType = String

  val BOOLEAN = "BOOLEAN"
  val STRING = "STRING"
  val TIMESTAMP = "TIMESTAMP"
  val LONG = "LONG"
  val INT = "INT"
  val SHORT = "SHORT"
  val BYTE = "BYTE"
  val DECIMAL = "DECIMAL"
  val DOUBLE = "DOUBLE"
  val FLOAT = "FLOAT"

  def $$(name: String, format: Option[String] = None) = DataTypeDescriptor(name, format)

  private lazy val nativeTypes: Map[String, NativeType] = Map(
    BOOLEAN -> BooleanType, STRING -> StringType, TIMESTAMP -> TimestampType, LONG -> LongType,
    INT -> IntegerType, SHORT -> ShortType, BYTE -> ByteType, DECIMAL -> DecimalType,
    DOUBLE -> DoubleType, FLOAT -> FloatType
  )

  lazy val defaultValues: Map[String, Any] = Map(
    BOOLEAN -> false, STRING -> "", TIMESTAMP -> null, LONG -> 0L, INT -> 0, SHORT -> 0.toShort,
    BYTE -> 0.toByte, DECIMAL -> BigDecimal(0d), DOUBLE -> 0d, FLOAT -> 0f
  )

  def apply(dataType: String): DataTypeDescriptor = {
    DataTypeDescriptor(dataType.toUpperCase, None)
  }

  def apply(dataType: SparkDataType): DataTypeDescriptor = {
    nativeTypes
      .find { case (_, descriptor) => descriptor == dataType }
      .map { case (name, descriptor) => DataTypeDescriptor(name, None) }
      .get
  }
  ...
}

and some tests that check each of these methods. The problem is that these tests fail randomly with this error. P.S.: I did not have this problem in Spark 1.0.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-Spark-SQL-join-when-using-Spark-1-0-1-tp9776p9817.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Repeated data item search with Spark SQL(1.0.1)
hql and sql are just two different dialects for interacting with data. After parsing is complete and the logical plan is constructed, the execution is exactly the same. On Tue, Jul 15, 2014 at 2:50 PM, Jerry Lam chiling...@gmail.com wrote: Hi Michael, I don't understand the difference between hql (HiveContext) and sql (SQLContext). My previous understanding was that hql is Hive specific: unless the table is managed by Hive, we should use sql. For instance, an RDD (hdfsRDD) created from files in HDFS and registered as a table should use sql. However, my current understanding after trying your suggestion above is that I can also query the hdfsRDD using hql via LocalHiveContext. I just tested it; the lateral view explode(schools) works with the hdfsRDD. It seems to me that HiveContext and SQLContext are the same, except that HiveContext needs a metastore and has more powerful SQL support borrowed from Hive. Can you shed some light on this when you get a minute? Thanks, Jerry On Tue, Jul 15, 2014 at 4:32 PM, Michael Armbrust mich...@databricks.com wrote: No, that is why I included the link to SPARK-2096 https://issues.apache.org/jira/browse/SPARK-2096 as well. You'll need to use HiveQL at this time. Is it possible or planned to support the schools.time format to filter records where there is an element inside the array of schools satisfying time > 2? It would be great to support something like this, but it's going to take a while to hammer out the correct semantics, as SQL does not in general have great support for nested structures. I think different people might interpret that query to mean there is SOME school.time > 2 vs. ALL school.time > 2, etc. You can get what you want now using a lateral view: hql("SELECT DISTINCT name FROM people LATERAL VIEW explode(schools) s AS school WHERE school.time > 2")
Re: Multiple streams at the same time
Creating multiple StreamingContexts using the same SparkContext is currently not supported. :) Guess it was not clear in the docs. Note to self. TD On Tue, Jul 15, 2014 at 1:50 PM, gorenuru goren...@gmail.com wrote: Hi everyone. I have some problems running multiple streams at the same time. What I am doing is:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.api.java.function._
    import org.apache.spark.streaming.api._

    object Test {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setMaster("local[10]").setAppName("test"))
        val task1 = startListening(sc, Seconds(5))
        val task2 = startListening(sc, Seconds(5))
        val task3 = startListening(sc, Seconds(5))
        val task4 = startListening(sc, Seconds(5))
        Thread.sleep(1)
        println("Killing...")
        task1.stop()
        task2.stop()
        task3.stop()
        task4.stop()
      }

      private def startListening(sc: SparkContext, duration: Duration): StreamingTask = {
        val ssc = new StreamingContext(sc, duration)
        val lines = ssc.socketTextStream("localhost", )  // port number elided in the original message
        lines.foreachRDD { rdd => println(rdd.collect().mkString()) }
        ssc.start()
        StreamingTask(ssc)
      }
    }

    case class StreamingTask(ssc: StreamingContext) {
      def stop() {
        ssc.stop(stopSparkContext = false, stopGracefully = false)
      }
    }

The idea is that I am sharing the same SparkContext between different StreamingContexts. What I am getting is:

    Exception in thread "main" akka.actor.InvalidActorNameException: actor name [JobScheduler] is not unique!
      at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
      at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
      at akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
      at akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
      at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
      at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
      at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
      at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:57)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:438)
      at io.ubix.spark.Test$.startListening(Test.scala:38)
      at io.ubix.spark.Test$.main(Test.scala:20)
      at io.ubix.spark.Test.main(Test.scala)

And this is true, because we are trying to create four JobScheduler actors with the same name. On the other hand, the question is how to deal with this kind of situation. Am I doing something wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Multiple streams at the same time
Oh, sad to hear that :( From my point of view, creating a separate Spark context for each stream is too expensive. Also, it's annoying because we have to be responsible for proper Akka and UI port determination for each context. Do you know of any plans to support it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819p9842.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Multiple streams at the same time
Why do you need to create multiple streaming contexts at all? TD On Tue, Jul 15, 2014 at 3:43 PM, gorenuru goren...@gmail.com wrote: Oh, sad to hear that :( From my point of view, creating a separate Spark context for each stream is too expensive. Also, it's annoying because we have to be responsible for proper Akka and UI port determination for each context. Do you know of any plans to support it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819p9842.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error while running Spark SQL join when using Spark 1.0.1
Hi Keith & gorenuru, This patch (https://github.com/apache/spark/pull/1423) solves the errors for me in my local tests. If possible, can you guys test it out to see if it solves your test programs? Thanks, Zongheng On Tue, Jul 15, 2014 at 3:08 PM, Zongheng Yang zonghen...@gmail.com wrote: [...]
Re: Client application that calls Spark and receives an MLlib *model* Scala Object, not just result
Thanks Soumya - I guess the next step from here is to move the MLlib model out of the Spark application, which simply does the training, and give it to the client application, which simply does the predictions. I will try the Kryo library to serialize the object and move it across machines / applications. Rather than writing it to a file, I will send it over the network - any thoughts on that? Thanks! On Mon, Jul 14, 2014 at 1:43 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Please look at the following. https://github.com/ooyala/spark-jobserver http://en.wikipedia.org/wiki/Predictive_Model_Markup_Language https://github.com/EsotericSoftware/kryo You can train your model, convert it to PMML, and return that to your client. OR You can train your model and write that model (serialized object) to a file system (local, HDFS, S3, etc.) or a datastore, and return a location back to the client on a successful write. On Mon, Jul 14, 2014 at 4:27 PM, Aris Vlasakakis a...@vlasakakis.com wrote: Hello Spark community, I would like to write an application in Scala that is a model server. It should have an MLlib linear regression model that is already trained on some big set of data, and then is able to repeatedly call myLinearRegressionModel.predict() many times and return the result. Now, I want this client application to submit a job to Spark and tell the Spark cluster to 1) train its particular MLlib model, which produces a LinearRegressionModel, and then 2) take the produced Scala org.apache.spark.mllib.regression.LinearRegressionModel *object*, serialize that object, and return this serialized object over the wire to my calling application. 3) My client application receives the serialized Scala (model) object, and can call .predict() on it over and over. I am separating the heavy lifting of training the model from doing model predictions; the client application will only do predictions, using the MLlib model it received from the Spark application. The confusion I have is that I only know how to submit jobs to Spark by using the bin/spark-submit script, and then the only output I receive is stdout (as in, text). I want my Scala application to submit the Spark model-training job programmatically, and for the Spark application to return a SERIALIZED MLLIB OBJECT, not just some stdout text! How can I do this? I think my use case of separating long-running training jobs in Spark from using its libraries in another application should be a pretty common design pattern. Thanks! -- Άρης Βλασακάκης Aris Vlasakakis
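As a starting point for the write-to-storage option Soumya describes, a minimal sketch using plain Java serialization (Kryo would be analogous); the path is a placeholder, and it assumes - as in the 1.0-era API - that LinearRegressionModel is a small Serializable object holding weights and an intercept:

    import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.spark.mllib.regression.LinearRegressionModel

    object ModelIO {
      // Training side: persist the fitted model, then hand the path back to the client.
      def save(model: LinearRegressionModel, path: String): Unit = {
        val out = new ObjectOutputStream(new FileOutputStream(path))
        try out.writeObject(model) finally out.close()
      }

      // Client side: load the model once, then call predict() as often as needed.
      def load(path: String): LinearRegressionModel = {
        val in = new ObjectInputStream(new FileInputStream(path))
        try in.readObject().asInstanceOf[LinearRegressionModel] finally in.close()
      }
    }

Sending the same bytes over a socket instead of writing a file is a straightforward variation; the point is that the model object itself is tiny (weights plus intercept), so serializing it is cheap compared to the training job.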
Re: Submitting to a cluster behind a VPN, configuring different IP address
Hello! Just curious if anybody could respond to my original message - if anybody knows how to set the configuration variables that are handled by Jetty and not by Spark's native framework... which is Akka, I think? Thanks On Thu, Jul 10, 2014 at 4:04 PM, Aris Vlasakakis a...@vlasakakis.com wrote: Hi Spark folks, Our production Spark cluster lives in the data center, and I need to attach to a VPN from my laptop so that I can submit a Spark application to the Spark Master (behind the VPN). However, the problem arises that I have a local IP address on the laptop which is on a separate network segment from the VPN. I figured out that I can set the SparkConf configuration property called spark.driver.host to my VPN IP address - this got me further, because I could initiate a connection to the master. However, it was still not working. I was hoping to set the SparkConf variables spark.fileserver.uri and spark.httpBroadcast.uri to NOT be my local IP address but the VPN IP address, because I think these configuration variables actually pass my application files to the master (in this case, a Python script). So my spark.fileserver.uri and the Spark Master need to be on the same network segment (the VPN subnetwork). Am I on the right track? How can I set spark.fileserver.uri and spark.httpBroadcast.uri? I see that these are actually run by a Jetty server... any thoughts? Thank you so much! -- Άρης Βλασακάκης Aris Vlasakakis -- Άρης Βλασακάκης Aris Vlasakakis
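One avenue worth trying, offered as a sketch rather than a confirmed fix: in 1.0-era Spark, spark.fileserver.uri and spark.httpBroadcast.uri are normally filled in internally from the machine's resolved local address, which the SPARK_LOCAL_IP environment variable can override, alongside spark.driver.host for the Akka side. All addresses below are placeholders:

    // Before launching the driver, in the shell:
    //   export SPARK_LOCAL_IP=10.8.0.5        // placeholder VPN-assigned address

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://spark-master.internal:7077")  // placeholder master URL
      .setAppName("vpn-submit-test")
      .set("spark.driver.host", "10.8.0.5")  // the VPN-side address executors should call back to

    val sc = new SparkContext(conf)

The idea is that the Jetty file server and HTTP broadcast server then bind to and advertise the VPN address, putting them on the same network segment as the master.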
Re: SQL + streaming
Hi Tathagata, I could see the output of count, but no SQL results. Running standalone is meaningless for me; I just run on my local single-node YARN cluster. Thanks On Tue, Jul 15, 2014 at 12:48 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Could you run it locally first to make sure it works, and you see output? Also, I recommend going through the previous step-by-step approach to narrow down where the problem is. TD On Mon, Jul 14, 2014 at 9:15 PM, hsy...@gmail.com hsy...@gmail.com wrote: Actually, I deployed this on a YARN cluster (spark-submit) and I couldn't find any output in the YARN stdout logs On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you make sure you are running locally on more than 1 local core? You could set the master in the SparkConf as conf.setMaster("local[4]"). Then see if there are jobs running on every batch of data in the Spark web UI (running on localhost:4040). If you still don't get any output, try simply printing recRDD.count() in the foreachRDD first (that is, first test Spark Streaming). If you can get that to work, then I would test the Spark SQL stuff. TD On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com hsy...@gmail.com wrote: No errors but no output either... Thanks! On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Could you elaborate on what is the problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but the SQL command throwing an error? No errors but no output either? TD On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi All, A couple of days ago, I tried to integrate SQL and streaming together. My understanding is that I can transform the RDDs from a DStream to SchemaRDDs and execute SQL on each RDD. But I got no luck. Would you guys help me take a look at my code? Thank you very much!

    object KafkaSpark {
      def main(args: Array[String]): Unit = {
        if (args.length < 4) {
          System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }

        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaSpark")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        //ssc.checkpoint("checkpoint")

        // Importing the SQL context gives access to all the SQL functions and implicit conversions.
        import sqlContext._

        val tt = Time(1)
        val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
          .map(t => getRecord(t._2.split("#")))

        val result = recordsStream.foreachRDD((recRDD, tt) => {
          recRDD.registerAsTable("records")
          val result = sql("select * from records")
          println(result)
          result.foreach(println)
        })

        ssc.start()
        ssc.awaitTermination()
      }

      def getRecord(l: Array[String]): Record = {
        println("Getting the record")
        Record(l(0), l(1))
      }
    }
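One thing that stands out in the code above, though nobody calls it out in the thread: it builds a StreamingContext and a separate SparkContext from the same conf, with the SQLContext hanging off the second context. Below is a minimal sketch of the same DStream-plus-SQL pattern with a single shared SparkContext, using 1.0-era APIs; the socket source, port, and record format are placeholders standing in for the Kafka wiring:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    case class Record(key: String, value: String)

    object StreamingSqlSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("streaming-sql"))
        val ssc = new StreamingContext(sc, Seconds(10))  // reuse the one SparkContext
        val sqlContext = new SQLContext(sc)              // ... and hang SQL off the same one
        import sqlContext._

        val lines = ssc.socketTextStream("localhost", 9999)  // placeholder source
        val records = lines.map(_.split("#")).filter(_.length >= 2).map(a => Record(a(0), a(1)))

        records.foreachRDD { rdd =>
          // Sanity check first, as TD suggests: confirm data is actually arriving.
          println("batch size = " + rdd.count())
          rdd.registerAsTable("records")
          sql("SELECT key, COUNT(*) FROM records GROUP BY key").collect().foreach(println)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }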
Re: import org.apache.spark.streaming.twitter._ in Shell
Yes, what Nick said is the recommended way. In most use cases, a Spark Streaming program in production is not usually run from the shell. Hence, we chose not to make the external stuff (Twitter, Kafka, etc.) available to the Spark shell, to avoid the dependency conflicts they bring in with Spark's dependencies. That said, you could tweak things at your own risk, as Praveen suggested. Specifically for Twitter, it's quite fine I think, as the dependencies of the twitter4j library are pretty thin and do not conflict with Spark's dependencies (at least in the current version). TD On Tue, Jul 15, 2014 at 1:03 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You could try the following: create a minimal project using sbt or Maven, add spark-streaming-twitter as a dependency, run sbt assembly (or mvn package) on that to create a fat jar (with Spark as a provided dependency), and add that to the shell classpath when starting up. On Tue, Jul 15, 2014 at 9:06 AM, Praveen Seluka psel...@qubole.com wrote: If you want to make Twitter* classes available in your shell, I believe you could do the following: 1. Change the parent pom module ordering - move external/twitter before assembly. 2. In assembly/pom.xml, add the external/twitter dependency - this will package twitter* into the assembly jar. Now when spark-shell is launched, the assembly jar is in the classpath - hence twitter* too. I think this will work (I remember trying this some time back). On Tue, Jul 15, 2014 at 11:59 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, I'd like to clarify something from your comments, Tathagata. Going forward, is Twitter Streaming functionality not supported from the shell? What should users do if they'd like to process live Tweets from the shell? Nick On Mon, Jul 14, 2014 at 11:50 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: At some point, you were able to access TwitterUtils from the spark shell using Spark 1.0.0+? Yep. If yes, then what change in Spark caused it to not work any more? It still works for me. I was just commenting on your remark that it doesn't work through the shell, which I now understand to apply to versions of Spark before 1.0.0. Nick
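To make Nick's suggestion concrete, a sketch of the minimal sbt build; the project name and version numbers are assumptions for a 1.0.x setup, so adjust them to your environment:

    // build.sbt -- requires the sbt-assembly plugin, e.g. in project/plugins.sbt:
    //   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")   // plugin version is an assumption

    name := "twitter-shell-deps"

    version := "0.1"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      // "provided" keeps Spark itself out of the fat jar; the shell already has it
      "org.apache.spark" %% "spark-core"              % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming"         % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming-twitter" % "1.0.1"
    )

Then run sbt assembly and start the shell with the resulting jar on the classpath, e.g. bin/spark-shell --jars target/scala-2.10/twitter-shell-deps-assembly-0.1.jar, after which import org.apache.spark.streaming.twitter._ should resolve.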
Re: Multiple streams at the same time
Because I want to have different streams with different durations. For example, one triggers snapshot analysis every 5 minutes and another every 10 seconds. On Tue, Jul 15, 2014 at 3:59 PM, Tathagata Das [via Apache Spark User List] <ml-node+s1001560n984...@n3.nabble.com> wrote: Why do you need to create multiple streaming contexts at all? TD On Tue, Jul 15, 2014 at 3:43 PM, gorenuru [hidden email] wrote: Oh, sad to hear that :( From my point of view, creating a separate Spark context for each stream is too expensive. Also, it's annoying because we have to be responsible for proper Akka and UI port determination for each context. Do you know of any plans to support it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819p9852.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
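Not stated in the thread, but the usual way to cover this use case without multiple contexts: one StreamingContext whose batch interval is the smallest duration needed, with window() supplying the longer interval. A sketch under those assumptions (the socket source and port are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    object MultiDurationSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("multi-duration"))
        // One context, batched at the fastest rate needed (10 seconds here).
        val ssc = new StreamingContext(sc, Seconds(10))

        val lines = ssc.socketTextStream("localhost", 9999)  // placeholder source

        // Fast path: runs on every 10-second batch.
        lines.foreachRDD(rdd => println("10s batch: " + rdd.count()))

        // Slow path: a 5-minute tumbling window over the same stream,
        // emitted once every 5 minutes -- the "snapshot analysis" case.
        lines.window(Minutes(5), Minutes(5))
          .foreachRDD(rdd => println("5m snapshot: " + rdd.count()))

        ssc.start()
        ssc.awaitTermination()
      }
    }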
Re: Error while running Spark SQL join when using Spark 1.0.1
Cool. So Michael's hunch was correct - it is a thread issue. I'm currently using a tarball build, but I'll do a Spark build with the patch as soon as I have a chance and test it out. Keith On Tue, Jul 15, 2014 at 4:14 PM, Zongheng Yang zonghen...@gmail.com wrote: Hi Keith & gorenuru, This patch (https://github.com/apache/spark/pull/1423) solves the errors for me in my local tests. If possible, can you guys test it out to see if it solves your test programs? Thanks, Zongheng On Tue, Jul 15, 2014 at 3:08 PM, Zongheng Yang zonghen...@gmail.com wrote: [...]