Re: Getting started : Spark on YARN issue
Hi Andrew Thanks Andrew for your suggestion. I updated the hdfs-site on server side and also on client side to use hostname instead of IP as mentioned here = http://rainerpeter.wordpress.com/2014/02/12/connect-to-hdfs-running-in-ec2-using-public-ip-addresses/ . Now, I could see that the client is able to talk to the datanode. Also, I will consider submitting application from within ec2 itself so that private IP is resolvable. Thanks Praveen On Fri, Jun 20, 2014 at 2:35 AM, Andrew Or and...@databricks.com wrote: (Also, an easier workaround is to simply submit the application from within your cluster, thus saving you all the manual labor of reconfiguring everything to use public hostnames. This may or may not be applicable to your use case.) 2014-06-19 14:04 GMT-07:00 Andrew Or and...@databricks.com: Hi Praveen, Yes, the fact that it is trying to use a private IP from outside of the cluster is suspicious. My guess is that your HDFS is configured to use internal IPs rather than external IPs. This means even though the hadoop confs on your local machine only use external IPs, the org.apache.spark.deploy.yarn.Client that is running on your local machine is trying to use whatever address your HDFS name node tells it to use, which is private in this case. A potential fix is to update your hdfs-site.xml (and other related configs) within your cluster to use public hostnames. Let me know if that does the job. Andrew 2014-06-19 6:04 GMT-07:00 Praveen Seluka psel...@qubole.com: I am trying to run Spark on YARN. I have a hadoop 2.2 cluster (YARN + HDFS) in EC2. Then, I compiled Spark using Maven with 2.2 hadoop profiles. Now am trying to run the example Spark job . (In Yarn-cluster mode). From my *local machine. *I have setup HADOOP_CONF_DIR environment variable correctly. ➜ spark git:(master) ✗ /bin/bash -c ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2 --driver-memory 2g --executor-memory 2g --executor-cores 1 examples/target/scala-2.10/spark-examples_*.jar 10 14/06/19 14:59:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/06/19 14:59:39 INFO client.RMProxy: Connecting to ResourceManager at ec2-54-242-244-250.compute-1.amazonaws.com/54.242.244.250:8050 14/06/19 14:59:41 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/06/19 14:59:41 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/06/19 14:59:41 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 12288 14/06/19 14:59:41 INFO yarn.Client: Preparing Local resources 14/06/19 14:59:42 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 14/06/19 14:59:43 INFO yarn.Client: Uploading file:/home/rgupta/awesome/spark/examples/target/scala-2.10/spark-examples_2.10-1.0.0-SNAPSHOT.jar to hdfs:// ec2-54-242-244-250.compute-1.amazonaws.com:8020/user/rgupta/.sparkStaging/application_1403176373037_0009/spark-examples_2.10-1.0.0-SNAPSHOT.jar 14/06/19 15:00:45 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. 
ch : java.nio.channels.SocketChannel[connection-pending remote=/ 10.180.150.66:50010] at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1305) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1128) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514) 14/06/19 15:00:45 INFO hdfs.DFSClient: Abandoning BP-1714253233-10.180.215.105-1403176367942:blk_1073741833_1009 14/06/19 15:00:46 INFO hdfs.DFSClient: Excluding datanode 10.180.150.66:50010 14/06/19 15:00:46 WARN hdfs.DFSClient: DataStreamer Exception It's able to talk to the ResourceManager, then it uploads the examples jar to HDFS, and that is where it fails: it is trying to write to the datanode. I verified that port 50010 is accessible from my local machine. Any idea what the issue is here? One thing that's suspicious is /10.180.150.66:50010 - it looks like it's trying to connect using the private IP. If so, how can I resolve this to use the public IP? Thanks Praveen
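As a concrete illustration of the fix described above, the change usually amounts to telling HDFS to advertise and resolve datanodes by hostname rather than by the private IP the namenode reports. A minimal hdfs-site.xml sketch, assuming a standard Hadoop 2.x deployment (these properties reflect the approach in the linked post and are an assumption, not quoted from this thread):

<!-- on the client: resolve datanodes via their hostnames -->
<property>
  <name>dfs.client.use.datanode.hostname</name>
  <value>true</value>
</property>
<!-- on the datanodes: register and connect using hostnames as well -->
<property>
  <name>dfs.datanode.use.datanode.hostname</name>
  <value>true</value>
</property>

With EC2 public DNS names, the same hostname then resolves to the public IP from outside the cluster and to the private IP from inside it.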
Re: How to store JavaRDD as a sequence file using spark java API?
Any inputs on this will be helpful. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-JavaRDD-as-a-sequence-file-using-spark-java-API-tp7969p7980.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to store JavaRDD as a sequence file using spark java API?
You can use JavaPairRDD.saveAsHadoopFile/saveAsNewAPIHadoopFile. Best Regards, Shixiong Zhu 2014-06-20 14:22 GMT+08:00 abhiguruvayya sharath.abhis...@gmail.com: Any inputs on this will be helpful. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-JavaRDD-as-a-sequence-file-using-spark-java-API-tp7969p7980.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
problem about cluster mode of spark 1.0.0
My program runs in standalone mode; the command line is like: /opt/spark-1.0.0/bin/spark-submit \ --verbose \ --class $class_name \ --master spark://master:7077 \ --driver-memory 15G \ --driver-cores 2 \ --deploy-mode cluster \ hdfs://master:9000/user/root/jartest/test.jar But test.jar does not get copied to the worker node, so the job fails with java.lang.ClassNotFoundException. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-cluster-mode-of-spark-1-0-0-tp7982.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to store JavaRDD as a sequence file using spark java API?
Does JavaPairRDD.saveAsHadoopFile store data as a sequenceFile? Then what is the significance of RDD.saveAsSequenceFile? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-JavaRDD-as-a-sequence-file-using-spark-java-API-tp7969p7983.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: 1.0.1 release plan
Hey There, I'd like to start voting on this release shortly because there are a few important fixes that have queued up. We're just waiting to fix an akka issue. I'd guess we'll cut a vote in the next few days. - Patrick On Thu, Jun 19, 2014 at 10:47 AM, Mingyu Kim m...@palantir.com wrote: Hi all, Is there any plan for 1.0.1 release? Mingyu
Re: broadcast in spark streaming
I get it. Thank you. On Fri, Jun 20, 2014 at 4:43 PM, Sourav Chandra sourav.chan...@livestream.com wrote: From the StreamingContext object you can get a reference to the SparkContext, with which you can create broadcast variables. On Fri, Jun 20, 2014 at 2:09 PM, Hahn Jiang hahn.jiang@gmail.com wrote: I want to use broadcast in Spark Streaming, but I found there is no such function. How can I use a global variable in Spark Streaming? thanks -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
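A minimal sketch of the suggestion above, in Scala (the stream source, batch interval, and lookup map are made up for illustration): the broadcast is created once on the driver from the SparkContext that backs the StreamingContext, and only its .value is read inside the transformations.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("broadcast-in-streaming")
val ssc = new StreamingContext(conf, Seconds(10))
// Reuse the SparkContext underlying the StreamingContext to create the broadcast once on the driver.
val lookup = ssc.sparkContext.broadcast(Map("a" -> 1, "b" -> 2))
val lines = ssc.socketTextStream("localhost", 9999)  // hypothetical input stream
val mapped = lines.map(word => lookup.value.getOrElse(word, 0))
mapped.print()
ssc.start()
ssc.awaitTermination()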
How could I set the number of executor?
spark-submit has an argument --num-executors to set the number of executors, but how can I set it from anywhere else? We're using Shark and want to change the number of executors. The number of executors seems to be the same as the number of workers by default? Do we have to configure the executor count manually (is there an automatic way?) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-set-the-number-of-executor-tp7990.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space
Le 20 juin 2014 01:46, Shivani Rao raoshiv...@gmail.com a écrit : Hello Andrew, i wish I could share the code, but for proprietary reasons I can't. But I can give some idea though of what i am trying to do. The job reads a file and for each line of that file and processors these lines. I am not doing anything intense in the processLogs function import argonaut._ import argonaut.Argonaut._ /* all of these case classes are created from json strings extracted from the line in the processLogs() function * */ case class struct1… case class struct2… case class value1(struct1, struct2) def processLogs(line:String): Option[(key1, value1)] {… } def run(sparkMaster, appName, executorMemory, jarsPath) { val sparkConf = new SparkConf() sparkConf.setMaster(sparkMaster) sparkConf.setAppName(appName) sparkConf.set(spark.executor.memory, executorMemory) sparkConf.setJars(jarsPath) // This includes all the jars relevant jars.. val sc = new SparkContext(sparkConf) val rawLogs = sc.textFile(hdfs://my-hadoop-namenode:8020:myfile.txt) rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting) rawLogs.flatMap(processLogs).saveAsTextFile(hdfs://my-hadoop-namenode:8020:outfile.txt) } If I switch to local mode, the code runs just fine, it fails with the error I pasted above. In the cluster mode, even writing back the file we just read fails (rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting) I still believe this is a classNotFound error in disguise Indeed you are right, this can be the reason. I had similar errors when defining case classes in the shell and trying to use them in the RDDs. Are you shading argonaut in the fat jar ? Thanks Shivani On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com wrote: Wait, so the file only has four lines and the job running out of heap space? Can you share the code you're running that does the processing? I'd guess that you're doing some intense processing on every line but just writing parsed case classes back to disk sounds very lightweight. 
I On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com wrote: I am trying to process a file that contains 4 log lines (not very long) and then write my parsed out case classes to a destination folder, and I get the following error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) Sadly, there are several folks that have faced this error while trying to execute Spark jobs and there are various solutions, none
Re: How could I set the number of executor?
--num-executors seems to be available with YARN only. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-set-the-number-of-executor-tp7990p7992.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
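To make the distinction concrete, a hedged sketch (the standalone settings below describe a typical setup and are assumptions, not something confirmed in this thread): on YARN the executor count is an explicit argument, whereas in standalone mode each application gets one executor per worker, so the count is steered indirectly through the number of workers and the resource caps.

# YARN: executor count passed explicitly
./bin/spark-submit --master yarn-cluster --num-executors 4 --executor-cores 2 --executor-memory 4g ...

# Standalone (e.g. under Shark): one executor per worker per application.
# spark-env.sh can start extra workers per machine:   SPARK_WORKER_INSTANCES=2
# and the application can cap what it consumes:       spark.cores.max=8, spark.executor.memory=4g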
Re: MLLib inside Storm : silly or not ?
Yes, learning on a dedicated Spark cluster and predicting inside a Storm bolt is quite OK :) Thanks all for your answers. I'll post back if/when we experience this solution. E/ 2014-06-19 20:45 GMT+02:00 Shuo Xiang shuoxiang...@gmail.com: If I'm understanding correctly, you want to use MLlib for offline training and then deploy the learned model to Storm? In this case I don't think there is any problem. However if you are looking for online model update/training, this can be complicated and I guess quite a few algorithms in mllib at this time are designed for offline/batch learning. On Thu, Jun 19, 2014 at 12:26 AM, Eustache DIEMERT eusta...@diemert.fr wrote: Hi Sparkers, We have a Storm cluster and looking for a decent execution engine for machine learned models. What I've seen from MLLib is extremely positive, but we can't just throw away our Storm based stack. So my question is: is it feasible/recommended to train models in Spark/MLLib and execute them in another Java environment (Storm in this case) ? Thanks for any insights :) Eustache
Anything like grid search available for mlbase?
Looking for something like scikit's grid search module. C
parallel Reduce within a key
Hi, I am on Spark 0.9.0 I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32 cores in the cluster). I have an input rdd with 64 partitions. I am running sc.mapPartitions(...).reduce(...) I can see that I get full parallelism on the mapper (all my 32 cores are busy simultaneously). However, when it comes to reduce(), the outputs of the mappers are all reduced SERIALLY. Further, all the reduce processing happens only on 1 of the workers. I was expecting that the outputs of the 16 mappers on node 1 would be reduced in parallel in node 1 while the outputs of the 16 mappers on node 2 would be reduced in parallel on node 2 and there would be 1 final inter-node reduce (of node 1 reduced result and node 2 reduced result). Isn't parallel reduce supported WITHIN a key (in this case I have no key) ? (I know that there is parallelism in reduce across keys) Best Regards Anand -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
java.net.SocketTimeoutException: Read timed out and java.io.IOException: Filesystem closed on Spark 1.0
Hi all, I'm running a job that seems to continually fail with the following exception: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687) at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1323) ... org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:330) This is running spark-assembly-1.0.0-hadoop2.3.0 through YARN. The only additional error I see is 14/06/20 10:44:15 WARN NewHadoopRDD: Exception in RecordReader.close() net.sf.samtools.util.RuntimeIOException: java.io.IOException: Filesystem closed I had thought this filesystem-closed issue was resolved in https://issues.apache.org/jira/browse/SPARK-1676. I've also attempted to run with a single core to avoid this issue, which seems to help sometimes, as the failure is intermittent. A previous mail thread, http://apache-spark-user-list.1001560.n3.nabble.com/Filesystem-closed-while-running-spark-job-td4596.html, had a suggestion to disable caching. Has anyone seen this before or does anyone know of a resolution? As I mentioned, this is intermittent: sometimes the job runs to completion and sometimes it fails in this way. Thanks, Arun
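For context, the "disable caching" suggestion referenced in the linked thread usually refers to the Hadoop FileSystem object cache rather than Spark's RDD caching; a hedged sketch of the Hadoop configuration property typically involved (an assumption based on that thread, not verified here):

<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
</property>

With the cache disabled, each caller gets its own FileSystem instance, so one task closing its handle no longer invalidates a shared instance used by another thread.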
Re: Anything like grid search available for mlbase?
This is a planned feature for v1.1. I'm going to work on it after v1.0.1 release. -Xiangrui On Jun 20, 2014, at 6:46 AM, Charles Earl charles.ce...@gmail.com wrote: Looking for something like scikit's grid search module. C
Performance problems on SQL JOIN
Hi there, We're trying out Spark and are experiencing some performance issues using Spark SQL. Can anyone tell us whether these results are normal? We are using the Amazon EC2 scripts to create a cluster with 3 workers/executors (m1.large). We tried both Spark 1.0.0 and the git master, with both the Scala and the Python shells. Running the following code takes about 5 minutes, which seems a long time for this query.
val file = sc.textFile("s3n:// ... .csv");
val data = file.map(x => x.split('|')); // 300k rows
case class BookingInfo(num_rooms: String, hotelId: String, toDate: String, ...);
val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1), ... , x(9))); // 50k rows
val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1), ... , x(9))); // 30k rows
rooms2.registerAsTable("rooms2"); cacheTable("rooms2");
rooms3.registerAsTable("rooms3"); cacheTable("rooms3");
sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId = rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();
Are we doing something wrong here? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space
Hello Abhi, I did try that and it did not work And Eugene, Yes I am assembling the argonaut libraries in the fat jar. So how did you overcome this problem? Shivani On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Le 20 juin 2014 01:46, Shivani Rao raoshiv...@gmail.com a écrit : Hello Andrew, i wish I could share the code, but for proprietary reasons I can't. But I can give some idea though of what i am trying to do. The job reads a file and for each line of that file and processors these lines. I am not doing anything intense in the processLogs function import argonaut._ import argonaut.Argonaut._ /* all of these case classes are created from json strings extracted from the line in the processLogs() function * */ case class struct1… case class struct2… case class value1(struct1, struct2) def processLogs(line:String): Option[(key1, value1)] {… } def run(sparkMaster, appName, executorMemory, jarsPath) { val sparkConf = new SparkConf() sparkConf.setMaster(sparkMaster) sparkConf.setAppName(appName) sparkConf.set(spark.executor.memory, executorMemory) sparkConf.setJars(jarsPath) // This includes all the jars relevant jars.. val sc = new SparkContext(sparkConf) val rawLogs = sc.textFile(hdfs://my-hadoop-namenode:8020:myfile.txt) rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting) rawLogs.flatMap(processLogs).saveAsTextFile(hdfs://my-hadoop-namenode:8020:outfile.txt) } If I switch to local mode, the code runs just fine, it fails with the error I pasted above. In the cluster mode, even writing back the file we just read fails (rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting) I still believe this is a classNotFound error in disguise Indeed you are right, this can be the reason. I had similar errors when defining case classes in the shell and trying to use them in the RDDs. Are you shading argonaut in the fat jar ? Thanks Shivani On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com wrote: Wait, so the file only has four lines and the job running out of heap space? Can you share the code you're running that does the processing? I'd guess that you're doing some intense processing on every line but just writing parsed case classes back to disk sounds very lightweight. 
I On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com wrote: I am trying to process a file that contains 4 log lines (not very long) and then write my parsed out case classes to a destination folder, and I get the following error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at
Re: trying to understand yarn-client mode
thanks! i will try that. i guess what i am most confused about is why the executors are trying to retrieve the jars directly using the info i provided to add jars to my spark context. i mean, thats bound to fail no? i could be on a different machine (so my file://) isnt going to work for them, or i could have the jars in a directory that is only readable by me. how come the jars are not just shipped to yarn as part of the job submittal? i am worried i am supposed to put the jars in a central location and yarn is going to fetch them from there, leading to jars in yet another place such as on hdfs which i find pretty messy. On Thu, Jun 19, 2014 at 2:54 PM, Marcelo Vanzin van...@cloudera.com wrote: Coincidentally, I just ran into the same exception. What's probably happening is that you're specifying some jar file in your job as an absolute local path (e.g. just /home/koert/test-assembly-0.1-SNAPSHOT.jar), but your Hadoop config has the default FS set to HDFS. So your driver does not know that it should tell executors to download that file from the driver. If you specify the jar with the file: scheme that should solve the problem. On Thu, Jun 19, 2014 at 10:22 AM, Koert Kuipers ko...@tresata.com wrote: i am trying to understand how yarn-client mode works. i am not using Application application_1403117970283_0014 failed 2 times due to AM Container for appattempt_1403117970283_0014_02 exited with exitCode: -1000 due to: File file:/home/koert/test-assembly-0.1-SNAPSHOT.jar does not exist .Failing this attempt.. Failing the application. -- Marcelo
Better way to use a large data set?
Hi All, I have a 8 mill row, 500 column data set, which is derived by reading a text file and doing a filter, flatMap operation to weed out some anomalies. Now, I have a process which has to run through all 500 columns, do couple of map, reduce, forEach operations on the data set and return some statistics as output. I have thought of the following approaches. Approach 1: i)Read the DataSet from textfile, do some operations, get a RDD. Use toArray or collect on this RDD and broadcast it. ii) Do a flatMap on a range of numbers, this range being equivalent to the number of columns. iii) In each flatMap operation, perform the required operations on the broadcast variable to derive the stats, return the array of stats Questions about this approach: 1) Is there a preference amongst toArray and collect? 2) Can I not directly broadcast a RDD instead of first collecting it and broadcasting it? I tried this, but I got a serialization exception. 3) When I use sc.parallelize on the broadcast dataset, would it be a problem if there isn't enough space to store it in-memory? Approach 2: Instead of reading the textfile, doing some operations and then broadcasting it, I was planning to do the read part within each of the 500 steps of the flatMap (assuming I have 500 columns) Is this better than Approach 1? In Approach 1, I'd have to read once and broadcast whilst here, I'd have to read 500 times. Approach 3: Do a transpose of the dataset and then flatMap on the transposed matrix. Could someone please point out the best approach from above, or if there's a better way to solve this? Thank you for the help! Vinay
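A hedged sketch of Approach 1 as described above, in Scala (parse, isValid, and computeStats are hypothetical placeholders for the cleanup step and per-column statistics mentioned in the message; 500 is the column count):

val rows = sc.textFile("hdfs://namenode:8020/input.txt")  // hypothetical path
  .map(parse).filter(isValid)                             // the filter/flatMap cleanup step
  .collect()                                              // materialize on the driver
val bcRows = sc.broadcast(rows)                           // broadcast the cleaned data set
val stats = sc.parallelize(0 until 500)                   // one task per column index
  .map(col => computeStats(bcRows.value, col))            // per-column statistics from the broadcast
  .collect()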
Re: Performance problems on SQL JOIN
Your data source is S3 and data is used twice. m1.large does not have very good network performance. Please try file.count() and see how fast it goes. -Xiangrui On Jun 20, 2014, at 8:16 AM, mathias math...@socialsignificance.co.uk wrote: Hi there, We're trying out Spark and are experiencing some performance issues using Spark SQL. Anyone who can tell us if our results are normal? We are using the Amazon EC2 scripts to create a cluster with 3 workers/executors (m1.large). Tried both spark 1.0.0 as well as the git master; the Scala as well as the Python shells. Running the following code takes about 5 minutes, which seems a long time for this query. val file = sc.textFile(s3n:// ... .csv); val data = file.map(x = x.split('|')); // 300k rows case class BookingInfo(num_rooms: String, hotelId: String, toDate: String, ...); val rooms2 = data.filter(x = x(0) == 2).map(x = BookingInfo(x(0), x(1), ... , x(9))); // 50k rows val rooms3 = data.filter(x = x(0) == 3).map(x = BookingInfo(x(0), x(1), ... , x(9))); // 30k rows rooms2.registerAsTable(rooms2); cacheTable(rooms2); rooms3.registerAsTable(rooms3); cacheTable(rooms3); sql(SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId = rooms3.hotelId AND rooms2.toDate = rooms3.toDate).count(); Are we doing something wrong here? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Performance problems on SQL JOIN
Also - you could consider caching your data after the first split (before the first filter), this will prevent you from retrieving the data from s3 twice. On Fri, Jun 20, 2014 at 8:32 AM, Xiangrui Meng men...@gmail.com wrote: Your data source is S3 and data is used twice. m1.large does not have very good network performance. Please try file.count() and see how fast it goes. -Xiangrui On Jun 20, 2014, at 8:16 AM, mathias math...@socialsignificance.co.uk wrote: Hi there, We're trying out Spark and are experiencing some performance issues using Spark SQL. Anyone who can tell us if our results are normal? We are using the Amazon EC2 scripts to create a cluster with 3 workers/executors (m1.large). Tried both spark 1.0.0 as well as the git master; the Scala as well as the Python shells. Running the following code takes about 5 minutes, which seems a long time for this query. val file = sc.textFile(s3n:// ... .csv); val data = file.map(x = x.split('|')); // 300k rows case class BookingInfo(num_rooms: String, hotelId: String, toDate: String, ...); val rooms2 = data.filter(x = x(0) == 2).map(x = BookingInfo(x(0), x(1), ... , x(9))); // 50k rows val rooms3 = data.filter(x = x(0) == 3).map(x = BookingInfo(x(0), x(1), ... , x(9))); // 30k rows rooms2.registerAsTable(rooms2); cacheTable(rooms2); rooms3.registerAsTable(rooms3); cacheTable(rooms3); sql(SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId = rooms3.hotelId AND rooms2.toDate = rooms3.toDate).count(); Are we doing something wrong here? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
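A minimal sketch of that caching suggestion, reusing the thread's own variable names (column expressions elided as in the original):

val data = file.map(x => x.split('|')).cache()  // persist the parsed rows once, before either filter
val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1), /* ... */ x(9)))
val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1), /* ... */ x(9)))
// Both filters now scan the in-memory copy instead of re-reading the file from S3.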
broadcast not working in yarn-cluster mode
Hi, Since I migrated to spark 1.0.0, a couple of applications that used to work in 0.9.1 now fail when broadcasting a variable. Those applications are run on a YARN cluster in yarn-cluster mode (and used to run in yarn-standalone mode in 0.9.1) Here is an extract of the error log: Exception in thread Thread-3 java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:186) Caused by: java.lang.NoSuchMethodError: org.apache.spark.SparkContext.broadcast(Ljava/lang/Object;)Lorg/apache/spark/broadcast/Broadcast; at kelkoo.MerchantOffersPerformance$.main(MerchantOffersPerformance.scala:289) at kelkoo.MerchantOffersPerformance.main(MerchantOffersPerformance.scala) Has anyone any idea how to solve this problem? Thanks, Christophe.
Re: How do you run your spark app?
Hello Michael, I have a quick question for you. Can you clarify the statement build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. Can you give an example. I am using sbt assembly as well to create a fat jar, and supplying the spark and hadoop locations in the class path. Inside the main() function where spark context is created, I use SparkContext.jarOfClass(this).toList add the fat jar to my spark context. However, I seem to be running into issues with this approach. I was wondering if you had any inputs Michael. Thanks, Shivani On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal sonalgoy...@gmail.com wrote: We use maven for building our code and then invoke spark-submit through the exec plugin, passing in our parameters. Works well for us. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler mich...@tumra.com wrote: P.S. Last but not least we use sbt-assembly to build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. These are automatically built from source by our Jenkins and stored in HDFS. Our Chronos/Marathon jobs fetch the latest release TAR.GZ direct from HDFS, unpack it and launch the appropriate script. Makes for a much cleaner development / testing / deployment to package everything required in one go instead of relying on cluster specific classpath additions or any add-jars functionality. On 19 June 2014 22:53, Michael Cutler mich...@tumra.com wrote: When you start seriously using Spark in production there are basically two things everyone eventually needs: 1. Scheduled Jobs - recurring hourly/daily/weekly jobs. 2. Always-On Jobs - that require monitoring, restarting etc. There are lots of ways to implement these requirements, everything from crontab through to workflow managers like Oozie. We opted for the following stack: - Apache Mesos http://mesosphere.io/ (mesosphere.io distribution) - Marathon https://github.com/mesosphere/marathon - init/control system for starting, stopping, and maintaining always-on applications. - Chronos http://airbnb.github.io/chronos/ - general-purpose scheduler for Mesos, supports job dependency graphs. - ** Spark Job Server https://github.com/ooyala/spark-jobserver - primarily for it's ability to reuse shared contexts with multiple jobs The majority of our jobs are periodic (batch) jobs run through spark-sumit, and we have several always-on Spark Streaming jobs (also run through spark-submit). We always use client mode with spark-submit because the Mesos cluster has direct connectivity to the Spark cluster and it means all the Spark stdout/stderr is externalised into Mesos logs which helps diagnosing problems. I thoroughly recommend you explore using Mesos/Marathon/Chronos to run Spark and manage your Jobs, the Mesosphere tutorials are awesome and you can be up and running in literally minutes. The Web UI's to both make it easy to get started without talking to REST API's etc. Best, Michael On 19 June 2014 19:44, Evan R. Sparks evan.spa...@gmail.com wrote: I use SBT, create an assembly, and then add the assembly jars when I create my spark context. The main executor I run with something like java -cp ... MyDriver. 
That said - as of spark 1.0 the preferred way to run spark applications is via spark-submit - http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jun 19, 2014 at 11:36 AM, ldmtwo ldm...@gmail.com wrote: I want to ask this, not because I can't read endless documentation and several tutorials, but because there seems to be many ways of doing things and I keep having issues. How do you run /your /spark app? I had it working when I was only using yarn+hadoop1 (Cloudera), then I had to get Spark and Shark working and ended upgrading everything and dropped CDH support. Anyways, this is what I used with master=yarn-client and app_jar being Scala code compiled with Maven. java -cp $CLASSPATH -Dspark.jars=$APP_JAR -Dspark.master=$MASTER $CLASSNAME $ARGS Do you use this? or something else? I could never figure out this method. SPARK_HOME/bin/spark jar APP_JAR ARGS For example: bin/spark-class jar /usr/lib/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi 10 10 Do you use SBT or Maven to compile? or something else? ** It seams that I can't get subscribed to the mailing list and I tried both my work email and personal. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-run-your-spark-app-tp7935.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark on yarn is trying to use file:// instead of hdfs://
i noticed that when i submit a job to yarn it mistakenly tries to upload files to local filesystem instead of hdfs. what could cause this? in spark-env.sh i have HADOOP_CONF_DIR set correctly (and spark-submit does find yarn), and my core-site.xml has a fs.defaultFS that is hdfs, not local filesystem. thanks! koert
Re: spark on yarn is trying to use file:// instead of hdfs://
Hi Koert, Could you provide more details? Job arguments, log messages, errors, etc. On Fri, Jun 20, 2014 at 9:40 AM, Koert Kuipers ko...@tresata.com wrote: i noticed that when i submit a job to yarn it mistakenly tries to upload files to local filesystem instead of hdfs. what could cause this? in spark-env.sh i have HADOOP_CONF_DIR set correctly (and spark-submit does find yarn), and my core-site.xml has a fs.defaultFS that is hdfs, not local filesystem. thanks! koert -- Marcelo
Re: How to store JavaRDD as a sequence file using spark java API?
Yes, it can if you set the output format to SequenceFileOutputFormat. The difference is saveAsSequenceFile does the conversion to Writable for you if needed and then calls saveAsHadoopFile. On Fri, Jun 20, 2014 at 12:43 AM, abhiguruvayya sharath.abhis...@gmail.com wrote: Does JavaPairRDD.saveAsHadoopFile store data as a sequenceFile? Then what is the significance of RDD.saveAsSequenceFile? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-JavaRDD-as-a-sequence-file-using-spark-java-API-tp7969p7983.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
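A minimal sketch of that answer in Scala (the output path is hypothetical; the JavaPairRDD call takes the same arguments):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileOutputFormat

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }  // convert to Writables explicitly

pairs.saveAsHadoopFile(
  "hdfs://namenode:8020/tmp/seq-out",                        // hypothetical output path
  classOf[Text],
  classOf[IntWritable],
  classOf[SequenceFileOutputFormat[Text, IntWritable]])

saveAsSequenceFile is the convenience form of the same call: it performs the Writable conversion for you and then writes through the Hadoop sequence-file output format.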
Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space
In my case it was due to a case class I was defining in the spark-shell and not being available on the workers. So packaging it in a jar and adding it with ADD_JARS solved the problem. Note that I don't exactly remember if it was an out of heap space exception or pergmen space. Make sure your jarsPath is correct. Usually to debug this kind of problems I am using the spark-shell (you can do the same in your job but its more time constuming to repackage, deploy, run, iterate). Try for example 1) read the lines (without any processing) and count them 2) apply processing and count 2014-06-20 17:15 GMT+02:00 Shivani Rao raoshiv...@gmail.com: Hello Abhi, I did try that and it did not work And Eugene, Yes I am assembling the argonaut libraries in the fat jar. So how did you overcome this problem? Shivani On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Le 20 juin 2014 01:46, Shivani Rao raoshiv...@gmail.com a écrit : Hello Andrew, i wish I could share the code, but for proprietary reasons I can't. But I can give some idea though of what i am trying to do. The job reads a file and for each line of that file and processors these lines. I am not doing anything intense in the processLogs function import argonaut._ import argonaut.Argonaut._ /* all of these case classes are created from json strings extracted from the line in the processLogs() function * */ case class struct1… case class struct2… case class value1(struct1, struct2) def processLogs(line:String): Option[(key1, value1)] {… } def run(sparkMaster, appName, executorMemory, jarsPath) { val sparkConf = new SparkConf() sparkConf.setMaster(sparkMaster) sparkConf.setAppName(appName) sparkConf.set(spark.executor.memory, executorMemory) sparkConf.setJars(jarsPath) // This includes all the jars relevant jars.. val sc = new SparkContext(sparkConf) val rawLogs = sc.textFile(hdfs://my-hadoop-namenode:8020:myfile.txt) rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting) rawLogs.flatMap(processLogs).saveAsTextFile(hdfs://my-hadoop-namenode:8020:outfile.txt) } If I switch to local mode, the code runs just fine, it fails with the error I pasted above. In the cluster mode, even writing back the file we just read fails (rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting) I still believe this is a classNotFound error in disguise Indeed you are right, this can be the reason. I had similar errors when defining case classes in the shell and trying to use them in the RDDs. Are you shading argonaut in the fat jar ? Thanks Shivani On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com wrote: Wait, so the file only has four lines and the job running out of heap space? Can you share the code you're running that does the processing? I'd guess that you're doing some intense processing on every line but just writing parsed case classes back to disk sounds very lightweight. 
I On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com wrote: I am trying to process a file that contains 4 log lines (not very long) and then write my parsed out case classes to a destination folder, and I get the following error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at
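A hedged sketch of the incremental debugging suggested above, reusing names from the code earlier in this thread (the path and jar location are illustrative):

// e.g. launch a shell with the application jar shipped to the workers, as mentioned above:
//   ADD_JARS=/path/to/my-fat-assembly.jar MASTER=spark://master:7077 ./bin/spark-shell
val rawLogs = sc.textFile("hdfs://my-hadoop-namenode:8020/myfile.txt")
println(rawLogs.count())                       // 1) read only, no processing
println(rawLogs.flatMap(processLogs).count())  // 2) read plus processLogs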
Re: trying to understand yarn-client mode
On Fri, Jun 20, 2014 at 8:22 AM, Koert Kuipers ko...@tresata.com wrote: thanks! i will try that. i guess what i am most confused about is why the executors are trying to retrieve the jars directly using the info i provided to add jars to my spark context. i mean, thats bound to fail no? i could be on a different machine (so my file://) isnt going to work for them, or i could have the jars in a directory that is only readable by me. how come the jars are not just shipped to yarn as part of the job submittal? They are if they are specified correctly. Check the guide: http://spark.apache.org/docs/latest/submitting-applications.html See the Advanced Dependency Management section. Your default filesystem is probably hdfs, which means that if you provide a path with no protocol, the executors will consider it as an hdfs path, and it won't work if you're pointing at a file that exists in your local fs. -- Marcelo
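To illustrate the point about schemes, a hedged example (the class name and extra jar are made up; the assembly path is the one from earlier in the thread): a path with no scheme is resolved against the default filesystem, so a local file needs an explicit file: prefix, while a jar already in HDFS can be referenced with hdfs: and is not uploaded.

./bin/spark-submit --master yarn-client \
  --class com.example.MyApp \
  --jars file:/home/koert/lib/extra-lib.jar \
  file:/home/koert/test-assembly-0.1-SNAPSHOT.jar

# or, for a dependency that already lives in HDFS:
#   --jars hdfs://namenode:8020/lib/extra-lib.jar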
Re: 1.0.1 release plan
Sounds good. Mingyu and I are waiting on 1.0.1 to get the fix for the below issues without running a patched version of Spark: https://issues.apache.org/jira/browse/SPARK-1935 -- commons-codec version conflicts for client applications https://issues.apache.org/jira/browse/SPARK-2043 -- correctness issue with spilling On Fri, Jun 20, 2014 at 1:04 AM, Patrick Wendell pwend...@gmail.com wrote: Hey There, I'd like to start voting on this release shortly because there are a few important fixes that have queued up. We're just waiting to fix an akka issue. I'd guess we'll cut a vote in the next few days. - Patrick On Thu, Jun 19, 2014 at 10:47 AM, Mingyu Kim m...@palantir.com wrote: Hi all, Is there any plan for 1.0.1 release? Mingyu
Re: spark on yarn is trying to use file:// instead of hdfs://
yeah sure see below. i strongly suspect its something i misconfigured causing yarn to try to use local filesystem mistakenly. * [koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --executor-cores 1 hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10 14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at cdh5-yarn.tresata.com/192.168.1.85:8032 14/06/20 12:54:41 INFO Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource in this cluster 8192 14/06/20 12:54:41 INFO Client: Preparing Local resources 14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 14/06/20 12:54:41 INFO Client: Uploading hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 14/06/20 12:54:43 INFO Client: Setting up the launch environment 14/06/20 12:54:43 INFO Client: Setting up container launch context 14/06/20 12:54:43 INFO Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\3\, -Dspark.storage.blockManagerTimeoutIntervalMs=\12\, -Dspark.storage.blockManagerHeartBeatMs=\12\, -Dspark.app.name=\org.apache.spark.examples.SparkPi\, -Dspark.akka.frameSize=\1\, -Dspark.akka.timeout=\3\, -Dspark.worker.timeout=\3\, -Dspark.akka.logLifecycleEvents=\true\, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar, --args '10' , --executor-memory, 1024, --executor-cores, 1, --num-executors , 3, 1, LOG_DIR/stdout, 2, LOG_DIR/stderr) 14/06/20 12:54:43 INFO Client: Submitting application to ASM 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application application_1403201750110_0060 14/06/20 12:54:44 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:45 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:46 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 
yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:47 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: Application application_1403201750110_0060 failed 2 times due to AM Container for appattempt_1403201750110_0060_02 exited with exitCode: -1000 due to: File file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar does not exist .Failing this attempt.. Failing the application. appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: FAILED distributedFinalState: FAILED appTrackingUrl: cdh5-yarn.tresata.com:8088/cluster/app/application_1403201750110_0060 appUser: koert On Fri, Jun 20, 2014 at 12:42 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Koert, Could you provide more details? Job arguments, log messages, errors, etc. On Fri, Jun 20, 2014 at 9:40 AM, Koert Kuipers ko...@tresata.com wrote: i noticed that
Re: Performance problems on SQL JOIN
Thanks for your suggestions. file.count() takes 7s, so that doesn't seem to be the problem. Moreover, a union with the same code/CSV takes about 15s (SELECT * FROM rooms2 UNION SELECT * FROM rooms3). The web status page shows that both stages 'count at joins.scala:216' and 'reduce at joins.scala:219' take up the majority of the time. Is this due to bad partitioning or caching? Or is there a problem with the JOIN operator? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001p8016.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark on yarn is trying to use file:// instead of hdfs://
Koert, is there any chance that your fs.defaultFS isn't setup right? On Fri, Jun 20, 2014 at 9:57 AM, Koert Kuipers ko...@tresata.com wrote: yeah sure see below. i strongly suspect its something i misconfigured causing yarn to try to use local filesystem mistakenly. * [koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --executor-cores 1 hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10 14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at cdh5-yarn.tresata.com/192.168.1.85:8032 14/06/20 12:54:41 INFO Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1 14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource in this cluster 8192 14/06/20 12:54:41 INFO Client: Preparing Local resources 14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 14/06/20 12:54:41 INFO Client: Uploading hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 14/06/20 12:54:43 INFO Client: Setting up the launch environment 14/06/20 12:54:43 INFO Client: Setting up container launch context 14/06/20 12:54:43 INFO Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\3\, -Dspark.storage.blockManagerTimeoutIntervalMs=\12\, -Dspark.storage.blockManagerHeartBeatMs=\12\, -Dspark.app.name=\org.apache.spark.examples.SparkPi\, -Dspark.akka.frameSize=\1\, -Dspark.akka.timeout=\3\, -Dspark.worker.timeout=\3\, -Dspark.akka.logLifecycleEvents=\true\, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar, --args '10' , --executor-memory, 1024, --executor-cores, 1, --num-executors , 3, 1, LOG_DIR/stdout, 2, LOG_DIR/stderr) 14/06/20 12:54:43 INFO Client: Submitting application to ASM 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application application_1403201750110_0060 14/06/20 12:54:44 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:45 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:46 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 
appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:47 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: Application application_1403201750110_0060 failed 2 times due to AM Container for appattempt_1403201750110_0060_02 exited with exitCode: -1000 due to: File file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar does not exist .Failing this attempt.. Failing the application. appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: FAILED distributedFinalState: FAILED appTrackingUrl: cdh5-yarn.tresata.com:8088/cluster/app/application_1403201750110_0060 appUser: koert On
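For reference, the setting being asked about lives in core-site.xml; a minimal sketch assuming the host from the logs above (the exact value is an assumption):

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://cdh5-yarn.tresata.com:8020</value>
</property>

If this resolves to the local filesystem on the submitting machine, or spark-submit is reading a different configuration directory than expected, the staging upload lands under file:/home/... exactly as in the log above.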
Can not checkpoint Graph object's vertices but could checkpoint edges
I'm trying to work around the StackOverflowError that occurs when an object has a long dependency chain; someone said I should use checkpoint to cut off the dependencies. I wrote some sample code to test it, but I can only checkpoint the edges, not the vertices. I think I do materialize both vertices and edges after calling checkpoint, so why do only the edges get checkpointed? Here is my code; I'd really appreciate it if you could point out what I did wrong.
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Test").setMaster("local[4]")
  val sc = new SparkContext(conf)
  sc.setCheckpointDir("./checkpoint")
  val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
  val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
  var g = Graph(v, e)
  val vertexIds = Seq(0L, 1L, 2L)
  var prevG: Graph[VertexId, Long] = null
  for (i <- 1 to 10) {
    vertexIds.toStream.foreach(id => {
      println("generate new graph")
      prevG = g
      g = Graph(g.vertices, g.edges)
      println("uncache vertices")
      prevG.unpersistVertices(blocking = false)
      println("uncache edges")
      prevG.edges.unpersist(blocking = false)
      // Third approach, do checkpoint
      // Vertices can not be checkpointed, still have StackOverflowError
      g.vertices.checkpoint()
      g.edges.checkpoint()
      println(g.vertices.count() + g.edges.count())
      println(g.vertices.isCheckpointed + " " + g.edges.isCheckpointed)
    })
    println("iter " + i + " finished")
  }
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-checkpoint-Graph-object-s-vertices-but-could-checkpoint-edges-tp8019.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Parallel LogisticRegression?
I've tried to parallelize the separate regressions using allResponses.toParArray.map( x= do logistic regression against labels in x) But I start to see messages like 14/06/20 10:10:26 WARN scheduler.TaskSetManager: Lost TID 4193 (task 363.0:4) 14/06/20 10:10:27 WARN scheduler.TaskSetManager: Loss was due to fetch failure from null and finally 14/06/20 10:10:26 ERROR scheduler.TaskSetManager: Task 363.0:4 failed 4 times; aborting job Then 14/06/20 10:10:26 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext 14/06/20 10:10:26 ERROR actor.OneForOneStrategy: java.lang.UnsupportedOperationException at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32) at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185) This doesn't happen when I don't use toParArray. I read that spark was thread safe, but I seem to be running into problems. Am I doing something wrong? Kyle On Thu, Jun 19, 2014 at 11:21 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working on a problem learning several different sets of responses against the same set of training features. Right now I've written the program to cycle through all of the different label sets, attached them to the training data and run LogisticRegressionWithSGD on each of them. ie foreach curResponseSet in allResponses: currentRDD : RDD[LabeledPoints] = curResponseSet joined with trainingData LogisticRegressionWithSGD.train(currentRDD) Each of the different training runs are independent. It seems like I should be parallelize them as well. Is there a better way to do this? Kyle
Re: How do you run your spark app?
Hi Shivani, I use sbt assembly to create a fat jar. https://github.com/sbt/sbt-assembly An example of the sbt file is below.

import AssemblyKeys._ // put this at the top of the file

assemblySettings

mainClass in assembly := Some("FifaSparkStreaming")

name := "FifaSparkStreaming"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  ("org.apache.spark" %% "spark-streaming-twitter" % "1.0.0")
    .exclude("org.eclipse.jetty.orbit", "javax.transaction")
    .exclude("org.eclipse.jetty.orbit", "javax.servlet")
    .exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish")
    .exclude("org.eclipse.jetty.orbit", "javax.activation")
    .exclude("com.esotericsoftware.minlog", "minlog"),
  ("net.debasishg" % "redisclient_2.10" % "2.12").exclude("com.typesafe.akka", "akka-actor_2.10"))

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*) => MergeStrategy.first
  case "application.conf" => MergeStrategy.concat
  case "unwanted.txt" => MergeStrategy.discard
  case x => old(x)
} }

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And I run as mentioned below.

LOCALLY:
1) sbt 'run AP1z4IYraYm5fqWhITWArY53x Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014'

If you want to submit on the cluster:

CLUSTER:
2) spark-submit --class FifaSparkStreaming --master spark://server-8-144:7077 --driver-memory 2048 --deploy-mode cluster FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014

Hope this helps. Thanks, Shrikar

On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao raoshiv...@gmail.com wrote: Hello Michael, I have a quick question for you. Can you clarify the statement "build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job"? Can you give an example? I am using sbt assembly as well to create a fat jar, and supplying the spark and hadoop locations in the class path. Inside the main() function where the spark context is created, I use SparkContext.jarOfClass(this).toList to add the fat jar to my spark context. However, I seem to be running into issues with this approach. I was wondering if you had any inputs, Michael. Thanks, Shivani On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal sonalgoy...@gmail.com wrote: We use maven for building our code and then invoke spark-submit through the exec plugin, passing in our parameters. Works well for us. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler mich...@tumra.com wrote: P.S. Last but not least we use sbt-assembly to build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. These are automatically built from source by our Jenkins and stored in HDFS. Our Chronos/Marathon jobs fetch the latest release TAR.GZ direct from HDFS, unpack it and launch the appropriate script. Makes for a much cleaner development / testing / deployment to package everything required in one go instead of relying on cluster specific classpath additions or any add-jars functionality.
On 19 June 2014 22:53, Michael Cutler mich...@tumra.com wrote: When you start seriously using Spark in production there are basically two things everyone eventually needs:
1. Scheduled Jobs - recurring hourly/daily/weekly jobs.
2. Always-On Jobs - that require monitoring, restarting etc.
There are lots of ways to implement these requirements, everything from crontab through to workflow managers like Oozie. We opted for the following stack:
- Apache Mesos http://mesosphere.io/ (mesosphere.io distribution)
- Marathon https://github.com/mesosphere/marathon - init/control system for starting, stopping, and maintaining always-on applications.
- Chronos http://airbnb.github.io/chronos/ - general-purpose scheduler for Mesos, supports job dependency graphs.
- Spark Job Server https://github.com/ooyala/spark-jobserver - primarily for its ability to reuse shared contexts with multiple jobs.
The majority of our jobs are periodic (batch) jobs run through spark-submit, and we have several always-on Spark Streaming jobs (also run through spark-submit). We always use client mode with spark-submit because the Mesos cluster has direct connectivity to the Spark cluster and
Re: parallel Reduce within a key
How about a treeReduceByKey? :-) On Friday, June 20, 2014 11:55 AM, DB Tsai dbt...@stanford.edu wrote: Currently, the reduce operation combines the result from mapper sequentially, so it's O(n). Xiangrui is working on treeReduce which is O(log(n)). Based on the benchmark, it dramatically increase the performance. You can test the code in his own branch. https://github.com/apache/spark/pull/1110 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Jun 20, 2014 at 6:57 AM, ansriniv ansri...@gmail.com wrote: Hi, I am on Spark 0.9.0 I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32 cores in the cluster). I have an input rdd with 64 partitions. I am running sc.mapPartitions(...).reduce(...) I can see that I get full parallelism on the mapper (all my 32 cores are busy simultaneously). However, when it comes to reduce(), the outputs of the mappers are all reduced SERIALLY. Further, all the reduce processing happens only on 1 of the workers. I was expecting that the outputs of the 16 mappers on node 1 would be reduced in parallel in node 1 while the outputs of the 16 mappers on node 2 would be reduced in parallel on node 2 and there would be 1 final inter-node reduce (of node 1 reduced result and node 2 reduced result). Isn't parallel reduce supported WITHIN a key (in this case I have no key) ? (I know that there is parallelism in reduce across keys) Best Regards Anand -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
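The pull request above is the real fix; until it lands, a rough way to get more parallel combining is to pre-reduce each partition and add one intermediate combine stage before the final driver-side reduce. This is only a sketch, assuming an associative function and non-empty partitions, and it is not the O(log n) treeReduce implementation itself.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def twoLevelReduce[T: ClassTag](rdd: RDD[T], f: (T, T) => T): T = {
  val partials = rdd.mapPartitions(it => Iterator(it.reduce(f)))  // one partial result per partition, computed in parallel
  partials
    .coalesce(4)                                                  // a handful of tasks for an intermediate combine
    .mapPartitions(it => Iterator(it.reduce(f)))
    .reduce(f)                                                    // tiny final combine on the driver
}

With 64 input partitions this leaves the driver merging only a few values instead of 64, which is the part the original poster observed happening serially.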
Possible approaches for adding extra metadata (Spark Streaming)?
Hi All, I was curious to know which of the two approaches below is better for doing analytics using Spark Streaming. Let's say we want to add some metadata to the stream being processed, like sentiment, tags etc., and then perform some analytics using this added metadata. 1) Is it ok to make an HTTP call and add the extra information to the stream being processed inside the updateByKeyAndWindow operations? 2) Add these sentiment/tags beforehand and then stream the enriched data through DStreams? Thanks, Shrikar
Re: Problems running Spark job on mesos in fine-grained mode
Hi, this is just a follow-up regarding this issue. Turns out that it's caused by a bug in Spark. I created a case for it: https://issues.apache.org/jira/browse/SPARK-2204 and submitted a patch. Any chance this could be included in the 1.0.1 release? Thanks, - Sebastien On Tue, Jun 17, 2014 at 2:57 PM, Sébastien Rainville sebastienrainvi...@gmail.com wrote: Hi, I'm having trouble running spark on mesos in fine-grained mode. I'm running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which most of the time, but not always, cause the job to fail. The same code is running fine in coarse-grained mode. I see the following exceptions in the logs of the spark driver: W0617 10:57:36.774382 8735 sched.cpp:901] Attempting to launch task 21 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715 W0617 10:57:36.774433 8735 sched.cpp:901] Attempting to launch task 22 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for 201311011608-1369465866-5050-9189-46 from TaskSet 0.0 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2) 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0) 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1) 14/06/17 10:57:36 INFO DAGScheduler: Executor lost: 201311011608-1369465866-5050-9189-46 (epoch 0) 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove executor 201311011608-1369465866-5050-9189-46 from BlockManagerMaster. 14/06/17 10:57:36 INFO BlockManagerMaster: Removed 201311011608-1369465866-5050-9189-46 successfully in removeExecutor 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list earlier: ca1-dcc1-0065.lab.mtl I don't see any exceptions in the spark executor logs. The only error message I found in mesos itself is warnings in the mesos master: W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21 : Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered mem(*):3216; disk(*):98304; ports(*):[11900-11919, 1192 1-11995, 11997-11999]; cpus(*):1 W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22 : Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered mem(*):3216; disk(*):98304; ports(*):[11900-11919, 1192 1-11995, 11997-11999]; cpus(*):1 W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28 : Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304; ports(*):[11900- 11960, 11962-11978, 11980-11999] W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-46 on slave 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl) W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-34 on slave 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl) W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-59 on slave 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl) W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-18 on slave 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl) ... 
(more of those Ignoring unknown exited executor) I analyzed the difference in between the execution of the same job in coarse-grained mode and fine-grained mode, and I noticed that in the fine-grained mode the tasks get executed on executors different than the ones reported in spark, as if spark and mesos get out of sync as to which executor is responsible for which task. See the following: Coarse-grained mode: Spark Mesos Task IndexTask ID ExecutorStatusTask ID (UI)Task Name Task ID (logs)ExecutorState 0066SUCCESS 4Task 40 66RUNNING1 159SUCCESS0 Task 0159 RUNNING22 54SUCCESS10Task 10 254RUNNING 33128 SUCCESS6Task 6 3 128RUNNING ... Fine-grained mode: Spark Mesos Task IndexTask ID ExecutorTask ID (UI)Task NameTask ID (logs) ExecutorState0 23108SUCCESS 23task 0.0:023 27FINISHED0 1965FAILED19 task 0.0:01986 FINISHED1 2165FAILED Mesos executor was never created124 92 SUCCESS24task 0.0:1 24129FINISHED 22265 FAILEDMesos executor was never created 225100SUCCESS 25task 0.0:2 2584FINISHED 32680SUCCESS 26task 0.0:326 124FINISHED 42765FAILED 27task 0.0:427 108FINISHED 42992SUCCESS 29task 0.0:429 65FINISHED 52865FAILED Mesos executor was never created5 3077SUCCESS30 task 0.0:53062 FINISHED6 053SUCCESS0 task 0.0:6041 FINISHED7 177SUCCESS1 task 0.0:71114 FINISHED... Is it normal that the executor reported in spark and mesos to be different when running in fine-grained mode? Please note that in this
Re: spark on yarn is trying to use file:// instead of hdfs://
ok solved it. as it happened, in spark/conf i also had a file called core.site.xml (with some tachyon related stuff in it), so that's why it ignored /etc/hadoop/conf/core-site.xml On Fri, Jun 20, 2014 at 3:24 PM, Koert Kuipers ko...@tresata.com wrote: i put some logging statements in yarn.Client and that confirms it's using the local filesystem: 14/06/20 15:20:33 INFO Client: fs.defaultFS is file:/// so somehow fs.defaultFS is not being picked up from /etc/hadoop/conf/core-site.xml, but spark does correctly pick up yarn.resourcemanager.hostname from /etc/hadoop/conf/yarn-site.xml strange! On Fri, Jun 20, 2014 at 1:26 PM, Koert Kuipers ko...@tresata.com wrote: in /etc/hadoop/conf/core-site.xml:
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://cdh5-yarn.tresata.com:8020</value>
</property>
also hdfs seems to be the default:
[koert@cdh5-yarn ~]$ hadoop fs -ls /
Found 5 items
drwxr-xr-x - hdfs supergroup 0 2014-06-19 12:31 /data
drwxrwxrwt - hdfs supergroup 0 2014-06-20 12:17 /lib
drwxrwxrwt - hdfs supergroup 0 2014-06-18 14:58 /tmp
drwxr-xr-x - hdfs supergroup 0 2014-06-18 15:02 /user
drwxr-xr-x - hdfs supergroup 0 2014-06-18 14:59 /var
and in my spark-site.env: export HADOOP_CONF_DIR=/etc/hadoop/conf On Fri, Jun 20, 2014 at 1:04 PM, bc Wong bcwal...@cloudera.com wrote: Koert, is there any chance that your fs.defaultFS isn't set up right? On Fri, Jun 20, 2014 at 9:57 AM, Koert Kuipers ko...@tresata.com wrote: yeah sure, see below. i strongly suspect it's something i misconfigured causing yarn to try to use the local filesystem mistakenly.
[koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --executor-cores 1 hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10
14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at cdh5-yarn.tresata.com/192.168.1.85:8032
14/06/20 12:54:41 INFO Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1
14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource in this cluster 8192
14/06/20 12:54:41 INFO Client: Preparing Local resources
14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/06/20 12:54:41 INFO Client: Uploading hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 14/06/20 12:54:43 INFO Client: Setting up the launch environment 14/06/20 12:54:43 INFO Client: Setting up container launch context 14/06/20 12:54:43 INFO Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\3\, -Dspark.storage.blockManagerTimeoutIntervalMs=\12\, -Dspark.storage.blockManagerHeartBeatMs=\12\, -Dspark.app.name=\org.apache.spark.examples.SparkPi\, -Dspark.akka.frameSize=\1\, -Dspark.akka.timeout=\3\, -Dspark.worker.timeout=\3\, -Dspark.akka.logLifecycleEvents=\true\, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar, --args '10' , --executor-memory, 1024, --executor-cores, 1, --num-executors , 3, 1, LOG_DIR/stdout, 2, LOG_DIR/stderr) 14/06/20 12:54:43 INFO Client: Submitting application to ASM 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application application_1403201750110_0060 14/06/20 12:54:44 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/ appUser: koert 14/06/20 12:54:45 INFO Client: Application report from ASM: application identifier: application_1403201750110_0060 appId: 60 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: root.koert appMasterRpcPort: -1 appStartTime: 1403283283505 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl:
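One quick way to see which Hadoop configuration the driver actually loaded, before adding logging to yarn.Client, is to print the resolved default filesystem from a spark-shell on the submitting machine. This is just a sketch; the expected value is whatever core-site.xml under HADOOP_CONF_DIR declares.

// in spark-shell, where a SparkContext `sc` already exists
println(System.getenv("HADOOP_CONF_DIR"))             // which conf dir the shell was launched with
println(sc.hadoopConfiguration.get("fs.defaultFS"))   // should print hdfs://..., not file:///

A stray core-site.xml earlier on the classpath (as in spark/conf here) would show up as file:/// even though /etc/hadoop/conf is correct.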
Running Spark alongside Hadoop
Dear Spark users, I have a small 4-node Hadoop cluster. Each node is a VM -- 4 virtual cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I would like to run Spark (in standalone mode) alongside Hadoop on the same nodes. Given the configuration of my nodes, will that work? Does anyone have any experience in terms of stability and performance of running Spark and Hadoop on somewhat resource-constrained nodes? I was looking at the Spark documentation and there is a way to configure memory and cores for the worker nodes and memory for the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to share resources between Hadoop and Spark?
Re: Running Spark alongside Hadoop
The ideal way to do that is to use a cluster manager like YARN or Mesos. You can control how many resources to give to which node etc. You should be able to run both together in standalone mode, however you may experience varying latency and performance in the cluster as both MR and Spark demand resources from the same machines. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:41 PM, Sameer Tilak ssti...@live.com wrote: Dear Spark users, I have a small 4-node Hadoop cluster. Each node is a VM -- 4 virtual cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I would like to run Spark (in standalone mode) alongside Hadoop on the same nodes. Given the configuration of my nodes, will that work? Does anyone have any experience in terms of stability and performance of running Spark and Hadoop on somewhat resource-constrained nodes? I was looking at the Spark documentation and there is a way to configure memory and cores for the worker nodes and memory for the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to share resources between Hadoop and Spark?
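If the two stacks do end up sharing the nodes in standalone mode, the Spark side can at least be capped through the variables Sameer mentions. A sketch of conf/spark-env.sh with example values for an 8GB / 4-core VM; the exact split between Spark and Hadoop is workload dependent and these numbers are only illustrative.

# conf/spark-env.sh (example values, leaving roughly half of each VM for Hadoop daemons and MR tasks)
export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=3g
export SPARK_DAEMON_MEMORY=512m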
Re: Spark and RDF
You are looking to create Shark operators for RDF? Since the Shark backend is shifting to SparkSQL it would be slightly hard, but a much better effort would be to shift Gremlin to Spark (though a much beefier one :) ) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com wrote: For RDF, may GraphX be particularly appropriate? aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi guys, I'm analyzing the possibility to use Spark to analyze RDF files and define reusable Shark operators on them (custom filtering, transforming, aggregating, etc). Is that possible? Any hint? Best, Flavio
Re: Spark and RDF
Maybe some SPARQL features in Shark, then ? aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: You are looking to create Shark operators for RDF? Since Shark backend is shifting to SparkSQL it would be slightly hard but much better effort would be to shift Gremlin to Spark (though a much beefier one :) ) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com wrote: For RDF, may GraphX be particularly approriated? aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi guys, I'm analyzing the possibility to use Spark to analyze RDF files and define reusable Shark operators on them (custom filtering, transforming, aggregating, etc). Is that possible? Any hint? Best, Flavio
Re: Spark and RDF
or a seperate RDD for sparql operations ala SchemaRDD .. operators for sparql can be defined thr.. not a bad idea :) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:56 PM, andy petrella andy.petre...@gmail.com wrote: Maybe some SPARQL features in Shark, then ? aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: You are looking to create Shark operators for RDF? Since Shark backend is shifting to SparkSQL it would be slightly hard but much better effort would be to shift Gremlin to Spark (though a much beefier one :) ) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:39 PM, andy petrella andy.petre...@gmail.com wrote: For RDF, may GraphX be particularly approriated? aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi guys, I'm analyzing the possibility to use Spark to analyze RDF files and define reusable Shark operators on them (custom filtering, transforming, aggregating, etc). Is that possible? Any hint? Best, Flavio
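To make the GraphX suggestion concrete, here is a minimal sketch that loads whitespace-separated triples into a Graph, with subject/object hashed to vertex ids and the predicate kept as the edge attribute. The file path and input format are assumptions, the split is not a real N-Triples parser, and hashCode is used only for illustration (a real job needs stable, collision-free ids).

import org.apache.spark.graphx.{Edge, Graph}

// assuming a SparkContext `sc` and one "subject predicate object" triple per line
val triples = sc.textFile("hdfs:///data/triples.nt")   // hypothetical path
  .map(_.split("\\s+", 3))
  .filter(_.length == 3)

val edges = triples.map { case Array(s, p, o) =>
  Edge(s.hashCode.toLong, o.hashCode.toLong, p)        // predicate as the edge attribute
}
val vertices = triples.flatMap { case Array(s, _, o) =>
  Seq((s.hashCode.toLong, s), (o.hashCode.toLong, o))
}
val rdfGraph: Graph[String, String] = Graph(vertices, edges)
println(rdfGraph.triplets.count())

Filtering, transforming and aggregating then become plain graph/RDD operations; SPARQL-style querying would still need the kind of dedicated operators discussed above.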
Re: Running Spark alongside Hadoop
for development/testing i think its fine to run them side by side as you suggested, using spark standalone. just be realistic about what size data you can load with limited RAM. On Fri, Jun 20, 2014 at 3:43 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: The ideal way to do that is to use a cluster manager like Yarn mesos. You can control how much resources to give to which node etc. You should be able to run both together in standalone mode however you may experience varying latency performance in the cluster as both MR spark demand resources from same machines etc. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 3:41 PM, Sameer Tilak ssti...@live.com wrote: Dear Spark users, I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I would like to run Spark (in standalone mode) along side Hadoop on the same nodes. Given the configuration of my nodes, will that work? Does anyone has any experience in terms of stability and performance of running Spark and Hadoop on somewhat resource-constrained nodes. I was looking at the Spark documentation and there is a way to configure memory and cores for the and worker nodes and memory for the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to share resource between HAdoop and Spark?
Re: Possible approaches for adding extra metadata (Spark Streaming)?
If the metadata is directly related to each individual record, then it can be done either way. Since I am not sure how easy or hard it will be for you to add the tags before putting the data into Spark Streaming, it's hard to recommend one method over the other. However, if the metadata is related to each key (based on which you are calling updateStateByKey) and not to every record, then it may be more efficient to maintain that per-key metadata in the updateStateByKey state object. Regarding doing HTTP calls, I would be a bit cautious about performance. Doing an HTTP call for every record is going to be quite expensive, and will reduce throughput significantly. If it is possible, cache values as much as possible to amortize the cost of the HTTP calls. TD On Fri, Jun 20, 2014 at 11:16 AM, Shrikar archak shrika...@gmail.com wrote: Hi All, I was curious to know which of the two approaches is better for doing analytics using Spark Streaming. Let's say we want to add some metadata to the stream being processed, like sentiment, tags etc., and then perform some analytics using this added metadata. 1) Is it ok to make an HTTP call and add the extra information to the stream being processed inside the updateByKeyAndWindow operations? 2) Add these sentiment/tags beforehand and then stream the enriched data through DStreams? Thanks, Shrikar
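One common way to follow the caching advice is to enrich per partition and keep a small local cache so repeated values don't trigger repeated calls within a batch. A rough sketch against an assumed DStream of (key, text) pairs; fetchTags is a stand-in for whatever HTTP call adds the sentiment/tags.

import scala.collection.mutable

// hypothetical enrichment call, e.g. an HTTP request to a tagging/sentiment service
def fetchTags(text: String): Seq[String] = Seq("placeholder")

// stream: DStream[(String, String)] of (key, text) pairs, assumed to exist
val enriched = stream.mapPartitions { records =>
  val cache = mutable.Map.empty[String, Seq[String]]         // per-partition, per-batch cache
  records.map { case (key, text) =>
    val tags = cache.getOrElseUpdate(text, fetchTags(text))  // only call out on a cache miss
    (key, (text, tags))
  }
}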
Set the number/memory of workers under mesos
Hi, just wondering if anybody knows how to set the number of workers (and the amount of memory) under Mesos while launching spark-shell? I was trying to edit conf/spark-env.sh and it looks like those environment variables are for YARN or standalone. Thanks!
Re: Set the number/memory of workers under mesos
You should be able to configure this through the Spark context in the Spark shell: spark.cores.max and memory. Regards, Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 20, 2014 at 4:30 PM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi, just wondering if anybody knows how to set the number of workers (and the amount of memory) under Mesos while launching spark-shell? I was trying to edit conf/spark-env.sh and it looks like those environment variables are for YARN or standalone. Thanks!
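For the spark-shell case the usual knobs are spark.cores.max (the total cores the application grabs from Mesos) and spark.executor.memory. A sketch of conf/spark-defaults.conf, which spark-shell should pick up since it goes through spark-submit in 1.0; the master URL and numbers are examples only.

# conf/spark-defaults.conf (example values)
spark.master            mesos://host:5050
spark.cores.max         16
spark.executor.memory   4g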
Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space
In short, ADD_JARS will add the jar to your driver classpath and also send it to the workers (similar to what you are doing when you do sc.addJars). e.g.: MASTER=master/url ADD_JARS=/path/to/myJob.jar ./bin/spark-shell You also have the SPARK_CLASSPATH var, but it does not distribute the code; it is only used to compute the driver classpath. BTW, you are not supposed to change the compute_classpath.sh script. 2014-06-20 19:45 GMT+02:00 Shivani Rao raoshiv...@gmail.com: Hello Eugene, You are right about this. I did encounter the PermGen space error in the spark shell. Can you tell me a little more about ADD_JARS? In order to ensure my spark-shell has all required jars, I added the jars to the $CLASSPATH in the compute_classpath.sh script. Is there another way of doing it? Shivani On Fri, Jun 20, 2014 at 9:47 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: In my case it was due to a case class I was defining in the spark-shell and not being available on the workers. So packaging it in a jar and adding it with ADD_JARS solved the problem. Note that I don't exactly remember if it was an out of heap space exception or PermGen space. Make sure your jarsPath is correct. Usually to debug this kind of problem I use the spark-shell (you can do the same in your job, but it's more time consuming to repackage, deploy, run, iterate). Try for example: 1) read the lines (without any processing) and count them 2) apply the processing and count 2014-06-20 17:15 GMT+02:00 Shivani Rao raoshiv...@gmail.com: Hello Abhi, I did try that and it did not work. And Eugene, yes I am assembling the argonaut libraries in the fat jar. So how did you overcome this problem? Shivani On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: On 20 June 2014 01:46, Shivani Rao raoshiv...@gmail.com wrote: Hello Andrew, I wish I could share the code, but for proprietary reasons I can't. But I can give some idea of what I am trying to do. The job reads a file and processes each line of that file. I am not doing anything intense in the processLogs function.

import argonaut._
import argonaut.Argonaut._

/* all of these case classes are created from json strings extracted from the line in the processLogs() function */
case class struct1…
case class struct2…
case class value1(struct1, struct2)

def processLogs(line: String): Option[(key1, value1)] = {… }

def run(sparkMaster, appName, executorMemory, jarsPath) {
  val sparkConf = new SparkConf()
  sparkConf.setMaster(sparkMaster)
  sparkConf.setAppName(appName)
  sparkConf.set("spark.executor.memory", executorMemory)
  sparkConf.setJars(jarsPath) // This includes all the relevant jars
  val sc = new SparkContext(sparkConf)
  val rawLogs = sc.textFile("hdfs://my-hadoop-namenode:8020:myfile.txt")
  rawLogs.saveAsTextFile("hdfs://my-hadoop-namenode:8020:writebackForTesting")
  rawLogs.flatMap(processLogs).saveAsTextFile("hdfs://my-hadoop-namenode:8020:outfile.txt")
}

If I switch to local mode, the code runs just fine; it fails with the error I pasted above. In cluster mode, even writing back the file we just read fails (rawLogs.saveAsTextFile("hdfs://my-hadoop-namenode:8020:writebackForTesting")). I still believe this is a classNotFound error in disguise. Indeed you are right, this can be the reason. I had similar errors when defining case classes in the shell and trying to use them in the RDDs. Are you shading argonaut in the fat jar ?
Thanks Shivani On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com wrote: Wait, so the file only has four lines and the job running out of heap space? Can you share the code you're running that does the processing? I'd guess that you're doing some intense processing on every line but just writing parsed case classes back to disk sounds very lightweight. I On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com wrote: I am trying to process a file that contains 4 log lines (not very long) and then write my parsed out case classes to a destination folder, and I get the following error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at
Re: Parallel LogisticRegression?
I looks like I was running into https://issues.apache.org/jira/browse/SPARK-2204 The issues went away when I changed to spark.mesos.coarse. Kyle On Fri, Jun 20, 2014 at 10:36 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I've tried to parallelize the separate regressions using allResponses.toParArray.map( x= do logistic regression against labels in x) But I start to see messages like 14/06/20 10:10:26 WARN scheduler.TaskSetManager: Lost TID 4193 (task 363.0:4) 14/06/20 10:10:27 WARN scheduler.TaskSetManager: Loss was due to fetch failure from null and finally 14/06/20 10:10:26 ERROR scheduler.TaskSetManager: Task 363.0:4 failed 4 times; aborting job Then 14/06/20 10:10:26 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext 14/06/20 10:10:26 ERROR actor.OneForOneStrategy: java.lang.UnsupportedOperationException at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32) at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185) This doesn't happen when I don't use toParArray. I read that spark was thread safe, but I seem to be running into problems. Am I doing something wrong? Kyle On Thu, Jun 19, 2014 at 11:21 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working on a problem learning several different sets of responses against the same set of training features. Right now I've written the program to cycle through all of the different label sets, attached them to the training data and run LogisticRegressionWithSGD on each of them. ie foreach curResponseSet in allResponses: currentRDD : RDD[LabeledPoints] = curResponseSet joined with trainingData LogisticRegressionWithSGD.train(currentRDD) Each of the different training runs are independent. It seems like I should be parallelize them as well. Is there a better way to do this? Kyle
Re: options set in spark-env.sh is not reflecting on actual execution
Hi Meethu, Are you using Spark 1.0? If so, you should use spark-submit ( http://spark.apache.org/docs/latest/submitting-applications.html), which has --executor-memory. If you don't want to specify this every time you submit an application, you can also specify spark.executor.memory in $SPARK_HOME/conf/spark-defaults.conf ( http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties ). SPARK_WORKER_MEMORY is for the worker daemon, not your individual application. A worker can launch many executors, and the value of SPARK_WORKER_MEMORY is shared across all executors running on that worker. SPARK_EXECUTOR_MEMORY is deprecated and replaced by spark.executor.memory. This is the value you should set. SPARK_DAEMON_JAVA_OPTS should not be used for setting spark configs, but instead is intended for java options for worker and master instances (not for Spark applications). Similarly, you shouldn't be setting SPARK_MASTER_OPTS or SPARK_WORKER_OPTS to configure your application. The recommended way for setting spark.* configurations is to do it programmatically by creating a new SparkConf, set these configurations in the conf, and pass this conf to the SparkContext (see http://spark.apache.org/docs/latest/configuration.html#spark-properties). Andrew 2014-06-18 22:21 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in: Hi all, I have a doubt regarding the options in spark-env.sh. I set the following values in the file in master and 2 workers SPARK_WORKER_MEMORY=7g SPARK_EXECUTOR_MEMORY=6g SPARK_DAEMON_JAVA_OPTS+=- Dspark.akka.timeout=30 -Dspark.akka.frameSize=1 -Dspark.blockManagerHeartBeatMs=80 -Dspark.shuffle.spill=false But SPARK_EXECUTOR_MEMORY is showing 4g in web UI.Do I need to change it anywhere else to make it 4g and to reflect it in web UI. A warning is coming that blockManagerHeartBeatMs is exceeding 45 while executing a process even though I set it to 80. So I doubt whether it should be set as SPARK_MASTER_OPTS or SPARK_WORKER_OPTS.. Thanks Regards, Meethu M
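To make the recommendation concrete, the application-level executor memory can be set either per submission or once in the defaults file; the class and jar names below are placeholders and 6g is only an example matching the figure in the question.

# one-off, on the command line
./bin/spark-submit --executor-memory 6g --class my.Main myapp.jar

# or persistently, in $SPARK_HOME/conf/spark-defaults.conf
spark.executor.memory   6g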
kibana like frontend for spark
Folks, I want to analyse logs and I want to use Spark for that. However, Elasticsearch has a fancy frontend in Kibana. Kibana's docs indicate that it works with Elasticsearch only. Is there a similar frontend that can work with Spark? Mohit. P.S.: On MapR's Spark FAQ I read a statement along the lines of "Kibana can use any ODBC/JDBC backend and Shark has that interface".
Fwd: Using Spark
Hi, Would like to add ourselves to the user list if possible please? Company: truedash url: truedash.io Automatic pulling of all your data in to Spark for enterprise visualisation, predictive analytics and data exploration at a low cost. Currently in development with a few clients. Thanks
Re: How do you run your spark app?
Hello Shrikar, Thanks for your email. I have been using the same workflow as you did. But my questions was related to creation of the sparkContext. My question was If I am specifying jars in the java -cp jar-paths, and adding to them to my build.sbt, do I need to additionally add them in my code while creating the sparkContext (sparkContext.setJars( ))?? Thanks, Shivani On Fri, Jun 20, 2014 at 11:03 AM, Shrikar archak shrika...@gmail.com wrote: Hi Shivani, I use sbt assembly to create a fat jar . https://github.com/sbt/sbt-assembly Example of the sbt file is below. import AssemblyKeys._ // put this at the top of the file assemblySettings mainClass in assembly := Some(FifaSparkStreaming) name := FifaSparkStreaming version := 1.0 scalaVersion := 2.10.4 libraryDependencies ++= Seq(org.apache.spark %% spark-core % 1.0.0 % provided, org.apache.spark %% spark-streaming % 1.0.0 % provided, (org.apache.spark %% spark-streaming-twitter % 1.0.0).exclude(org.eclipse.jetty.orbit,javax.transaction) .exclude(org.eclipse.jetty.orbit,javax.servlet) .exclude(org.eclipse.jetty.orbit,javax.mail.glassfish) .exclude(org.eclipse.jetty.orbit,javax.activation) .exclude(com.esotericsoftware.minlog, minlog), (net.debasishg % redisclient_2.10 % 2.12).exclude(com.typesafe.akka,akka-actor_2.10)) mergeStrategy in assembly = (mergeStrategy in assembly) { (old) = { case PathList(javax, servlet, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case application.conf = MergeStrategy.concat case unwanted.txt = MergeStrategy.discard case x = old(x) } } resolvers += Akka Repository at http://repo.akka.io/releases/; And I run as mentioned below. LOCALLY : 1) sbt 'run AP1z4IYraYm5fqWhITWArY53x Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014' If you want to submit on the cluster CLUSTER: 2) spark-submit --class FifaSparkStreaming --master spark://server-8-144:7077 --driver-memory 2048 --deploy-mode cluster FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014 Hope this helps. Thanks, Shrikar On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao raoshiv...@gmail.com wrote: Hello Michael, I have a quick question for you. Can you clarify the statement build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. Can you give an example. I am using sbt assembly as well to create a fat jar, and supplying the spark and hadoop locations in the class path. Inside the main() function where spark context is created, I use SparkContext.jarOfClass(this).toList add the fat jar to my spark context. However, I seem to be running into issues with this approach. I was wondering if you had any inputs Michael. Thanks, Shivani On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal sonalgoy...@gmail.com wrote: We use maven for building our code and then invoke spark-submit through the exec plugin, passing in our parameters. Works well for us. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler mich...@tumra.com wrote: P.S. 
Last but not least we use sbt-assembly to build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. These are automatically built from source by our Jenkins and stored in HDFS. Our Chronos/Marathon jobs fetch the latest release TAR.GZ direct from HDFS, unpack it and launch the appropriate script. Makes for a much cleaner development / testing / deployment to package everything required in one go instead of relying on cluster specific classpath additions or any add-jars functionality. On 19 June 2014 22:53, Michael Cutler mich...@tumra.com wrote: When you start seriously using Spark in production there are basically two things everyone eventually needs: 1. Scheduled Jobs - recurring hourly/daily/weekly jobs. 2. Always-On Jobs - that require monitoring, restarting etc. There are lots of ways to implement these requirements, everything from crontab through to workflow managers like Oozie. We opted for the following stack: - Apache Mesos http://mesosphere.io/ (mesosphere.io distribution) - Marathon https://github.com/mesosphere/marathon - init/control system for starting, stopping, and maintaining always-on applications. - Chronos
Re: Worker dies while submitting a job
That error typically means that there is a communication error (wrong ports) between master and worker. Also check if the worker has write permissions to create the work directory. We were getting this error due one of the above two reasons On Tue, Jun 17, 2014 at 10:04 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I have been able to submit a job successfully but I had to config my spark job this way: val sparkConf: SparkConf = new SparkConf() .setAppName(TwitterPopularTags) .setMaster(spark://int-spark-master:7077) .setSparkHome(/opt/spark) .setJars(Seq(/tmp/spark-test-0.1-SNAPSHOT.jar)) Now I'm getting this error on my worker: 4/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 2014-06-17 17:36 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: Ok... I was checking the wrong version of that file yesterday. My worker is sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no case branch for that state and the worker is crashing. I still don't know why I'm getting a FAILED state but I'm sure that should kill the actor due to a scala.MatchError. Usually in scala is a best-practice to use a sealed trait and case classes/objects in a match statement instead of an enumeration (the compiler will complain about missing cases); I think that should be refactored to catch this kind of errors at compile time. Now I need to find why that state changed message is sent... I will continue updating this thread until I found the problem :D 2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: I'm playing with a modified version of the TwitterPopularTags example and when I tried to submit the job to my cluster, workers keep dying with this message: 14/06/16 17:11:16 INFO DriverRunner: Launch Command: java -cp /opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.deploy.worker.DriverWrapper akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker org.apache.spark.examples.streaming.TwitterPopularTags 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class scala.Enumeration$Val) scala.MatchError: FAILED (of class scala.Enumeration$Val) at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/06/16 17:11:17 INFO Worker: Starting Spark worker int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at http://int-spark-app-ie005d6a3.mclabs.io:8081 14/06/16 17:11:17 INFO Worker: Connecting to master 
spark://int-spark-app-ie005d6a3.mclabs.io:7077... 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to re-register worker at same address: akka.tcp:// sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676 This happens when the worker receive a DriverStateChanged(driverId, state, exception) message. To deploy the job I copied the jar file to the temporary folder of master node and execute the following command: ./spark-submit \ --class org.apache.spark.examples.streaming.TwitterPopularTags \ --master spark://int-spark-master:7077 \ --deploy-mode cluster \ file:///tmp/spark-test-0.1-SNAPSHOT.jar I don't really know what the problem could be as there is a 'case _' that should avoid that problem :S -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA
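The sealed-trait point made earlier in this thread can be shown with a small generic example (this is not the actual Spark DriverState code): with an Enumeration, a missing case compiles silently and only fails at runtime with a scala.MatchError, exactly as in the worker log above, whereas with a sealed trait the compiler warns that the match is not exhaustive.

// Enumeration style: compiles without complaint, throws scala.MatchError at runtime on Failed
object JobState extends Enumeration { val Running, Finished, Failed = Value }
def describe(s: JobState.Value): String = s match {
  case JobState.Running  => "running"
  case JobState.Finished => "finished"
  // JobState.Failed not handled
}

// Sealed trait style: the compiler emits a "match may not be exhaustive" warning
sealed trait State
case object Running extends State
case object Finished extends State
case object Failed extends State
def describe2(s: State): String = s match {
  case Running  => "running"
  case Finished => "finished"
}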
Re: How do you run your spark app?
Hi Shivani, Adding JARs to classpath (e.g. via -cp option) is needed to run your _local_ Java application, whatever it is. To deliver them to _other machines_ for execution you need to add them to SparkContext. And you can do it in 2 different ways: 1. Add them right from your code (your suggested sparkContext.setJars(...)). 2. Use spark-submit and pass JARs from command line. Note, that both options are easier to do if you assemble your code and all its dependencies into a single fat JAR instead of manually listing all needed libraries. On Sat, Jun 21, 2014 at 1:47 AM, Shivani Rao raoshiv...@gmail.com wrote: Hello Shrikar, Thanks for your email. I have been using the same workflow as you did. But my questions was related to creation of the sparkContext. My question was If I am specifying jars in the java -cp jar-paths, and adding to them to my build.sbt, do I need to additionally add them in my code while creating the sparkContext (sparkContext.setJars( ))?? Thanks, Shivani On Fri, Jun 20, 2014 at 11:03 AM, Shrikar archak shrika...@gmail.com wrote: Hi Shivani, I use sbt assembly to create a fat jar . https://github.com/sbt/sbt-assembly Example of the sbt file is below. import AssemblyKeys._ // put this at the top of the file assemblySettings mainClass in assembly := Some(FifaSparkStreaming) name := FifaSparkStreaming version := 1.0 scalaVersion := 2.10.4 libraryDependencies ++= Seq(org.apache.spark %% spark-core % 1.0.0 % provided, org.apache.spark %% spark-streaming % 1.0.0 % provided, (org.apache.spark %% spark-streaming-twitter % 1.0.0).exclude(org.eclipse.jetty.orbit,javax.transaction) .exclude(org.eclipse.jetty.orbit,javax.servlet) .exclude(org.eclipse.jetty.orbit,javax.mail.glassfish) .exclude(org.eclipse.jetty.orbit,javax.activation) .exclude(com.esotericsoftware.minlog, minlog), (net.debasishg % redisclient_2.10 % 2.12).exclude(com.typesafe.akka,akka-actor_2.10)) mergeStrategy in assembly = (mergeStrategy in assembly) { (old) = { case PathList(javax, servlet, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case application.conf = MergeStrategy.concat case unwanted.txt = MergeStrategy.discard case x = old(x) } } resolvers += Akka Repository at http://repo.akka.io/releases/; And I run as mentioned below. LOCALLY : 1) sbt 'run AP1z4IYraYm5fqWhITWArY53x Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014' If you want to submit on the cluster CLUSTER: 2) spark-submit --class FifaSparkStreaming --master spark://server-8-144:7077 --driver-memory 2048 --deploy-mode cluster FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014 Hope this helps. Thanks, Shrikar On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao raoshiv...@gmail.com wrote: Hello Michael, I have a quick question for you. Can you clarify the statement build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. Can you give an example. I am using sbt assembly as well to create a fat jar, and supplying the spark and hadoop locations in the class path. Inside the main() function where spark context is created, I use SparkContext.jarOfClass(this).toList add the fat jar to my spark context. 
However, I seem to be running into issues with this approach. I was wondering if you had any inputs Michael. Thanks, Shivani On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal sonalgoy...@gmail.com wrote: We use maven for building our code and then invoke spark-submit through the exec plugin, passing in our parameters. Works well for us. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler mich...@tumra.com wrote: P.S. Last but not least we use sbt-assembly to build fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and everything needed to run a Job. These are automatically built from source by our Jenkins and stored in HDFS. Our Chronos/Marathon jobs fetch the latest release TAR.GZ direct from HDFS, unpack it and launch the appropriate script. Makes for a much cleaner development / testing / deployment to package everything required in one go instead of relying on cluster specific classpath additions or any add-jars functionality. On 19 June 2014 22:53, Michael Cutler mich...@tumra.com wrote: When you start
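Putting the two options above side by side as a sketch (paths, class and app names are placeholders): either the fat jar is named in code on the SparkConf, or it is handed to spark-submit and nothing extra is needed in the code.

// Option 1: ship the fat jar programmatically
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("MyJob")
  .setJars(Seq("/path/to/myjob-assembly-0.1.jar"))   // the fat jar built by sbt-assembly
val sc = new SparkContext(conf)

// Option 2: let spark-submit distribute the fat jar instead, with no setJars in code:
//   ./bin/spark-submit --class com.example.MyJob --master spark://host:7077 /path/to/myjob-assembly-0.1.jar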
sc.textFile can't recognize '\004'
Hi, I need to parse a file which uses a series of separators. I used SparkContext.textFile and I ran into two problems: 1) One of the separators is '\004', which can be recognized by Python, R or Hive; however, Spark seems unable to recognize it and returns a symbol that looks like '?'. This symbol is not a question mark and I don't know how to parse it. 2) Some of the separators are composed of several chars, like "} =". If I use str.split(Array('}', '=')), it will separate the string, but with many white spaces left in the middle. Is there a good way to split by String instead of by an Array of Chars? Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sc-textFile-can-t-recognize-004-tp8059.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
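Both points can be sketched directly in Scala, since String.split also accepts a regular expression: the \004 byte can be written as the unicode escape \u0004 (the '?' is most likely just how the non-printable character is rendered), and a multi-character separator with optional surrounding whitespace can be matched in one pattern. The sample line is made up.

val line = "a\u0004b } = c"

// split on the 0x04 control character
val bySep = line.split("\u0004")               // Array("a", "b } = c")

// split on the multi-character separator "} =", swallowing surrounding whitespace
val byBrace = line.split("""\s*\}\s*=\s*""")   // Array("a\u0004b", "c")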
Re: Running Spark alongside Hadoop
I only ran HDFS on the same nodes as Spark and that worked out great performance and robustness wise. However, I did not run Hadoop itself to do any computations/jobs on the same nodes. My expectation is that if you actually ran both at the same time with your configuration, the performance would be pretty bad. It's mostly about memory really and then CPU(s) etc. OD On 6/20/14, 2:41 PM, Sameer Tilak wrote: Dear Spark users, I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I would like to run Spark (in standalone mode) along side Hadoop on the same nodes. Given the configuration of my nodes, will that work? Does anyone has any experience in terms of stability and performance of running Spark and Hadoop on somewhat resource-constrained nodes. I was looking at the Spark documentation and there is a way to configure memory and cores for the and worker nodes and memory for the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to share resource between HAdoop and Spark?