Re: Using Spark as web app backend
Hi all, Thank you for the reply. Is there any example of Spark running in client mode with Spray? I think I will choose this approach.

On Tue, Jun 24, 2014 at 4:55 PM, Koert Kuipers ko...@tresata.com wrote: run your Spark app in client mode together with a Spray REST service that the front end can talk to.

On Tue, Jun 24, 2014 at 3:12 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, So far, I run my Spark jobs with the spark-shell or spark-submit command. I'd like to go further, and I wonder how to use Spark as the backend of a web application. Specifically, I want a frontend application (built with Node.js) to communicate with Spark on the backend, so that every query from the frontend is routed to Spark, and the results from Spark are sent back to the frontend. Has anyone already experimented with this kind of architecture? Cheers, Jaonary
Need help to make Spark SQL work in a standalone application
Hi all, I'm trying to use Spark SQL to store data in a Parquet file. I create the file and insert data into it with the following code:

val conf = new SparkConf().setAppName("MCT").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext._

val personParquet = createParquetFile[Person]("people_1.pqt")
personParquet.registerAsTable("people")

val data = sc.parallelize(Seq(Person("Toto", 10), Person("foo", 101)))
data.insertInto("people")
personParquet.collect foreach(println)

data.insertInto("people")
val personParquet2 = parquetFile("people_1.pqt")
personParquet2.collect foreach(println)

It works as I expect when I run it in spark-shell. But with a standalone application, I get a build error:

MCT.scala:18: not found: value createParquetFile

If I skip this creation step and save the RDD as a Parquet file directly, it works. But then, when I insert new data, nothing happens. What am I doing wrong? Best regards, Jaonary
Re: how to make saveAsTextFile NOT split output into multiple files?
rdd.coalesce() will do the trick:

rdd.coalesce(1, true).saveAsTextFile("save_path")
Re: Using Spark as web app backend
Yeah, I agree with Koert, it would be the lightest solution. I have used it quite successfully and it just works. There is not much that is Spark-specific here; you can follow this example https://github.com/jacobus/s4 on how to build your Spray service. Then the easy solution would be to have a SparkContext in your HttpService: the context is initialized at bootstrap, computes the RDDs you want to run the queries on, and caches them. In your routes, you query the cached RDDs.

In my case I used Spark + Spray a bit differently, for an always-running service, as I didn't want to block the resources forever. At bootstrap the app started a Spark job that fetched the data and preprocessed/precomputed an optimized structure (domain-specific indexes), which was collected locally and then reused across requests directly from RAM; the SparkContext is stopped when that job is done. Only the service continues to run. Eugen

2014-06-25 9:07 GMT+02:00 Jaonary Rabarisoa jaon...@gmail.com: Hi all, Thank you for the reply. Is there any example of Spark running in client mode with Spray? I think I will choose this approach.

On Tue, Jun 24, 2014 at 4:55 PM, Koert Kuipers ko...@tresata.com wrote: run your Spark app in client mode together with a Spray REST service that the front end can talk to.

On Tue, Jun 24, 2014 at 3:12 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, So far, I run my Spark jobs with the spark-shell or spark-submit command. I'd like to go further, and I wonder how to use Spark as the backend of a web application. Specifically, I want a frontend application (built with Node.js) to communicate with Spark on the backend, so that every query from the frontend is routed to Spark, and the results from Spark are sent back to the frontend. Has anyone already experimented with this kind of architecture? Cheers, Jaonary
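As an illustration of the approach described above (not part of the original thread), here is a minimal sketch of a Spray service that owns a SparkContext, assuming Spray 1.x and Spark 1.0; the object name, HDFS path, and route are invented for the example, and error handling is omitted:

import akka.actor.ActorSystem
import org.apache.spark.{SparkConf, SparkContext}
import spray.routing.SimpleRoutingApp

// Hypothetical service: the SparkContext is created once at bootstrap (this JVM
// is the driver, i.e. "client mode"), the RDD is cached, and every HTTP request
// runs a query against the cached RDD.
object QueryService extends App with SimpleRoutingApp {
  implicit val system = ActorSystem("query-service")

  val sc = new SparkContext(new SparkConf().setAppName("web-backend"))
  val events = sc.textFile("hdfs://namenode:9000/data/events").cache()
  events.count() // materialize the cache before serving requests

  startServer(interface = "0.0.0.0", port = 8080) {
    path("count" / Segment) { keyword =>
      get {
        complete {
          // each request triggers a small Spark job over the cached data
          events.filter(_.contains(keyword)).count().toString
        }
      }
    }
  }
}

A Node.js frontend would then simply call this service over HTTP (e.g. GET /count/foo).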
Re: Spark slave fails to start with weird error information
Sorry, I just realized that start-slave is for a different task. Please close this.
TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
I'm running a very small job (16 partitions, 2 stages) on a 2-node cluster, each node with 15G memory. The master page looks all normal:

URL: spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077
Workers: 1
Cores: 2 Total, 2 Used
Memory: 13.9 GB Total, 512.0 MB Used
Applications: 1 Running, 0 Completed
Drivers: 0 Running, 1 Completed
Status: ALIVE

Workers
Id | Address | State | Cores | Memory
worker-20140625083124-ip-172-31-35-57.ec2.internal-54548 | ip-172-31-35-57.ec2.internal:54548 | ALIVE | 2 (2 Used) | 13.9 GB (512.0 MB Used)

Running Applications
ID | Name | Cores | Memory per Node | Submitted Time | User | State | Duration
app-20140625083158- | org.tribbloid.spookystuff.example.GoogleImage$ | 2 | 512.0 MB | 2014/06/25 08:31:58 | peng | RUNNING | 17 min

However, when submitting the job in client mode:

$SPARK_HOME/bin/spark-submit \
--class org.tribbloid.spookystuff.example.GoogleImage \
--master spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077 \
--deploy-mode client \
./../../../target/spookystuff-example-assembly-0.1.0-SNAPSHOT.jar

it is never picked up by any worker, despite 13.4G of memory and 2 cores in total being available. The log of the driver repeatedly shows:

14/06/25 04:46:29 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

Looks like it's either a bug or misleading information. Can someone confirm this so I can submit a JIRA?
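Not from the original thread, but since the master UI shows the cores and memory as already allocated, one cause worth ruling out is that the executors cannot connect back to the driver (in client mode the driver runs on the submitting machine). A hedged sketch of pinning the driver to an address and port the workers can actually reach, using standard Spark 1.0 configuration keys; the hostname and port below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("GoogleImage")
  .setMaster("spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077")
  // Hypothetical values: the driver machine's externally resolvable name and a
  // fixed port that is open in the security group, so executors can call back.
  .set("spark.driver.host", "driver-host.example.com")
  .set("spark.driver.port", "51000")
val sc = new SparkContext(conf)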
Re: Using Spark as web app backend
Totally agree. Also, there is a class 'SparkSubmit' that you can call directly to replace the shell script.
RE: Prediction using Classification with text attributes in Apache Spark MLLib
Hi lmk, I am not aware of any classifier in MLlib that accepts nominal data. They accept RDDs of LabeledPoints, which are a label plus a vector of Doubles. So, you'll need to convert the nominal values to Doubles. Best regards, Alexander

-----Original Message----- From: lmk [mailto:lakshmi.muralikrish...@gmail.com] Sent: Wednesday, June 25, 2014 1:27 PM To: u...@spark.incubator.apache.org Subject: RE: Prediction using Classification with text attributes in Apache Spark MLLib

Hi Alexander, Just one more question on a related note. Should I be following the same procedure even if my data is nominal (categorical), but has a lot of combinations? (In Weka I used to have it as nominal data) Regards, -lmk
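Not part of the original exchange, but for illustration, here is a minimal sketch of one common way to do that conversion (one-hot encoding a nominal attribute into Doubles), assuming Spark 1.0's MLlib API; the input layout, category list, and variable names are invented:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical input: "label,color" lines, where color is a nominal attribute.
val categories = Seq("red", "green", "blue")   // the known category values
val index = categories.zipWithIndex.toMap      // category -> vector position

val points = sc.textFile("data.csv").map { line =>
  val Array(label, color) = line.split(",")
  // One-hot encode the nominal value into a vector of Doubles.
  val features = Array.fill(categories.size)(0.0)
  index.get(color).foreach(i => features(i) = 1.0)
  LabeledPoint(label.toDouble, Vectors.dense(features))
}

With many categories (a lot of combinations), the same idea applies but the vectors get wide, so a hashed or indexed encoding may be preferable.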
Cassandra and Spark checkpoints
According to the "DataStax Brings Spark To Cassandra" press release: "DataStax has partnered with Databricks, the company founded by the creators of Apache Spark, to build a supported, open source integration between the two platforms. The partners expect to have the integration ready by this summer." How far does this integration go? For example, is it possible to use Cassandra as distributed checkpoint storage? Is HDFS currently the only supported option? Thanks, Toivo
Re: Upgrading to Spark 1.0.0 causes NoSuchMethodError
Hi, Robert -- I wonder if this is an instance of SPARK-2075: https://issues.apache.org/jira/browse/SPARK-2075 -- Paul — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Wed, Jun 25, 2014 at 6:28 AM, Robert James srobertja...@gmail.com wrote: On 6/24/14, Robert James srobertja...@gmail.com wrote: My app works fine under Spark 0.9. I just tried upgrading to Spark 1.0 by downloading the Spark distro to a dir, changing the sbt file, and running sbt assembly, but I now get NoSuchMethodErrors when trying to use spark-submit. I copied in the SimpleApp example from http://spark.apache.org/docs/latest/quick-start.html and get the same error:

$ /usr/local/share/spark/bin/spark-submit --class SimpleApp target/scala-2.10/myproj-assembly-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.SparkContext$.$lessinit$greater$default$2()Lscala/collection/Map;
        at SimpleApp$.main(SimpleApp.scala:10)
        at SimpleApp.main(SimpleApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

How can I migrate to Spark 1.0.0? I've done `sbt clean`, deleted the entire ivy2 cache, and still get the above error on both my code and the official Spark example. Can anyone guide me on how to debug this? How does Spark find the /usr/local/share/spark directory? Is there a variable somewhere I need to set to point to that, or one that might point to the old Spark? I've left the old Spark dir on the machine (just changed the symlink) - can that be causing problems? How should I approach this?
Spark and Cassandra - NotSerializableException
Hi, I am writing a standalone Spark program that gets its data from Cassandra. I followed the examples and created the RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but I get a NotSerializableException when I call the RDD's .groupByKey() method:

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf, "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf, "Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
        spark.newAPIHadoopRDD(jobConf,
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

The exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)

What I am trying to do is to merge all row key columns into a single entry.
I also get the same exception when I try to use the reduceByKey() method, like so:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
                                                   SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

I am using:
1. spark-1.0.0-bin-hadoop1
2. Cassandra 1.2.12
3. Java 1.6

Does anyone know what the problem is? What is there that fails serialization? Thanks, Shai
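Not from the original thread: the direct cause is that java.nio.ByteBuffer (and the Thrift column objects) are not Java-serializable, and groupByKey/reduceByKey must serialize keys and values for the shuffle. A hedged sketch of one common workaround, written in Scala for brevity: convert the records to plainly serializable types before the shuffle (the hex-string key is just one option; registering a custom Kryo serializer is another route):

import java.nio.ByteBuffer
import org.apache.spark.SparkContext._

// Assume rdd: RDD[(ByteBuffer, java.util.SortedMap[ByteBuffer, IColumn])].
def hexKey(buf: ByteBuffer): String = {
  val copy = new Array[Byte](buf.remaining())
  buf.duplicate().get(copy)            // copy the bytes without mutating the buffer
  copy.map("%02x".format(_)).mkString
}

val shuffleSafe = rdd.map { case (key, columns) =>
  // keep only what is needed downstream, as serializable values
  (hexKey(key), columns.size)
}
val grouped = shuffleSafe.groupByKey()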
Re: partitions, coalesce() and parallelism
The behavior you're seeing is by design, and it is VERY IMPORTANT to understand why this happens because it can cause unexpected behavior in various ways. I learned that the hard way. :-) Spark collapses multiple transforms into a single stage wherever possible (presumably for performance). The boundary between stages is a shuffle. In your example there's no shuffle, so all transforms are being collapsed into a single stage. Since you coalesce at the end into two partitions, and there is only one stage, that stage must contain two tasks.

It is important to note that coalesce will not cause a shuffle by default (repartition will always cause a shuffle). However, you can force it to shuffle by passing true as a second (optional) parameter, like so:

val rdd4 = rdd3.coalesce(2, true)

Try this in the Spark shell and you should see 100 tasks for the first stage and 2 tasks for the second.

On Tue, Jun 24, 2014 at 9:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Ah, here's a better hypothesis. Everything you are doing minus the save() is a transformation, not an action. Since nothing is actually triggered until the save(), Spark may be seeing that the lineage of operations ends with 2 partitions anyway and simplifies accordingly. Two suggestions you can try:

1. Remove the coalesce(2) and concatenate the files post-processing to get the number of files you want. This will also ensure the save() operation can be parallelized fully. I think this is the preferable approach since it does not artificially reduce the parallelism of your job at any stage.

2. Another thing you can try is the following:

val rdd1 = sc.sequenceFile(...)
val rdd2 = rdd1.coalesce(100)
val rdd3 = rdd2.map(...).cache() // cache this RDD
val some_count = rdd3.count() // force the map() to run and materialize the result
val rdd4 = rdd3.coalesce(2)
val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
rdd3.unpersist()

This should let the map() run 100 tasks in parallel while giving you only 2 output files. You'll get this at the cost of serializing rdd3 to memory by running the count(). Nick

On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert alex.boisv...@gmail.com wrote: For the skeptics :), here's a version you can easily reproduce at home:

val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions
val rdd2 = rdd1.coalesce(100)
val rdd3 = rdd2 map { _ + 1000 }
val rdd4 = rdd3.coalesce(2)
rdd4.collect()

You can see that everything runs as only 2 tasks ...
:-/

2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job: collect at <console>:48
2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got job 0 (collect at <console>:48) with 2 output partitions (allowLocal=false)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:48)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Parents of final stage: List()
2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler: Missing parents: List()
2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler: Submitting Stage 0 (CoalescedRDD[11] at coalesce at <console>:45), which has no missing parents
2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at <console>:45)
2014-06-25 00:43:20,901 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal (PROCESS_LOCAL)
2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:0 as 6632 bytes in 16 ms
2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal (PROCESS_LOCAL)
2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:1 as 6632 bytes in 8 ms
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler: Completed ResultTask(0, 0)
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler: Completed ResultTask(0, 1)
2014-06-25 00:43:21,608 INFO org.apache.spark.scheduler.DAGScheduler: Stage 0 (collect at <console>:48) finished in 0.693 s
2014-06-25 00:43:21,616 INFO org.apache.spark.SparkContext: Job finished:
Spark's Hadoop Dependency
To add Spark to an SBT project, I do:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

How do I make sure that the Spark version which will be downloaded will depend on, and use, Hadoop 2, and not Hadoop 1? Even with the line:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0"

I still see SBT downloading Hadoop 1:

[debug] == resolving dependencies org.apache.spark#spark-core_2.10;1.0.0->org.apache.hadoop#hadoop-client;1.0.4 [compile->master(*)]
[debug] dependency descriptor has been mediated: dependency: org.apache.hadoop#hadoop-client;2.4.0 {compile=[default(compile)]} => dependency: org.apache.hadoop#hadoop-client;1.0.4 {compile=[default(compile)]}
Re: Spark's Hadoop Dependency
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % versionSpark % "provided" exclude("org.apache.hadoop", "hadoop-client"),
  "org.apache.hadoop" % "hadoop-client" % versionHadoop % "provided"
)

On Wed, Jun 25, 2014 at 11:26 AM, Robert James srobertja...@gmail.com wrote: To add Spark to an SBT project, I do:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

How do I make sure that the Spark version which will be downloaded will depend on, and use, Hadoop 2, and not Hadoop 1? Even with the line:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0"

I still see SBT downloading Hadoop 1:

[debug] == resolving dependencies org.apache.spark#spark-core_2.10;1.0.0->org.apache.hadoop#hadoop-client;1.0.4 [compile->master(*)]
[debug] dependency descriptor has been mediated: dependency: org.apache.hadoop#hadoop-client;2.4.0 {compile=[default(compile)]} => dependency: org.apache.hadoop#hadoop-client;1.0.4 {compile=[default(compile)]}
Re: Powered by Spark addition
Hi Matei, Sailthru is also using Spark. Could you please add us to the Powered By Spark https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark page when you have a chance? Organization Name: Sailthru URL: www.sailthru.com Short Description: Our data science platform uses Spark to build predictive models and recommendation systems for marketing automation and personalization Thank you, Alex On Sun, Jun 22, 2014 at 1:37 AM, Sonal Goyal sonalgoy...@gmail.com wrote: Thanks a lot Matei. Sent from my iPad On Jun 22, 2014, at 5:20 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Alright, added you — sorry for the delay. Matei On Jun 12, 2014, at 10:29 PM, Sonal Goyal sonalgoy...@gmail.com wrote: Hi, Can we get added too? Here are the details: Name: Nube Technologies URL: www.nubetech.co Description: Nube provides solutions for data curation at scale helping customer targetting, accurate inventory and efficient analysis. Thanks! Best Regards, Sonal Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Thu, Jun 12, 2014 at 11:33 PM, Derek Mansen de...@vistarmedia.com wrote: Awesome, thank you! On Wed, Jun 11, 2014 at 6:53 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Alright, added you. Matei On Jun 11, 2014, at 1:28 PM, Derek Mansen de...@vistarmedia.com wrote: Hello, I was wondering if we could add our organization to the Powered by Spark page. The information is: Name: Vistar Media URL: www.vistarmedia.com Description: Location technology company enabling brands to reach on-the-go consumers. Let me know if you need anything else. Thanks! Derek Mansen
graphx Joining two VertexPartitions with different indexes is slow.
Lately I am seeing a lot of this warning in GraphX:

org.apache.spark.graphx.impl.ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.

I am using Graph.outerJoinVertices to join in data from a regular RDD (that is co-partitioned). I would like this operation to be fast, since I use it frequently. Should I be doing something different?
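Not from the original message, but the warning usually means the RDD on the right-hand side of the join does not share the graph's vertex index. A hedged sketch of one approach that is often suggested: re-key the external RDD onto the graph's own VertexRDD first (so both sides share an index), assuming GraphX in Spark 1.0 and invented attribute types:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assume graph: Graph[String, Int] and extra: RDD[(VertexId, Double)] holds the
// external, co-partitioned data to join in.
val indexed: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(extra, (a: Double, b: Double) => a)

// Both sides now share the same index, so the join should avoid the slow path.
val joined: Graph[(String, Double), Int] =
  graph.outerJoinVertices(indexed) { (id, attr, opt) => (attr, opt.getOrElse(0.0)) }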
Re: balancing RDDs
Yep, exactly! I'm not sure how complicated it would be to pull off. If someone wouldn't mind helping to get me pointed in the right direction, I would be happy to look into and contribute this functionality. I imagine this would be implemented in the scheduler codebase, and there would be some sort of rebalance configuration property to enable it, possibly? Does anyone else have any thoughts on this? Cheers, Sean

On Jun 24, 2014, at 4:41 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: This would be really useful. Especially for Shark, where a shift of partitioning affects all subsequent queries unless task scheduling time beats spark.locality.wait. It can cause overall low performance for all subsequent tasks. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara sean.mcnam...@webtrends.com wrote: We have a use case where we'd like something to execute once on each node, and I thought it would be good to ask here. Currently we achieve this by setting the parallelism to the number of nodes and using a mod partitioner:

val balancedRdd = sc.parallelize(
    (0 until Settings.parallelism)
      .map(id => id -> Settings.settings)
  ).partitionBy(new ModPartitioner(Settings.parallelism))
  .cache()

This works great except in two instances where it can become unbalanced:

1. If a worker is restarted or dies, the partition will move to a different node (one of the nodes will run two tasks). When the worker rejoins, is there a way to have a partition move back over to the newly restarted worker so that it's balanced again?

2. Drivers need to be started in a staggered fashion, otherwise one driver can launch two tasks on one set of workers, and the other driver will do the same with the other set. Are there any scheduler/config semantics so that each driver will take one (and only one) core from *each* node?

Thanks, Sean
Re: partitions, coalesce() and parallelism
Thanks Daniel and Nicholas for the helpful responses. I'll go with coalesce(shuffle = true) and see how things go. On Wed, Jun 25, 2014 at 8:19 AM, Daniel Siegmann daniel.siegm...@velos.io wrote: The behavior you're seeing is by design, and it is VERY IMPORTANT to understand why this happens because it can cause unexpected behavior in various ways. I learned that the hard way. :-) Spark collapses multiple transforms into a single stage wherever possible (presumably for performance). The boundary between stages is a shuffle. In your example there's no shuffle, so all transforms are being collapsed into a single stage. Since you coalesce at the end into two partitions, and there is only one stage, that stage must contain two tasks. It is important to note that coalesce will not cause a shuffle by default (repartition will always cause a shuffle). However, you can force it to partition by passing true as a second (optional) parameter, like so: val rdd4 = rdd3.coalesce(2, true) Try this in Spark shell and you should see 100 tasks for the first stage and 2 tasks for the second. On Tue, Jun 24, 2014 at 9:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Ah, here's a better hypothesis. Everything you are doing minus the save() is a transformation, not an action. Since nothing is actually triggered until the save(), Spark may be seeing that the lineage of operations ends with 2 partitions anyway and simplifies accordingly. Two suggestions you can try: 1. Remove the coalesce(2) and concatenate the files post-processing to get the number of files you want. This will also ensure the save() operation can be parallelized fully. I think this is the preferable approach since it does not artificially reduce the parallelism of your job at any stage. 2. Another thing you can try is the following: val rdd1 = sc.sequenceFile(...) val rdd2 = rdd1.coalesce(100) val rdd3 = rdd2.map(...).cache() // cache this RDD val some_count = rdd3.count() // force the map() to run and materialize the result val rdd4 = rdd3.coalesce(2) val rdd5 = rdd4.saveAsTextFile(...) // want only two output files rdd3.unpersist() This should let the map() run 100 tasks in parallel while giving you only 2 output files. You'll get this at the cost of serializing rdd3 to memory by running the count(). Nick On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert alex.boisv...@gmail.com wrote: For the skeptics :), here's a version you can easily reproduce at home: val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions val rdd2 = rdd1.coalesce(100) val rdd3 = rdd2 map { _ + 1000 } val rdd4 = rdd3.coalesce(2) rdd4.collect() You can see that everything runs as only 2 tasks ... 
:-/ 2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job: collect at console:48 2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got job 0 (collect at console:48) with 2 output partitions (allowLocal=false) 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Final stage: Stage 0 (collect at console:48) 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Parents of final stage: List() 2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler: Missing parents: List() 2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler: Submitting Stage 0 (CoalescedRDD[11] at coalesce at console:45), which has no missing parents 2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at console:45) 2014-06-25 00:43:20,901 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal (PROCESS_LOCAL) 2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:0 as 6632 bytes in 16 ms 2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal (PROCESS_LOCAL) 2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:1 as 6632 bytes in 8 ms 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2) 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2) 2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler: Completed ResultTask(0, 0) 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler:
jsonFile function in SQLContext does not work
I'm using Spark 1.0.0-SNAPSHOT (downloaded and compiled on 2014/06/23). I'm trying to execute the following code:

import org.apache.spark.SparkContext._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val table = sqlContext.jsonFile("hdfs://host:9100/user/myuser/data.json")
table.printSchema()

data.json looks like this (3 shortened lines shown here):

{"field1":"content","id":12312213,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}
{"field1":"content","id":56756765,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}
{"field1":"content","id":56765765,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}

The JSON object in each line is valid according to the JSON validator I use, and as jsonFile is defined as

def jsonFile(path: String): SchemaRDD
Loads a JSON file (one object per line), returning the result as a SchemaRDD.

I would assume this should work. However, executing this code returns this error:

14/06/25 10:05:09 WARN scheduler.TaskSetManager: Lost TID 11 (task 0.0:11)
14/06/25 10:05:09 WARN scheduler.TaskSetManager: Loss was due to com.fasterxml.jackson.databind.JsonMappingException
com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@238df2e4; line: 1, column: 1]
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
        ...

Does anyone know where the problem lies?
Spark's Maven dependency on Hadoop 1
According to http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.0.0 , spark depends on Hadoop 1.0.4. What about the versions of Spark that work with Hadoop 2? Do they also depend on Hadoop 1.0.4? How does everyone handle this?
Re: jsonFile function in SQLContext does not work
Hi durin, I just tried this example (nice data, by the way!), *with each JSON object on one line*, and it worked fine:

scala> rdd.printSchema()
root
 |-- entities: org.apache.spark.sql.catalyst.types.StructType$@13b6cdef
 |    |-- friends: ArrayType[org.apache.spark.sql.catalyst.types.StructType$@13b6cdef]
 |    |    |-- id: IntegerType
 |    |    |-- indices: ArrayType[IntegerType]
 |    |    |-- name: StringType
 |    |-- weapons: ArrayType[StringType]
 |-- field1: StringType
 |-- id: IntegerType
 |-- lang: StringType
 |-- place: StringType
 |-- read: BooleanType
 |-- user: org.apache.spark.sql.catalyst.types.StructType$@13b6cdef
 |    |-- id: IntegerType
 |    |-- name: StringType
 |    |-- num_heads: IntegerType

On Wed, Jun 25, 2014 at 10:57 AM, durin m...@simon-schaefer.net wrote: I'm using Spark 1.0.0-SNAPSHOT (downloaded and compiled on 2014/06/23). I'm trying to execute the following code:

import org.apache.spark.SparkContext._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val table = sqlContext.jsonFile("hdfs://host:9100/user/myuser/data.json")
table.printSchema()

data.json looks like this (3 shortened lines shown here):

{"field1":"content","id":12312213,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}
{"field1":"content","id":56756765,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}
{"field1":"content","id":56765765,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}

The JSON object in each line is valid according to the JSON validator I use, and as jsonFile is defined as

def jsonFile(path: String): SchemaRDD
Loads a JSON file (one object per line), returning the result as a SchemaRDD.

I would assume this should work. However, executing this code returns this error:

14/06/25 10:05:09 WARN scheduler.TaskSetManager: Lost TID 11 (task 0.0:11)
14/06/25 10:05:09 WARN scheduler.TaskSetManager: Loss was due to com.fasterxml.jackson.databind.JsonMappingException
com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@238df2e4; line: 1, column: 1]
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
        ...

Does anyone know where the problem lies?
Re: Spark 1.0.0 on yarn cluster problem
Hi Sophia, did you ever resolve this? A common cause for not giving resources to the job is that the RM cannot communicate with the workers. This itself has many possible causes. Do you have a full stack trace from the logs? Andrew

2014-06-13 0:46 GMT-07:00 Sophia sln-1...@163.com: With the yarn-client mode, I submit a job from the client to YARN, and the Spark configuration file spark-env.sh contains:

export HADOOP_HOME=/usr/lib/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=4
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=1G
SPARK_DRIVER_MEMORY=2G
SPARK_YARN_APP_NAME="Spark 1.0.0"

The command line and the result:

$ export JAVA_HOME=/usr/java/jdk1.7.0_45/
$ export PATH=$JAVA_HOME/bin:$PATH
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client
./bin/spark-submit: line 44: /usr/lib/spark/bin/spark-class: Success

What can I do about it? YARN only accepts the job but cannot give memory to the job. Why?
Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds, in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well).

After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue:

java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts
        at org.apache.cassandra.config.Schema.load(Schema.java:347)
        at org.apache.cassandra.config.Schema.load(Schema.java:112)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

I'm creating a writer on each parallel partition like this:

def store(rdd: RDD[Message]) = {
  rdd.foreachPartition(msgIterator => {
    val writer = CQLSSTableWriter.builder()
      .inDirectory("/tmp/cass")
      .forTable(schema)
      .using(insertSttmt).build()
    msgIterator.foreach(msg => {...})
  })
}

And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writes to the writer will not be thread-safe, and even if they were, the contention that multiple threads will create by having all parallel tasks trying to dump a few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway.

So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput into Cassandra? Will the upcoming Spark-Cassandra integration help with this? (i.e. should I just sit back, relax, and the problem will solve itself?)

Thanks, Gerard.
Re: jsonFile function in SQLContext does not work
Hi Zongheng Yang, thanks for your response. Reading your answer, I did some more tests and realized that analyzing very small parts of the dataset (which is ~130GB in ~4.3M lines) works fine. The error occurs when I analyze larger parts. Using 5% of the whole data, the error is the same as posted before for certain TIDs. However, I get the structure determined so far as a result when using 5%.

The Spark WebUI shows the following:

Job aborted due to stage failure: Task 6.0:11 failed 4 times, most recent failure: Exception failure in TID 108 on host foo.bar.com: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@3697781f; line: 1, column: 1]
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3029)
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:823)
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:821)
org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)
Driver stacktrace:

Is the only possible reason that some of these 4.3 million JSON objects are not valid JSON, or could there be another explanation? And if it is the reason, is there some way to tell the function to just skip faulty lines?

Thanks, Durin
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well) After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd:RDD[Message]) = { rdd.foreachPartition( msgIterator = { val writer = CQLSSTableWriter.builder() .inDirectory(/tmp/cass) .forTable(schema) .using(insertSttmt).build() msgIterator.foreach(msg = {...}) })} And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writings to the writer will not be thread-safe and even if they were the contention that multiple threads will create by having all parallel tasks trying to dump few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput in Cassandra? Will the upcoming Spark-Cassandra integration help with this? (ie. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
Re: jsonFile function in SQLContext does not work
Is it possible you have blank lines in your input? Not that this should be an error condition, but it may be what's causing it. On Wed, Jun 25, 2014 at 11:57 AM, durin m...@simon-schaefer.net wrote: Hi Zongheng Yang, thanks for your response. Reading your answer, I did some more tests and realized that analyzing very small parts of the dataset (which is ~130GB in ~4.3M lines) works fine. The error occurs when I analyze larger parts. Using 5% of the whole data, the error is the same as posted before for certain TIDs. However, I get the structure determined so far as a result when using 5%. The Spark WebUI shows the following: Job aborted due to stage failure: Task 6.0:11 failed 4 times, most recent failure: Exception failure in TID 108 on host foo.bar.com: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@3697781f; line: 1, column: 1] com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164) com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3029) com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971) com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091) org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261) org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:823) org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:821) org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132) org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) java.lang.Thread.run(Thread.java:662) Driver stacktrace: Is the only possible reason that some of these 4.3 Million JSON-Objects are not valid JSON, or could there be another explanation? And if it is the reason, is there some way to tell the function to just skip faulty lines? Thanks, Durin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273p8278.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark streaming questions
Thanks Anwar. On Tue, Jun 17, 2014 at 11:54 AM, Anwar Rizal anriza...@gmail.com wrote:

On Tue, Jun 17, 2014 at 5:39 PM, Chen Song chen.song...@gmail.com wrote: Hey, I am new to Spark Streaming and apologize if these questions have been asked.

* In StreamingContext, reduceByKey() seems to only work on the RDDs of the current batch interval, not including RDDs of previous batches. Is my understanding correct?

It's correct.

* If the above statement is correct, what functions should one use to do processing on the continuous stream of batches of data? I see two functions, reduceByKeyAndWindow and updateStateByKey, which serve this purpose.

I presume that you need to keep a state that goes beyond one batch, i.e. across multiple batches. In this case, yes, updateStateByKey is the one you will use. Basically, updateStateByKey wraps the state into an RDD.

My use case is an aggregation and doesn't fit a windowing scenario.

* As for updateStateByKey, I have a few questions.
** Over time, will Spark stage the original data somewhere to replay in case of failures? Say the Spark job runs for weeks; I am wondering how that sustains itself.
** Say my reduce key space is partitioned by some date field, and I would like to stop processing old dates after a period of time (this is not simply a windowing scenario, as which date the data belongs to is not the same thing as when the data arrives). How can I tell Spark to discard data for old dates?

You will need to call checkpoint (see http://spark.apache.org/docs/latest/streaming-programming-guide.html#rdd-checkpointing), which persists the RDD metadata that would otherwise keep consuming memory (and stack during execution). You can set the checkpointing interval to suit your needs. Now, if you also want to reset your state after some time, there is no immediate way I can think of, but you can do it through updateStateByKey, maybe by book-keeping the timestamp.

Thank you, Best, Chen -- Chen Song
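Not from the thread itself, but here is a minimal sketch of the timestamp book-keeping Anwar mentions, assuming Spark Streaming 1.0's updateStateByKey; the state shape and the expiry threshold are invented for illustration (and note that updateStateByKey requires ssc.checkpoint(...) to be set):

// Assume pairs: DStream[(String, Long)] of per-key increments.
// State per key: (running total, time of last update). Returning None drops the
// key, which is how old dates get discarded.
val maxAgeMs = 24 * 60 * 60 * 1000L // example: expire keys idle for a day

def update(values: Seq[Long], state: Option[(Long, Long)]): Option[(Long, Long)] = {
  val now = System.currentTimeMillis()
  val (total, lastSeen) = state.getOrElse((0L, now))
  if (values.isEmpty && now - lastSeen > maxAgeMs) None // expire stale keys
  else Some((total + values.sum, if (values.isEmpty) lastSeen else now))
}

val totals = pairs.updateStateByKey[(Long, Long)](update _)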
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (Using our Spark assembly compiled with CDH4.4). The conflict seems to be at the Cassandra-all lib level, which is compiled against a different hadoop version (v1). We could not get round that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way but it looks blocked as well. -kr, Gerard. On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well) After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd:RDD[Message]) = { rdd.foreachPartition( msgIterator = { val writer = CQLSSTableWriter.builder() .inDirectory(/tmp/cass) .forTable(schema) .using(insertSttmt).build() msgIterator.foreach(msg = {...}) })} And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writings to the writer will not be thread-safe and even if they were the contention that multiple threads will create by having all parallel tasks trying to dump few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput in Cassandra? Will the upcoming Spark-Cassandra integration help with this? (ie. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
wholeTextFiles and gzip
Interesting question on Stack Overflow: http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles Is it possible to read gzipped files using wholeTextFiles()? Alternatively, is it possible to read the source file names using textFile()?
semi join spark streaming
Is there an easy way to do a semi join in Spark Streaming? Here is my problem, briefly: I have a DStream that will generate a set of values, and I would like to check for existence in this set from other DStreams. Is there an easy and standard way to model this problem? If not, can I write a Spark Streaming job to load the set of values from disk and cache it on each worker? -- Chen Song
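Not part of the original message, but one common way to model a per-batch semi join is to pair the two DStreams up with transformWith and do an ordinary RDD join (or, if the key set is small, collect and broadcast it). A minimal sketch assuming Spark Streaming 1.0 and invented stream names and types:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Assume keys: DStream[String] (the reference set, batch by batch) and
// events: DStream[(String, String)] that should keep only matching keys.
val semiJoined = events.transformWith(keys,
  (eventsRdd: RDD[(String, String)], keysRdd: RDD[String]) => {
    val keySet = keysRdd.distinct().map(k => (k, ())) // shape the set as a pair RDD
    eventsRdd.join(keySet).map { case (k, (v, _)) => (k, v) }
  })

If the set must persist across batches (e.g. loaded once from disk), broadcasting it with sc.broadcast and filtering each batch against the broadcast value is the usual alternative.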
Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Expanded to 4 nodes and changed the workers to listen on the public DNS, but it still shows the same error (which is obviously wrong). I can't believe I'm the first to encounter this issue.
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Right, ok. I can't say I've used the Cassandra OutputFormats before. But perhaps if you use it directly (instead of via Calliope) you may be able to get it to work, albeit with less concise code? Or perhaps you may be able to build Cassandra from source with Hadoop 2 / CDH4 support: https://groups.google.com/forum/#!topic/nosql-databases/Y-9amAdZk1s On Wed, Jun 25, 2014 at 9:14 PM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (Using our Spark assembly compiled with CDH4.4). The conflict seems to be at the Cassandra-all lib level, which is compiled against a different hadoop version (v1). We could not get round that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way but it looks blocked as well. -kr, Gerard. On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well) After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd:RDD[Message]) = { rdd.foreachPartition( msgIterator = { val writer = CQLSSTableWriter.builder() .inDirectory(/tmp/cass) .forTable(schema) .using(insertSttmt).build() msgIterator.foreach(msg = {...}) })} And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writings to the writer will not be thread-safe and even if they were the contention that multiple threads will create by having all parallel tasks trying to dump few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput in Cassandra? Will the upcoming Spark-Cassandra integration help with this? (ie. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
Re: pyspark regression results way off
Is a python binding for LBFGS in the works? My co-worker has written one and can contribute back if it helps. On Mon, Jun 16, 2014 at 11:00 AM, DB Tsai dbt...@stanford.edu wrote: Is your data normalized? Sometimes, GD doesn't work well if the data has wide range. If you are willing to write scala code, you can try LBFGS optimizer which converges better than GD. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Jun 16, 2014 at 8:14 AM, jamborta jambo...@gmail.com wrote: forgot to mention that I'm running spark 1.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-regression-results-way-off-tp7672p7673.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
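On the normalization point above: when the features have very different ranges, plain gradient descent tends to produce results that are way off, so scaling the columns first is a cheap thing to try. Below is a rough sketch only, assuming an existing RDD[LabeledPoint] named data and Spark 1.0's LinearRegressionWithSGD; the column statistics are computed naively in two passes:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD

// Scale every feature to zero mean and unit variance so that gradient descent
// is not dominated by the columns with the widest numeric range.
def standardize(data: RDD[LabeledPoint]): RDD[LabeledPoint] = {
  val n = data.count().toDouble
  val sums = data.map(_.features.toArray)
    .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  val means = sums.map(_ / n)
  val sqDiffs = data
    .map(_.features.toArray.zip(means).map { case (x, m) => (x - m) * (x - m) })
    .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  val stdevs = sqDiffs.map(s => math.sqrt(s / n)).map(s => if (s == 0.0) 1.0 else s)
  data.map { p =>
    val scaled = p.features.toArray.zipWithIndex.map { case (x, i) => (x - means(i)) / stdevs(i) }
    LabeledPoint(p.label, Vectors.dense(scaled))
  }
}

// then train as usual, e.g.:
// val model = LinearRegressionWithSGD.train(standardize(data), 100)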
Re: jsonFile function in SQLContext does not work
Hi Durin, I guess that blank lines caused the problem (like Aaron said). Right now, jsonFile does not skip faulty lines. Can you first use sc.textFile to load the file as an RDD[String] and then use filter to filter out those blank lines (code snippet can be found below)?
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rdd = sc.textFile("hdfs://host:9100/user/myuser/data.json").filter(r => r.trim != "")
val table = sqlContext.jsonRDD(rdd)
table.printSchema()
Thanks, Yin On Wed, Jun 25, 2014 at 1:08 PM, Aaron Davidson ilike...@gmail.com wrote: Is it possible you have blank lines in your input? Not that this should be an error condition, but it may be what's causing it. On Wed, Jun 25, 2014 at 11:57 AM, durin m...@simon-schaefer.net wrote: Hi Zongheng Yang, thanks for your response. Reading your answer, I did some more tests and realized that analyzing very small parts of the dataset (which is ~130GB in ~4.3M lines) works fine. The error occurs when I analyze larger parts. Using 5% of the whole data, the error is the same as posted before for certain TIDs. However, I get the structure determined so far as a result when using 5%. The Spark WebUI shows the following:
Job aborted due to stage failure: Task 6.0:11 failed 4 times, most recent failure: Exception failure in TID 108 on host foo.bar.com: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@3697781f; line: 1, column: 1]
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3029)
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:823)
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:821)
org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)
Driver stacktrace:
Is the only possible reason that some of these 4.3 million JSON objects are not valid JSON, or could there be another explanation? And if that is the reason, is there some way to tell the function to just skip faulty lines?
Thanks, Durin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273p8278.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Worker nodes: Error messages
Hi All, I see the following error messages on my worker nodes. Are they due to improper cleanup or wrong configuration? Any help with this would be great!
14/06/25 12:30:55 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/06/25 12:30:55 INFO SecurityManager: Changing view acls to: userid
14/06/25 12:30:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(p529444)
14/06/25 12:30:56 INFO Slf4jLogger: Slf4jLogger started
14/06/25 12:30:56 INFO Remoting: Starting remoting
14/06/25 12:30:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@worker1ip:60276]
14/06/25 12:30:57 INFO Worker: Starting Spark worker worker1ip:60276 with 1 cores, 2.7 GB RAM
14/06/25 12:30:57 INFO Worker: Spark home: /apps/software/spark-1.0.0-bin-hadoop1
14/06/25 12:30:57 INFO WorkerWebUI: Started WorkerWebUI at http://worker1ip:8081
14/06/25 12:30:57 INFO Worker: Connecting to master spark://serverip:7077...
14/06/25 12:30:57 INFO Worker: Successfully registered with master spark://serverip:7077
14/06/25 12:32:05 INFO Worker: Asked to launch executor app-20140625123205-/2 for ApproxStrMatch
14/06/25 12:32:05 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:56569/user/CoarseGrainedScheduler 2 p worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625123205-
14/06/25 12:32:09 INFO Worker: Executor app-20140625123205-/2 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 12:32:09 INFO Worker: Asked to launch executor app-20140625123205-/5 for ApproxStrMatch
14/06/25 12:32:09 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:56569/user/CoarseGrainedScheduler 5 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625123205-
14/06/25 12:32:12 INFO Worker: Executor app-20140625123205-/5 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 12:32:12 INFO Worker: Asked to launch executor app-20140625123205-/9 for ApproxStrMatch
14/06/25 12:32:12 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:56569/user/CoarseGrainedScheduler 9 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625123205-
14/06/25 12:32:16 INFO Worker: Asked to kill executor app-20140625123205-/9
14/06/25 12:32:16 INFO ExecutorRunner: Runner thread for executor app-20140625123205-/9 interrupted
14/06/25 12:32:16 INFO ExecutorRunner: Killing process!
14/06/25 12:32:16 INFO Worker: Executor app-20140625123205-/9 finished with state KILLED
14/06/25 13:28:44 INFO Worker: Asked to launch executor app-20140625132844-0001/2 for ApproxStrMatch
14/06/25 13:28:44 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:46648/user/CoarseGrainedScheduler 2 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625132844-0001
14/06/25 13:28:48 INFO Worker: Executor app-20140625132844-0001/2 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 13:28:48 INFO Worker: Asked to launch executor app-20140625132844-0001/5 for ApproxStrMatch
14/06/25 13:28:48 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:46648/user/CoarseGrainedScheduler 5 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker
Re: pyspark regression results way off
There is no python binding for LBFGS. Feel free to submit a PR. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Jun 25, 2014 at 1:41 PM, Mohit Jaggi mohitja...@gmail.com wrote: Is a python binding for LBFGS in the works? My co-worker has written one and can contribute back if it helps. On Mon, Jun 16, 2014 at 11:00 AM, DB Tsai dbt...@stanford.edu wrote: Is your data normalized? Sometimes, GD doesn't work well if the data has wide range. If you are willing to write scala code, you can try LBFGS optimizer which converges better than GD. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Jun 16, 2014 at 8:14 AM, jamborta jambo...@gmail.com wrote: forgot to mention that I'm running spark 1.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-regression-results-way-off-tp7672p7673.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: jsonFile function in SQLContext does not work
Hi Yin and Aaron, thanks for your help, this was indeed the problem. I've counted 1233 blank lines using grep, and the code snippet below works with those. From what you said, I guess that skipping faulty lines will be possible in later versions? Kind regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273p8293.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Hadoop interface vs class
After upgrading to Spark 1.0.0, I get this error:
ERROR org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
I thought this was caused by a dependency on Hadoop 1.0.4 (even though I downloaded the Spark 1.0.0 build for Hadoop 2), but I can't seem to fix it. Any advice?
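For context: this particular IncompatibleClassChangeError almost always means that some artifact compiled against Hadoop 1 (where TaskAttemptContext is a class) is running against Hadoop 2 jars (where it is an interface), or vice versa. As a rough sketch only, assuming sbt and a Hadoop 2.2.0 cluster, pinning the Hadoop client explicitly keeps the transitive Hadoop 1.0.4 dependency off the classpath:

// build.sbt fragment (a sketch; adjust versions to match the actual cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",  // use the cluster's Spark assembly at runtime
  "org.apache.hadoop" % "hadoop-client" % "2.2.0"             // must match the Hadoop version Spark was built for
)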
wholeTextFiles like for binary files ?
Is there an equivalent of wholeTextFiles for binary files, for example a set of images? Cheers, Jaonary
trouble: Launching spark on hadoop + yarn.
I am trying to install Spark on Hadoop+YARN. I have installed Spark using sbt (SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly). This has worked fine. After that I am running:
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.0.5-alpha.jar ./bin/spark-class org.apache.spark.deploy.yarn.Client --jar examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.5-alpha.jar --class org.apache.spark.examples.SparkPi --arg yarn-cluster --arg 5 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1
This process fails with the following error. When I look into the log file, there is a line containing success,
INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=gpadmin IP=10.1.174.109 OPERATION=Start Container Request TARGET=ContainerManagerImpl RESULT=SUCCESS APPID=application_1399675492314_0012 CONTAINERID=container_1399675492314_0012_01_10
several lines containing failure,
2014-06-13 18:08:36,661 WARN org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=UnknownUser IP=10.1.174.105 OPERATION=Stop Container Request TARGET=ContainerManagerImpl RESULT=FAILURE DESCRIPTION=Trying to stop unknown container! APPID=application_1399675492314_0012 CONTAINERID=container_1399675492314_0012_01_01
2014-06-25 11:57:40,190 WARN org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=UnknownUser IP=10.1.174.105 OPERATION=Stop Container Request TARGET=ContainerManagerImpl RESULT=FAILURE DESCRIPTION=Trying to stop unknown container! APPID=application_1399675492314_0025 CONTAINERID=container_1399675492314_0025_01_01
2014-06-25 12:29:52,679 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1399675492314_0026_01_03 transitioned from RUNNING to EXITED_WITH_FAILURE
2014-06-25 12:29:52,698 WARN org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=gpadmin OPERATION=Container Finished - Failed TARGET=ContainerImpl RESULT=FAILURE DESCRIPTION=Container failed with state: EXITED_WITH_FAILURE APPID=application_1399675492314_0026 CONTAINERID=container_1399675492314_0026_01_03
2014-06-25 12:29:52,698 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1399675492314_0026_01_03 transitioned from EXITED_WITH_FAILURE to DONE
Then finally I get: Failed while trying to construct the redirect url to the log server. Log Server url may not be configured. Do I need to change the yarn-site.xml file to configure a log server? If so, how? --Sanghamitra. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/trouble-Launching-spark-on-hadoop-yarn-tp8297.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
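Regarding the last question: that redirect message comes from YARN log aggregation not being configured. As a rough sketch only (the host and port below are placeholders for wherever the MapReduce JobHistory server runs, and this is an assumption about the cluster rather than a verified fix), the usual yarn-site.xml properties are:

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://historyserver-host:19888/jobhistory/logs</value>
</property>

With those set, the NodeManager can redirect to the aggregated container logs after an application finishes; whether the containers themselves stop failing is a separate problem.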
Re: ElasticSearch enrich
Hi guys, thanks for the direction. Now I have some problems/questions:
- in local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote; for this I want to pass the ElasticClient to mapPartitions, or what is the best practice?
- my stream output is written into Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?
- After storing the enriched data into ES, I want to generate aggregated data (EsInputFormat); how can I test it locally?
Thanks guys b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote: So I'm giving a talk at the Spark Summit on using Spark and ElasticSearch, but for now if you want to see a simple demo which uses Elasticsearch for geo input you can take a look at my quick and dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients. This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use the client for all of the queries on each partition. This approach will avoid having to serialize the Elasticsearch connection because it will be local to your function. Hope this helps! Cheers, Holden :) On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: It's not used as the default serializer because of some compatibility issues with the requirement to register the classes. Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions or any of the operations on an RDD. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au wrote: I'm afraid persisting a connection across two tasks is a dangerous act, as they can't be guaranteed to be executed on the same machine. Your ES server may think it's a man-in-the-middle attack! I think it's possible to invoke a static method that gives you a connection from a local 'pool', so nothing will sneak into your closure, but it's too complex and there should be a better option. Never used Kryo before; if it's that good perhaps we should use it as the default serializer. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271
Re: ElasticSearch enrich
On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote: Hi guys, thanks for the direction. Now I have some problems/questions:
- in local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote; for this I want to pass the ElasticClient to mapPartitions, or what is the best practice?
In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable), and if you want to use a different client in local mode just have a flag that controls what type of client you create.
- my stream output is written into Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?
- After storing the enriched data into ES, I want to generate aggregated data (EsInputFormat); how can I test it locally?
I think the simplest thing to do would be to use the same client in both modes and just start a single-node Elasticsearch cluster locally.
Thanks guys b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote: So I'm giving a talk at the Spark Summit on using Spark and ElasticSearch, but for now if you want to see a simple demo which uses Elasticsearch for geo input you can take a look at my quick and dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients. This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use the client for all of the queries on each partition. This approach will avoid having to serialize the Elasticsearch connection because it will be local to your function. Hope this helps! Cheers, Holden :) On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: It's not used as the default serializer because of some compatibility issues with the requirement to register the classes. Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions or any of the operations on an RDD. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au wrote: I'm afraid persisting a connection across two tasks is a dangerous act, as they can't be guaranteed to be executed on the same machine. Your ES server may think it's a man-in-the-middle attack! I think it's possible to invoke a static method that gives you a connection from a local 'pool', so nothing will sneak into your closure, but it's too complex and there should be a better option. Never used Kryo before; if it's that good perhaps we should use it as the default serializer. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
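To make the first point concrete, here is a rough sketch of the pattern described above; createClient, enrich, Message and Enriched are hypothetical placeholders (e.g. createClient could return ElasticClient.local in tests and ElasticClient.remote(host, port) in production, switched by a flag):

import org.apache.spark.rdd.RDD

// Build the client inside mapPartitions so it never has to be serialized;
// each partition opens one connection and reuses it for all of its records.
def enrichAll(rdd: RDD[Message], useLocalEs: Boolean): RDD[Enriched] =
  rdd.mapPartitions { records =>
    val client = createClient(useLocalEs)                  // hypothetical factory
    val out = records.map(r => enrich(client, r)).toList   // force evaluation before closing
    client.close()
    out.iterator
  }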
Number of executors smaller than requested in YARN.
Hi, When I try requesting a large number of executors, e.g. 242, it doesn't seem to actually reach that number. E.g., under the executors tab, I only see executor IDs up to 234. This is despite the fact that there is plenty more memory available, as well as CPU cores, etc., in the system. In fact, the YARN page shows that 243 containers are running (242 executors + driver). Anyone know what's going on?
Does Spark restart cached workers even without failures?
I'm doing a coalesce with shuffle, then cache, and then running thousands of iterations. I noticed that sometimes Spark would, for no apparent reason, perform a partial coalesce again after running for a long time, and there was no exception or failure on the workers' part. Why is this happening?
Spark standalone network configuration problems
Hi all, I have a 2-machine Spark network I've set up: a master and worker on machine1, and a worker on machine2. When I run 'sbin/start-all.sh', everything starts up as it should. I see both workers listed on the UI page. The logs of both workers indicate successful registration with the Spark master. The problems begin when I attempt to submit a job: I get an "address already in use" exception that crashes the program. It says "Failed to bind to" and lists the exact port and address of the master. At this point, the only items I have set in my spark-env.sh are SPARK_MASTER_IP and SPARK_MASTER_PORT (non-standard, set to 5060). The next step I took, then, was to explicitly set SPARK_LOCAL_IP on the master to 127.0.0.1. This allows the master to successfully send out the jobs; however, it ends up canceling the stage after running this command several times:
14/06/25 21:00:47 INFO AppClient$ClientActor: Executor added: app-20140625210032-/8 on worker-20140625205623-machine2-53597 (machine2:53597) with 8 cores
14/06/25 21:00:47 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140625210032-/8 on hostPort machine2:53597 with 8 cores, 8.0 GB RAM
14/06/25 21:00:47 INFO AppClient$ClientActor: Executor updated: app-20140625210032-/8 is now RUNNING
14/06/25 21:00:49 INFO AppClient$ClientActor: Executor updated: app-20140625210032-/8 is now FAILED (Command exited with code 1)
The /8 started at /1, eventually becomes /9, and then /10, at which point the program crashes. The worker on machine2 shows similar messages in its logs. Here are the last bunch:
14/06/25 21:00:31 INFO Worker: Executor app-20140625210032-/9 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 21:00:31 INFO Worker: Asked to launch executor app-20140625210032-/10 for app_name
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/06/25 21:00:32 INFO ExecutorRunner: Launch command: java -cp ::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms8192M -Xmx8192M org.apache.spark.executor.CoarseGrainedExecutorBackend *akka.tcp://spark@localhost:5060/user/CoarseGrainedScheduler* 10 machine2 8 akka.tcp://sparkWorker@machine2:53597/user/Worker app-20140625210032-
14/06/25 21:00:33 INFO Worker: Executor app-20140625210032-/10 finished with state FAILED message Command exited with code 1 exitStatus 1
I highlighted the part that seemed strange to me; that's the master port number (I set it to 5060), and yet it's referencing localhost? Is this the reason why machine2 apparently can't seem to give a confirmation to the master once the job is submitted? (The logs from the worker on the master node indicate that it's running just fine.) I appreciate any assistance you can offer! Regards, Shannon Quinn
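For what it's worth, the highlighted localhost looks consistent with SPARK_LOCAL_IP=127.0.0.1 on the master: the driver then advertises localhost to the executors, and machine2 tries to connect back to its own localhost:5060 and fails. A rough sketch of a conf/spark-env.sh on the master, with placeholder hostnames and offered as an assumption about this setup rather than a verified fix:

# conf/spark-env.sh on machine1 (the master)
export SPARK_MASTER_IP=machine1     # an address that machine2 can actually reach
export SPARK_MASTER_PORT=5060
# Do not set SPARK_LOCAL_IP to 127.0.0.1; leave it unset, or point it at an
# externally reachable address of this machine:
# export SPARK_LOCAL_IP=machine1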
Re: Changing log level of spark
I have a log4j.xml in src/main/resources with
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  [...]
  <root>
    <priority value="warn" />
    <appender-ref ref="Console" />
  </root>
</log4j:configuration>
and that is included in the jar I package with `sbt assembly`. That works fine for me, at least on the driver. Tobias On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote: Hi! According to https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging, changing the log level is just a matter of creating a log4j.properties (which is on the classpath of Spark) and changing the log level there for the root logger. I did these steps on every node in the cluster (master and worker nodes). However, after a restart there is still no debug output as desired, but only the default INFO log level.
Spark vs Google cloud dataflow
Hi, Today Google announced their Cloud Dataflow, which is very similar to Spark in performing batch processing and stream processing. How does Spark compare to Google Cloud Dataflow? Are these two solutions aimed at the same problem?
Re: Changing log level of spark
If you're using the spark-ec2 scripts, you may have to change /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that is added to the classpath before Spark's own conf. On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote: I have a log4j.xml in src/main/resources with
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  [...]
  <root>
    <priority value="warn" />
    <appender-ref ref="Console" />
  </root>
</log4j:configuration>
and that is included in the jar I package with `sbt assembly`. That works fine for me, at least on the driver. Tobias On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote: Hi! According to https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging , changing the log level is just a matter of creating a log4j.properties (which is on the classpath of Spark) and changing the log level there for the root logger. I did these steps on every node in the cluster (master and worker nodes). However, after a restart there is still no debug output as desired, but only the default INFO log level.
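For completeness, the log4j.properties route mentioned in the original question looks roughly like this; a sketch modeled on Spark's conf/log4j.properties.template, with the usual console appender names assumed rather than taken from this cluster:

# Set everything to WARN and log to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n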
Spark executor error
I'm seeing the following message in the log of an executor. Anyone seen this error? After this, the executor seems to lose the cache, and besides that the whole thing slows down drastically, i.e. it gets stuck in a reduce phase for 40+ minutes, whereas before it was finishing reduces in 2~3 seconds.
14/06/25 19:22:31 WARN SendingConnection: Error writing in connection to ConnectionManagerId(alpinenode7.alpinenow.local,46251)
java.lang.NullPointerException
at org.apache.spark.network.MessageChunkHeader.buffer$lzycompute(MessageChunkHeader.scala:35)
at org.apache.spark.network.MessageChunkHeader.buffer(MessageChunkHeader.scala:32)
at org.apache.spark.network.MessageChunk.buffers$lzycompute(MessageChunk.scala:31)
at org.apache.spark.network.MessageChunk.buffers(MessageChunk.scala:29)
at org.apache.spark.network.SendingConnection.write(Connection.scala:349)
at org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:142)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Where Can I find the full documentation for Spark SQL?
Hi, I want to know the full list of functions, syntax, and features that Spark SQL supports. Is there any documentation? Regards, Xiaobo Gu
Re: Where Can I find the full documentation for Spark SQL?
You can find something in the API, nothing more than that I think for now. Gianluca On 25 Jun 2014, at 23:36, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi, I want to know the full list of functions, syntax, features that Spark SQL supports, is there some documentations. Regards, Xiaobo Gu
Re: Where Can I find the full documentation for Spark SQL?
The API only says this: public JavaSchemaRDD sql(String sqlQuery): Executes a query expressed in SQL, returning the result as a JavaSchemaRDD. But what kind of sqlQuery can we execute? Is there any more documentation? Xiaobo Gu -- Original -- From: Gianluca Privitera gianluca.privite...@studio.unibo.it Date: Jun 26, 2014 To: user@spark.apache.org Subject: Re: Where Can I find the full documentation for Spark SQL? You can find something in the API, nothing more than that I think for now. Gianluca On 25 Jun 2014, at 23:36, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi, I want to know the full list of functions, syntax, and features that Spark SQL supports. Is there any documentation? Regards, Xiaobo Gu
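For a rough illustration of the kind of sqlQuery that sql() accepts in Spark 1.0 (basic SELECT / WHERE / GROUP BY style queries over registered tables), here is a small Scala sketch; the Employee case class and the data are made up:

// driver code; sc is an existing SparkContext
case class Employee(name: String, age: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._   // implicit conversion from an RDD of case classes to SchemaRDD

val employees = sc.parallelize(Seq(Employee("Alice", 34), Employee("Bob", 19)))
employees.registerAsTable("employees")

// a plain SQL query; the result is again a SchemaRDD
val adults = sqlContext.sql("SELECT name, age FROM employees WHERE age >= 21")
adults.collect().foreach(println)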