Re: How to stop a running SparkContext in the proper way?
Ctrl+Z only suspends the job; it does not terminate it (with fg/bg you can resume the job). You need to press Ctrl+C to terminate the job!
Thanks
Best Regards
On Wed, Jun 4, 2014 at 10:24 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:
Hi,
I want to know how I can stop a running SparkContext in a proper way, so that the next time I start a new SparkContext, the web UI can be launched on the same port, 4040. Right now, when I quit the job using Ctrl+Z, new SparkContexts are launched on new ports.
I have the same problem with the IPython notebook: it is launched on a different port when I start the notebook a second time after closing the first one. I am starting IPython using the command
    IPYTHON_OPTS="notebook --ip --pylab inline" ./bin/pyspark
Thanks & Regards,
Meethu M
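Why the port moves: a job suspended with Ctrl+Z is still a live process, so its SparkContext keeps port 4040 bound and the next context's web UI falls back to the next port. Terminating with Ctrl+C, or calling `sc.stop()` before exiting, releases it. Below is a simplified, hypothetical model of that fallback using only Python's standard `socket` module (`bind_with_fallback` is an illustrative name, not a Spark API, and Spark's real retry logic may differ in detail):

```python
import errno
import socket


def bind_with_fallback(base_port, max_retries=16, host="127.0.0.1"):
    """Bind a listening socket to base_port, or to the next free port after it.

    Hypothetical model of a web UI that tries port, port+1, port+2, ...
    when its preferred port is already taken.
    """
    for offset in range(max_retries + 1):
        port = base_port + offset
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
            sock.listen(1)
            return sock, port
        except OSError as exc:
            sock.close()
            if exc.errno != errno.EADDRINUSE:
                raise  # only "port in use" triggers a retry
    raise RuntimeError("no free port found in range")


if __name__ == "__main__":
    # The first "context" keeps its port, like a process suspended with Ctrl+Z.
    first, first_port = bind_with_fallback(4040)
    second, second_port = bind_with_fallback(4040)
    print("first:", first_port, "second:", second_port)

    # Releasing the sockets (what sc.stop() or Ctrl+C achieves) frees the port.
    second.close()
    first.close()
    third, third_port = bind_with_fallback(first_port)
    print("after release:", third_port)
    third.close()
```

The second bind always lands on a higher port while the first socket is held, and the original port becomes usable again once it is closed.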
Re: ZeroMQ Stream - stack guard problem and no data
Hi,
What is your ZeroMQ version? It is known to work well with 2.2. The output of `sudo ldconfig -v | grep zmq` would be helpful in this regard.
Thanks
Prashant Sharma
On Wed, Jun 4, 2014 at 11:40 AM, Tobias Pfeiffer t...@preferred.jp wrote:
Hi,
I am trying to use Spark Streaming (1.0.0) with ZeroMQ, i.e. I say
    def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
    val lines: DStream[String] = ZeroMQUtils.createStream(ssc, "tcp://localhost:5556", Subscribe("mytopic"), bytesToStringIterator _)
    lines.print()
but when I start this program (in local mode), I get
    OpenJDK 64-Bit Server VM warning: You have loaded library /tmp/jna2713405829859698528.tmp which might have disabled stack guard. The VM will try to fix the stack guard now. It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
and no data is received. The ZeroMQ setup should be OK, though; the Python code
    import time
    import zmq
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"mytopic")
    socket.connect("tcp://localhost:5556")
    while True:
        msg = socket.recv()
        print(msg)
        time.sleep(1)
works fine and prints the messages issued by the publisher. Any suggestions on what is going wrong here?
Thanks
Tobias
SocketException when reading from S3 (s3n format)
Hi all,
I've set up a 4-node Spark cluster (the nodes are r3.large) with the spark-ec2 script. I've been trying to run a job on this cluster, and I'm trying to figure out why I get the following exception:
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:196)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
    at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
    at sun.security.ssl.InputRecord.read(InputRecord.java:509)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
    at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
    at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.DataInputStream.read(DataInputStream.java:100)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Upon inspection, the error seems to occur while reading from an s3n address. The data itself is not big (around 35 megabytes), but I am partitioning it into 8 groups. Is there a way to make these kinds of reads more reliable? If not, is there a way to increase the maximum number of errors tolerated in a job before it is killed?
Thanks!
Dan
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SocketException-when-reading-from-S3-s3n-format-tp6889.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SocketException when reading from S3 (s3n format)
I should add that I'm using spark 0.9.1. Thanks!
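On the second question: Spark already re-runs failed tasks, and the number of attempts per task before the job is aborted is set by `spark.task.maxFailures` (default 4; check the configuration page for your version). For the first question, a generic option is to retry the flaky read itself with exponential backoff. The sketch below shows only that pattern in plain Python; `read_with_retries` and `read_fn` are hypothetical names, and wiring such a retry into the Hadoop/jets3t input stream is not shown.

```python
import time


def read_with_retries(read_fn, attempts=4, base_delay=0.5,
                      retry_on=(ConnectionResetError,)):
    """Call read_fn(), retrying transient connection errors with backoff.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last error once all attempts are used up.
    """
    for attempt in range(1, attempts + 1):
        try:
            return read_fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            time.sleep(base_delay * 2 ** (attempt - 1))
```

Only the listed exception types are retried, so genuine bugs (for example a missing key) still fail fast instead of being masked.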
Re: mounting SSD devices of EC2 r3.8xlarge instances
For the SSDs in r3 instances, it may be better to mount with the `discard` option, since they support TRIM. What I did for r3.large:
    echo '/dev/xvdb /mnt ext4 defaults,noatime,nodiratime,discard 0 0' >> /etc/fstab
    mkfs.ext4 /dev/xvdb
    mount /dev/xvdb
2014-06-03 19:15 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:
Those instance types are not yet supported by the scripts, but https://issues.apache.org/jira/browse/SPARK-1790 is tracking this issue and it will soon be fixed in both branch-0.9 and 1.0. The problem is that those drives are not formatted on r3 machines, whereas they are on the other instance types, so the script assumed that they’d be formatted. You can manually go and do this to set them up:
    mkfs.ext4 /dev/sdb
    mkfs.ext4 /dev/sdc
    mount -o noatime /dev/sdb /mnt
    mount -o noatime /dev/sdc /mnt2
Matei
On Jun 3, 2014, at 10:05 AM, Andras Barjak andras.bar...@lynxanalytics.com wrote:
Hi,
I have noticed that upon launching a cluster consisting of r3.8xlarge high-memory instances, the standard /mnt /mnt2 /mnt3 /mnt4 temporary directories get created and set up for temp usage; however, they point to the root 8 GB filesystem. The 2x320 GB SSDs are not mounted, and they are not even formatted. This problem might affect other EC2 instances as well, I suppose. I am using 0.9.1; is this something that has been corrected in the 1.0.0 spark-ec2 script?
Regards,
András Barják
--
*JU Han*
Data Engineer @ Botify.com
+33 061960
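JU Han's fstab line follows the usual six-field layout: device, mount point, filesystem type, mount options, dump flag, fsck pass. A small hypothetical Python helper that rebuilds that exact line and applies the spark-ec2 `/mnt`, `/mnt2`, ... naming from Andras's message. The second device name used in the usage note (`/dev/xvdc`) and the device-to-mount-point pairing are assumptions you would check on the instance (for example with `lsblk`).

```python
# Mount options from JU Han's fstab entry (discard enables TRIM on the SSDs).
DEFAULT_OPTIONS = ("defaults", "noatime", "nodiratime", "discard")


def mount_points(count):
    """spark-ec2 style mount points: /mnt, /mnt2, /mnt3, ..."""
    return ["/mnt" if i == 1 else f"/mnt{i}" for i in range(1, count + 1)]


def fstab_lines(devices, fs_type="ext4", options=DEFAULT_OPTIONS):
    """One fstab line per device: device, mount point, type, options, dump=0, pass=0."""
    return [
        f"{dev} {mnt} {fs_type} {','.join(options)} 0 0"
        for dev, mnt in zip(devices, mount_points(len(devices)))
    ]
```

For example, `fstab_lines(["/dev/xvdb", "/dev/xvdc"])` yields one line for `/mnt` and one for `/mnt2`; each would still need `mkfs.ext4` on the device before mounting.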
IllegalArgumentException on calling KMeans.train()
What does this exception mean?
14/06/04 16:35:15 ERROR executor.Executor: Exception in task ID 6
java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
My Spark version: 1.0.0
Java: 1.7
My code:
    JavaRDD<Vector> docVectors = generateDocVector(...);
    int numClusters = 20;
    int numIterations = 20;
    KMeansModel clusters = KMeans.train(docVectors.rdd(), numClusters, numIterations);
Another strange thing is that the mapPartitionsWithIndex() call in generateDocVector() is invoked 3 times...
2014-06-04
bluejoe2008
Problem understanding log message in SparkStreaming
I wanted to know the meaning of the following log message when running a Spark Streaming job:
[spark-akka.actor.default-dispatcher-18] INFO org.apache.spark.streaming.scheduler.JobScheduler - Total delay: 5.432 s for time 1401870454500 ms (execution: 0.593 s)
According to my understanding, total delay here means the total end-to-end delay, which here is 5.432 s. What is the meaning of execution: 0.593? Is it the time taken to execute this particular query?
PS: I am running a streaming job over a window of 5 minutes and querying every 1.5 s.
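My reading of the Spark Streaming scheduler (an interpretation, not confirmed in this thread): "Total delay" runs from the batch time to the moment that batch's jobs finish, while "execution" is only the time the jobs actually spent running, so the difference is time the batch spent waiting to be scheduled. The arithmetic for the log line above, as a small hypothetical helper, assuming the 1.5 s from the PS is the batch interval:

```python
def delay_breakdown(total_delay_s, execution_s, batch_interval_s):
    """Split a JobScheduler 'Total delay' into waiting time and execution time."""
    waiting_s = round(total_delay_s - execution_s, 3)
    return {
        "waiting_s": waiting_s,
        "execution_s": execution_s,
        # One sample only hints at this; a steadily growing total delay
        # across batches is the real sign that batches are queuing up.
        "falling_behind": total_delay_s > batch_interval_s,
    }


print(delay_breakdown(5.432, 0.593, 1.5))
```

Under that reading, this batch spent about 4.8 s waiting and only 0.6 s running, which would point at earlier batches (or the 5-minute window computation) holding up the scheduler rather than at this query itself.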
How to change default storage levels
Hi, I'm using Spark 0.9.1 and Shark 0.9.1. My dataset does not fit into the memory I have in my cluster setup, so I also want to use disk for caching. I guess MEMORY_ONLY is the default storage level in Spark. If that's the case, how can I change the storage level to MEMORY_AND_DISK in Spark? Thanks, Salih
executor idle during task schedule
Hi all, I've observed that sometimes, when an executor finishes one task, it waits about 5 seconds before getting another task to work on. During those 5 seconds the executor does nothing: CPU idle, no disk access, no network transfer. Is that normal for Spark? Thanks!
compile spark 1.0.0 error
Hi list,
I tried to compile Spark, but it failed. Here is my compile command and the compile output:
# SPARK_HADOOP_VERSION=2.0.0-cdh4.4.0 SPARK_YARN=true sbt/sbt assembly
[warn] 18 warnings found
[info] Compiling 53 Scala sources and 1 Java source to /home/admserver/spark-1.0.0/sql/catalyst/target/scala-2.10/classes...
[info] Compiling 68 Scala sources and 2 Java sources to /home/admserver/spark-1.0.0/streaming/target/scala-2.10/classes...
[info] Compiling 62 Scala sources and 1 Java source to /home/admserver/spark-1.0.0/mllib/target/scala-2.10/classes...
[info] Compiling 14 Scala sources to /home/admserver/spark-1.0.0/yarn/alpha/target/scala-2.10/classes...
[error] /home/admserver/spark-1.0.0/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala:36: object AMResponse is not a member of package org.apache.hadoop.yarn.api.records
[error] import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
[error] ^
[error] /home/admserver/spark-1.0.0/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala:110: value getAMResponse is not a member of org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
[error] val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
[error] ^
[error] two errors found
[error] (yarn-alpha/compile:compile) Compilation failed
[error] Total time: 1815 s, completed Jun 4, 2014 5:07:56 PM
Re: IllegalArgumentException on calling KMeans.train()
Could you check whether the vectors have the same size? -Xiangrui
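Xiangrui's hint, spelled out: `fastSquaredDistance` computes the squared distance from precomputed norms, using ||a - b||² = ||a||² + ||b||² - 2 a·b, and the `requirement failed` is most likely its check that both vectors have the same size. So a single document vector with a different dimension than the cluster centers is enough to fail the job. A plain-Python sketch of that check plus a helper to find the offending vectors; both function names are hypothetical and this is not Spark's implementation. In the Java program, the equivalent would be checking `size()` on each `Vector` produced by `generateDocVector(...)`.

```python
import math


def fast_squared_distance(v1, norm1, v2, norm2):
    """Squared Euclidean distance via ||a||^2 + ||b||^2 - 2 a.b with precomputed norms."""
    if len(v1) != len(v2):
        # The kind of size requirement that surfaces as "requirement failed".
        raise ValueError(f"vector sizes differ: {len(v1)} != {len(v2)}")
    dot = sum(a * b for a, b in zip(v1, v2))
    # Clamp tiny negative values caused by floating-point cancellation.
    return max(norm1 ** 2 + norm2 ** 2 - 2.0 * dot, 0.0)


def mismatched_sizes(vectors):
    """Indices of vectors whose length differs from the first vector's length."""
    if not vectors:
        return []
    expected = len(vectors[0])
    return [i for i, v in enumerate(vectors) if len(v) != expected]


a, b = [1.0, 2.0], [3.0, 4.0]
print(fast_squared_distance(a, math.sqrt(5.0), b, 5.0))
print(mismatched_sizes([[1, 2], [3, 4], [5]]))
```

If `mismatched_sizes` reports anything for the document vectors, the vectorizer is producing inconsistent dimensions (for example, a vocabulary built per partition).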
Re: ZeroMQ Stream - stack guard problem and no data
It's complaining about the native library shipped with ZeroMQ, right? That message is the JVM complaining about how it was compiled. If so, I think it's a question for ZeroMQ?
Re: RDD with a Map
Just a thought... are you trying to use the RDD as a Map?
On 3 June 2014 23:14, Doris Xin doris.s@gmail.com wrote:
Hey Amit,
You might want to check out PairRDDFunctions http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions. For your use case in particular, you can load the file as an RDD[(String, String)] and then use the groupByKey() function in PairRDDFunctions to get an RDD[(String, Iterable[String])].
Doris
On Tue, Jun 3, 2014 at 2:56 PM, Amit Kumar kumarami...@gmail.com wrote:
Hi Folks,
I am new to Spark, and this is probably a basic question. I have a file on HDFS:
    1, one
    1, uno
    2, two
    2, dos
I want to create a multimap RDD, RDD[Map[String,List[String]]]:
    {1->[one,uno], 2->[two,dos]}
First I read the file:
    val identityData: RDD[String] = sc.textFile($path_to_the_file, 2).cache()
    val identityDataList: RDD[List[String]] = identityData.map { line =>
      val splits = line.split(",")
      splits.toList
    }
Then I group them by the first element:
    val grouped: RDD[(String, Iterable[List[String]])] = identityDataList.groupBy { element =>
      element(0)
    }
Then I do the equivalent of mapValues on Scala collections to get rid of the first element:
    val groupedWithValues: RDD[(String, List[String])] = grouped.flatMap[(String, List[String])] {
      case (key, list) =>
        List((key, list.map { element => element(1) }.toList))
    }
For this to actually materialize, I call collect:
    val groupedAndCollected = groupedWithValues.collect()
I get an Array[(String, List[String])]. I am trying to figure out if there is a way for me to get a Map[String,List[String]] (a multimap), or to create an RDD[Map[String,List[String]]].
I am sure there is something simpler; I would appreciate advice.
Many thanks,
Amit
--
Kind regards,
Oleg
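Doris's suggestion in miniature: parse each line into a (key, value) pair, group by key, and the collected result is the multimap Amit is after. On a pair RDD the grouping is `groupByKey()`, and `collectAsMap()` returns a pair RDD to the driver as a Map. The sketch below only mimics that with plain Python on local data; `to_multimap` is a hypothetical helper, it assumes the data fits in memory, and unlike Spark's `groupByKey()` it preserves input order within each key.

```python
from collections import defaultdict


def to_multimap(lines):
    """Parse 'key, value' lines and group the values per key, like groupByKey()."""
    grouped = defaultdict(list)
    for line in lines:
        # Split on the first comma only and trim the space after it.
        key, value = (part.strip() for part in line.split(",", 1))
        grouped[key].append(value)
    return dict(grouped)


print(to_multimap(["1, one", "1, uno", "2, two", "2, dos"]))
```

The trimming step matters here: splitting `"1, one"` on `","` alone would leave the value as `" one"` with a leading space.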
Re: Spark not working with mesos
Thanks for the reply, Akhil. I created a tar.gz with make-distribution.sh, and it is accessible from all the slaves (I checked using hadoop fs -ls /path/). Also, no worker logs are written to the $SPARK_HOME/work/ directory on the workers (they are written when I run without Mesos). -- Thanks
Re: Error related to serialisation in spark streaming
I think Mayur meant that Spark doesn't necessarily clean the closure under Java 7. Is that true, though? I didn't know of an issue there.
Some anonymous class in your (?) OptimisingSort class is getting serialized, which may be fine and intentional, but it is not serializable. You haven't posted that class, but look at things like anonymous or inner classes that aren't marked Serializable.
On Wed, Jun 4, 2014 at 12:25 AM, Andrew Ash and...@andrewash.com wrote:
Hi Mayur, is that closure cleaning a JVM issue or a Spark issue? I'm used to thinking of the closure cleaner as something Spark built. Do you have somewhere I can read more about this?
On Tue, Jun 3, 2014 at 12:47 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:
So are you using Java 7 or 8? 7 doesn't clean closures properly, so you need to define a static class as a function and then call that in your operations. Otherwise it'll try to send the whole class along with the function.
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi
On Tue, Jun 3, 2014 at 7:19 PM, Sean Owen so...@cloudera.com wrote:
Sorry if I'm dense, but is OptimisingSort your class? It's saying you have included something from it in a function that is shipped off to remote workers, but something in it is not java.io.Serializable. OptimisingSort$6$1 needs to be Serializable.
On Tue, Jun 3, 2014 at 2:23 PM, nilmish nilmish@gmail.com wrote:
I am using the following code segment:
    countPerWindow.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
            Comparator<Tuple2<String, Long>> comp = new Comparator<Tuple2<String, Long>>() {
                public int compare(Tuple2<String, Long> tupleA, Tuple2<String, Long> tupleB) {
                    return 1 - tupleA._2.compareTo(tupleB._2);
                }
            };
            List<scala.Tuple2<String, Long>> top = rdd.top(5, comp); // creating error
            System.out.println("Top 5 are : ");
            for (int i = 0; i < top.size(); ++i) {
                System.out.println(top.get(i)._2 + " " + top.get(i)._1);
            }
            return null;
        }
    });
    }
I am getting the following error related to serialisation:
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException
Detailed error:
INFO org.apache.spark.scheduler.DAGScheduler - Failed to run top at OptimisingSort.java:173
2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1401801057000 ms.2
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: OptimisingSort$6$1
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
How can I remove this error?
Re: compile spark 1.0.0 error
I am not sure if it is exposed in the SBT build, but you may need the equivalent of the 'yarn-alpha' profile from the Maven build. This older build of CDH predates the newer YARN APIs. See also https://groups.google.com/forum/#!msg/spark-users/T1soH67C5M4/CmGYV8kfRkcJ Or, use a later CDH. In fact 4.6+ has a Spark parcel for you already.
Re: Spark not working with mesos
http://spark.apache.org/docs/latest/running-on-mesos.html#troubleshooting-and-debugging
If you are not able to find the logs in /var/log/mesos, do check in /tmp/mesos/, where you can see your application's ID and so on, just like in the $SPARK_HOME/work directory.
Thanks
Best Regards
Re: Error related to serialisation in spark streaming
The error is resolved. I was using a comparator which was not serializable, which is why it was throwing the error. I have now switched to the Kryo serializer, as it is faster than the Java serializer. I have set the required config:
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "MyRegistrator");
and in the MyRegistrator class I have registered all the classes I am serialising. How can I confirm that my code is actually using the Kryo serialiser and not the Java serialiser now?
PS: It seems like my code is still not using the Kryo serialiser.
Can this be done in map-reduce technique (in parallel)
Hi, I am a new Spark user. Please let me know how to handle the following scenario. I have a data set with the following fields:
1. DeviceId
2. latitude
3. longitude
4. IP address
5. Datetime
6. Mobile application name
With the above data, I would like to perform the following steps:
1. Collect all lat and lon for each IP address: (ip1,(lat1,lon1),(lat2,lon2)) (ip2,(lat3,lon3),(lat4,lon4))
2. For each IP:
   1. Find the distance between each lat/lon coordinate pair and all the other pairs under the same IP
   2. Select those coordinates whose distances fall under a specific threshold (say 100 m)
   3. Find the coordinate pair with the maximum occurrences
In this case, how can I iterate and compare each coordinate pair with all the other pairs? Can this be done in a distributed manner, as this data set is going to have a few million records? Can we do this with map/reduce commands? Thanks.
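One way to structure this, under assumptions: group the records by IP (in Spark, a pair RDD keyed by IP followed by `groupByKey()`), then run a local all-pairs comparison inside each group, for example with `mapValues`. That inner step is O(n²) per IP, which is workable only if each IP has a modest number of points. The sketch is plain Python on local data; it uses the haversine formula for distance, reads step 3 as "the point with the most other points within the threshold", and all function names are hypothetical.

```python
import math
from collections import defaultdict

EARTH_RADIUS_M = 6371000.0  # mean Earth radius assumed by the haversine model


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def best_point(points, threshold_m=100.0):
    """(point, neighbour_count) for the point with most neighbours within threshold_m."""
    def neighbours(i):
        lat1, lon1 = points[i]
        return sum(1 for j, (lat2, lon2) in enumerate(points)
                   if j != i and haversine_m(lat1, lon1, lat2, lon2) <= threshold_m)
    best = max(range(len(points)), key=neighbours)  # ties: first point wins
    return points[best], neighbours(best)


def best_point_per_ip(records, threshold_m=100.0):
    """records: iterable of (ip, lat, lon). Returns {ip: ((lat, lon), count)}."""
    by_ip = defaultdict(list)  # local stand-in for groupByKey()
    for ip, lat, lon in records:
        by_ip[ip].append((lat, lon))
    return {ip: best_point(pts, threshold_m) for ip, pts in by_ip.items()}
```

If some IPs carry very many points, a spatial bucketing step (for example, rounding coordinates to a grid before comparing) would cut the pairwise work, at the cost of extra handling near cell boundaries.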
Re: Error related to serialisation in spark streaming
I had issues around embedded functions; here's what I have figured out. Every inner class actually contains a field referencing the outer class. An anonymous class has a this$0 field referencing the outer class, which is why Spark is trying to serialize the outer class. In the Scala API, a closure (which is really just implemented as an anonymous class) has a field called $outer, and Spark uses a closure cleaner that goes into the anonymous class to remove the $outer field if it is not used in the closure itself. In Java, the compiler generates a field called this$0 instead, so the closure cleaner doesn't find it and can't clean it properly.
Mayur Rustagi
Re: Error related to serialisation in spark streaming
Static inner classes do not refer to the outer class. People often declare them non-static by default when it's unnecessary; a Comparator class is typically a great example. Anonymous inner classes declared inside a method are another example, but there again they can be refactored into named static inner classes. I don't think the closure cleaner should remove this reference unless it somehow knows it isn't used; it could be in use. If it's not used, it should just not exist in the bytecode to begin with. On Wed, Jun 4, 2014 at 12:29 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: I had issues around embedded functions here's what I have figured. Every inner class actually contains a field referencing the outer class. The anonymous class actually has a this$0 field referencing the outer class, and thus why Spark is trying to serialize Outer class. In the Scala API, the closure (which is really just implemented as anonymous classes) has a field called $outer, and Spark uses a closure cleaner that goes into the anonymous class to remove the $outer field if it is not used in the closure itself. In Java, the compiler generates a field called this$0, and thus the closure cleaner doesn't find it and can't clean it properly. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Wed, Jun 4, 2014 at 4:18 PM, nilmish nilmish@gmail.com wrote: The error is resolved. I was using a comparator which was not serialised because of which it was throwing the error. I have now switched to kryo serializer as it is faster than java serialser. I have set the required config conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); conf.set(spark.kryo.registrator, MyRegistrator); and also in MyRegistrator class I have registered all the classes I am serialising. How can I confirm that my code is actually using kryo serialiser and not java serialiser now ? PS : It seems like my code is still not using kryo serialiser.
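To make the inner-class point concrete, here is a small sketch in plain Java (no Spark needed). It assumes a hypothetical enclosing class that is not Serializable, like a typical driver class, and uses standard java.io serialization, which Spark's default Java serializer builds on. The non-static inner comparator reads a field of its enclosing instance, so it carries the hidden this$0 reference and fails to serialize; the static nested comparator keeps its own copy of the setting and serializes fine. All class and method names are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical driver-like class; deliberately NOT Serializable.
public class InnerClassSerialization {
    private final boolean descending;

    InnerClassSerialization(boolean descending) {
        this.descending = descending;
    }

    // Non-static inner class: it reads the enclosing instance's field, so the
    // compiler gives it a hidden reference (this$0) to InnerClassSerialization.
    class InnerComparator implements Comparator<String>, Serializable {
        @Override
        public int compare(String a, String b) {
            return descending ? b.compareTo(a) : a.compareTo(b);
        }
    }

    // Static nested class: no enclosing-instance reference, only its own state.
    static class StaticComparator implements Comparator<String>, Serializable {
        private final boolean descending;

        StaticComparator(boolean descending) {
            this.descending = descending;
        }

        @Override
        public int compare(String a, String b) {
            return descending ? b.compareTo(a) : a.compareTo(b);
        }
    }

    /** Returns true if standard Java serialization of obj succeeds. */
    static boolean javaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when some object in the graph (here, the outer instance) is not Serializable.
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassSerialization outer = new InnerClassSerialization(false);
        System.out.println("inner comparator:  " + javaSerializable(outer.new InnerComparator()));
        System.out.println("static comparator: " + javaSerializable(new StaticComparator(false)));
    }
}
```

In Spark terms, passing the static (or top-level) version to an API that takes a Comparator, e.g. JavaPairRDD.sortByKey, should avoid shipping the enclosing object along with the task.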
Re: Can this be done in map-reduce technique (in parallel)
It is possible if you use a cartesian product to produce all possible pairs for each IP address, followed by two stages of map-reduce:
- first, keyed by pairs of points, to find the total for each pair; and
- second, keyed by IP address, to find the pair with the maximum count for each IP address.
Oleg On 4 June 2014 11:49, lmk lakshmi.muralikrish...@gmail.com wrote: Hi, I am a new spark user. Pls let me know how to handle the following scenario: I have a data set with the following fields: 1. DeviceId 2. latitude 3. longitude 4. ip address 5. Datetime 6. Mobile application name With the above data, I would like to perform the following steps: 1. Collect all lat and lon for each ipaddress (ip1,(lat1,lon1),(lat2,lon2)) (ip2,(lat3,lon3),(lat4,lat5)) 2. For each IP, 1.Find the distance between each lat and lon coordinate pair and all the other pairs under the same IP 2.Select those coordinates whose distances fall under a specific threshold (say 100m) 3.Find the coordinate pair with the maximum occurrences In this case, how can I iterate and compare each coordinate pair with all the other pairs? Can this be done in a distributed manner, as this data set is going to have a few million records? Can we do this in map/reduce commands? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-done-in-map-reduce-technique-in-parallel-tp6905.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Kind regards, Oleg
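The per-IP comparison can be sketched in plain Java to make the steps concrete. This is an in-memory sketch under several assumptions: the Obs type and method names are hypothetical, distance uses the haversine formula on a spherical Earth (radius 6,371 km), and "maximum occurrences" is read as "the point with the most other points of the same IP within the threshold". In Spark, the grouping would come from keying records by IP, the all-pairs step from a per-key self-join (e.g. JavaPairRDD.join of the keyed RDD with itself), and the counting and max from reduceByKey.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DensestPointPerIp {
    static final double EARTH_RADIUS_M = 6_371_000.0; // mean Earth radius, spherical model

    /** Hypothetical input record: one (ip, lat, lon) observation. */
    static final class Obs {
        final String ip;
        final double lat;
        final double lon;

        Obs(String ip, double lat, double lon) {
            this.ip = ip;
            this.lat = lat;
            this.lon = lon;
        }
    }

    /** Great-circle distance in metres between two points, via the haversine formula. */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_M * Math.asin(Math.sqrt(a));
    }

    /** For each IP, the observation with the most same-IP neighbours within thresholdMeters. */
    static Map<String, Obs> densestPointPerIp(List<Obs> rows, double thresholdMeters) {
        // Step 1: group coordinates by IP (what keying by IP does in Spark).
        Map<String, List<Obs>> byIp = new LinkedHashMap<>();
        for (Obs o : rows) {
            byIp.computeIfAbsent(o.ip, k -> new ArrayList<>()).add(o);
        }
        Map<String, Obs> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Obs>> e : byIp.entrySet()) {
            List<Obs> pts = e.getValue();
            Obs best = null;
            int bestCount = -1;
            for (Obs p : pts) {
                // Step 2: compare p with every other point of the same IP
                // (the pairs a per-IP self-join / cartesian product would produce).
                int count = 0;
                for (Obs q : pts) {
                    if (p != q && haversineMeters(p.lat, p.lon, q.lat, q.lon) <= thresholdMeters) {
                        count++;
                    }
                }
                // Step 3: keep the point with the most neighbours (the first one wins ties).
                if (count > bestCount) {
                    bestCount = count;
                    best = p;
                }
            }
            result.put(e.getKey(), best);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Obs> rows = new ArrayList<>();
        rows.add(new Obs("ip1", 0.0, 0.0));
        rows.add(new Obs("ip1", 1.0, 1.0));
        rows.add(new Obs("ip1", 1.0005, 1.0));
        rows.add(new Obs("ip1", 1.0, 1.0005));
        rows.add(new Obs("ip2", 10.0, 10.0));
        for (Map.Entry<String, Obs> e : densestPointPerIp(rows, 100.0).entrySet()) {
            System.out.println(e.getKey() + " -> (" + e.getValue().lat + ", " + e.getValue().lon + ")");
        }
    }
}
```

The inner loop is quadratic in the number of points per IP, so a single IP with many points will dominate the cost; a per-key self-join in Spark has the same property.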
Join : Giving incorrect result
Hi, I am doing a join of two RDDs, which gives different results (the number of joined records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random: every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards Ajay
Re: Facing MetricsSystem error on Running Spark applications
You've got a conflict in the version of Jackson that is being used: Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.module.SimpleSerializers.init(Ljava/util/List;)V Looks like you are using Jackson 2.x somewhere, but AFAIK all of the Hadoop/Spark libs are still on 1.x. That's roughly the proximate problem, but how to resolve it will depend a bit more on what your app is doing. On Wed, Jun 4, 2014 at 1:31 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi, I am facing following error on running spark applications. What could be missing which is causing this issue. org.eclipse.jetty.server.AbstractConnector - Started SocketConnector@0.0.0.0:55046 3574 [main] ERROR org.apache.spark.metrics.MetricsSystem - Sink class org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:134) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:129) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:129) at org.apache.spark.metrics.MetricsSystem.init(MetricsSystem.scala:83) at org.apache.spark.metrics.MetricsSystem$.createMetricsSystem(MetricsSystem.scala:163) 
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:198) at org.apache.spark.SparkContext.init(SparkContext.scala:139) at org.apache.spark.SparkContext.init(SparkContext.scala:100) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:81) at com.flipkart.sniper.common.ep.SparkResult.processTable(SparkResult.java:50) at com.flipkart.sniper.common.ep.LocalEventProcessorRunner.processTable(LocalEventProcessorRunner.java:95) at com.flipkart.sniper.common.ep.LocalEventProcessorRunner.call(LocalEventProcessorRunner.java:73) at com.flipkart.sniper.common.job.JobRunner.runJob(JobRunner.java:64) at com.flipkart.sniper.common.job.JobRunner.main(JobRunner.java:166) Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.module.SimpleSerializers.init(Ljava/util/List;)V at com.codahale.metrics.json.MetricsModule.setupModule(MetricsModule.java:213) at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:469) at org.apache.spark.metrics.sink.MetricsServlet.init(MetricsServlet.scala:44) ... 23 more Thanks, -Vibhor
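One way to see which jar is supplying the conflicting Jackson classes is to ask the JVM where it loaded them from. The sketch below is a hypothetical JDK-only helper: it looks a class up by name without running its static initializers and reports the code-source location (jar or directory) it came from. Run it with the same classpath as the failing application, passing e.g. com.fasterxml.jackson.databind.module.SimpleSerializers and com.codahale.metrics.json.MetricsModule. A NoSuchMethodError generally means the class found first on the classpath is a different version from the one the caller was compiled against; which jar is at fault here is something to check, not something this sketch decides.

```java
import java.security.CodeSource;

public class WhichJar {
    /**
     * Reports where a class would be loaded from, without initializing it.
     * Returns the jar/directory URL, "JDK/bootstrap" for core classes, or "not found".
     */
    static String locate(String className) {
        try {
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "JDK/bootstrap"; // classes from the boot class path have no code source
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        }
    }

    public static void main(String[] args) {
        // Pass fully qualified class names as arguments.
        for (String name : args) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```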
Re: Can't seem to link external/twitter classes from my own app
Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10 On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World App working that I wrote myself. Now I'm trying to make a minimal streaming app based on the twitter examples, (running standalone right now while learning) and when running it like this: bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar I'm getting this error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ Which I'm guessing is because I haven't put in a dependency to external/twitter in the .sbt, but _how_? I can't find any docs on it. Here's my build file so far:
simple.sbt
--
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
--
I've tried a few obvious things like adding:
libraryDependencies += "org.apache.spark" %% "spark-external" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-external-twitter" % "1.0.0"
because, well, that would match the naming scheme implied so far, but it errors. Also, I just realized I don't completely understand if: (a) the spark-submit command _sends_ the .jar to all the workers, or (b) the spark-submit commands sends a _job_ to the workers, which are supposed to already have the jar file installed (or in hdfs), or (c) the Context is supposed to list the jars to be distributed. (is that deprecated?)
One part of the documentation says: Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar. but another says: application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes. I suppose both could be correct if you take a certain point of view. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: Re: IllegalArgumentException on calling KMeans.train()
Thank you, 孟祥瑞! With your help I solved the problem. I had constructed the SparseVectors the wrong way: I mistook the first parameter of the constructor SparseVector(int size, int[] indices, double[] values) for the number of values. 2014-06-04 bluejoe2008 From: Xiangrui Meng Date: 2014-06-04 17:35 To: user Subject: Re: IllegalArgumentException on calling KMeans.train() Could you check whether the vectors have the same size? -Xiangrui On Wed, Jun 4, 2014 at 1:43 AM, bluejoe2008 bluejoe2...@gmail.com wrote: what does this exception mean? 14/06/04 16:35:15 ERROR executor.Executor: Exception in task ID 6 java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271) at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398) at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372) at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366) at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268) at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) My Spark version: 1.0.0, Java: 1.7. My code: JavaRDD<Vector> docVectors = generateDocVector(...); int numClusters = 20; int numIterations = 20; KMeansModel clusters = KMeans.train(docVectors.rdd(), numClusters, numIterations); Another strange thing is that the mapPartitionsWithIndex() call in generateDocVector() is invoked 3 times... 2014-06-04 bluejoe2008
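The mix-up above is easy to make: in SparseVector(int size, int[] indices, double[] values), size is the full dimension of the vector (for text, typically the vocabulary size), not values.length. If each document's size is set to its own number of non-zeros, the vectors end up with different dimensions, which is consistent with Xiangrui's question and with the requirement failing inside fastSquaredDistance. The JDK-only sketch below uses a hypothetical helper that turns a term-index to weight map into the three constructor arguments with a fixed size; the resulting arrays could then be passed to MLlib, e.g. Vectors.sparse(size, indices, values).

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseArgs {
    final int size;        // dimension of the vector space (e.g. vocabulary size), same for every document
    final int[] indices;   // strictly increasing positions of the non-zero entries
    final double[] values; // non-zero values, aligned with indices

    private SparseArgs(int size, int[] indices, double[] values) {
        this.size = size;
        this.indices = indices;
        this.values = values;
    }

    /** Builds (size, indices, values) from a hypothetical term-index -> weight map. */
    static SparseArgs fromTermWeights(Map<Integer, Double> termWeights, int vocabularySize) {
        TreeMap<Integer, Double> sorted = new TreeMap<>(termWeights); // sorts indices ascending
        int[] indices = new int[sorted.size()];
        double[] values = new double[sorted.size()];
        int i = 0;
        for (Map.Entry<Integer, Double> e : sorted.entrySet()) {
            int idx = e.getKey();
            if (idx < 0 || idx >= vocabularySize) {
                throw new IllegalArgumentException("index " + idx + " outside [0, " + vocabularySize + ")");
            }
            indices[i] = idx;
            values[i] = e.getValue();
            i++;
        }
        // size is the full dimension, NOT values.length.
        return new SparseArgs(vocabularySize, indices, values);
    }
}
```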
Re: Can't seem to link external/twitter classes from my own app
@Sean, the %% syntax in SBT should automatically add the Scala major version qualifier (_2.10, _2.11, etc.) for you, so that does appear to be correct syntax for the build. I seemed to run into this issue with some missing Jackson deps, and solved it by including the jar explicitly on the driver class path: bin/spark-submit --driver-class-path SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar It seems redundant to me, since I thought that the JAR passed as an argument is copied to the driver and made available. But this solved it for me, so perhaps give it a try? On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10 On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World App working that I wrote myself. Now I'm trying to make a minimal streaming app based on the twitter examples, (running standalone right now while learning) and when running it like this: bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar I'm getting this error: Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ Which I'm guessing is because I haven't put in a dependency to external/twitter in the .sbt, but _how_? I can't find any docs on it. 
Here's my build file so far: simple.sbt -- name := Simple Project version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.0.0 libraryDependencies += org.apache.spark %% spark-streaming % 1.0.0 libraryDependencies += org.apache.spark %% spark-streaming-twitter % 1.0.0 libraryDependencies += org.twitter4j % twitter4j-stream % 3.0.3 resolvers += Akka Repository at http://repo.akka.io/releases/; -- I've tried a few obvious things like adding: libraryDependencies += org.apache.spark %% spark-external % 1.0.0 libraryDependencies += org.apache.spark %% spark-external-twitter % 1.0.0 because, well, that would match the naming scheme implied so far, but it errors. Also, I just realized I don't completely understand if: (a) the spark-submit command _sends_ the .jar to all the workers, or (b) the spark-submit commands sends a _job_ to the workers, which are supposed to already have the jar file installed (or in hdfs), or (c) the Context is supposed to list the jars to be distributed. (is that deprecated?) One part of the documentation says: Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar. but another says: application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes. I suppose both could be correct if you take a certain point of view. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: Can't seem to link external/twitter classes from my own app
Ah sorry, this may be the thing I learned for the day. The issue is that classes from that particular artifact are missing though. Worth interrogating the resulting .jar file with jar tf to see if it made it in? On Wed, Jun 4, 2014 at 2:12 PM, Nick Pentreath nick.pentre...@gmail.com wrote: @Sean, the %% syntax in SBT should automatically add the Scala major version qualifier (_2.10, _2.11 etc) for you, so that does appear to be correct syntax for the build. I seemed to run into this issue with some missing Jackson deps, and solved it by including the jar explicitly on the driver class path: bin/spark-submit --driver-class-path SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar Seems redundant to me since I thought that the JAR as argument is copied to driver and made available. But this solved it for me so perhaps give it a try? On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10 On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World App working that I wrote myself. Now I'm trying to make a minimal streaming app based on the twitter examples, (running standalone right now while learning) and when running it like this: bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar I'm getting this error: Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ Which I'm guessing is because I haven't put in a dependency to external/twitter in the .sbt, but _how_? I can't find any docs on it. 
Here's my build file so far: simple.sbt -- name := Simple Project version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.0.0 libraryDependencies += org.apache.spark %% spark-streaming % 1.0.0 libraryDependencies += org.apache.spark %% spark-streaming-twitter % 1.0.0 libraryDependencies += org.twitter4j % twitter4j-stream % 3.0.3 resolvers += Akka Repository at http://repo.akka.io/releases/; -- I've tried a few obvious things like adding: libraryDependencies += org.apache.spark %% spark-external % 1.0.0 libraryDependencies += org.apache.spark %% spark-external-twitter % 1.0.0 because, well, that would match the naming scheme implied so far, but it errors. Also, I just realized I don't completely understand if: (a) the spark-submit command _sends_ the .jar to all the workers, or (b) the spark-submit commands sends a _job_ to the workers, which are supposed to already have the jar file installed (or in hdfs), or (c) the Context is supposed to list the jars to be distributed. (is that deprecated?) One part of the documentation says: Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar. but another says: application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes. I suppose both could be correct if you take a certain point of view. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
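Sean's suggestion to inspect the application jar with jar tf can also be done programmatically. The sketch below is a hypothetical helper built on the JDK's java.util.jar.JarFile: it converts a class name such as org.apache.spark.streaming.twitter.TwitterUtils$ into its entry path and checks whether the jar contains it. If the class is absent, it was not bundled into the jar, which would be consistent with the NoClassDefFoundError; how best to ship it (an assembly jar, or listing extra jars with spark-submit's --jars option) is worth confirming against the Spark docs for your version.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarContains {
    /** Returns true if the jar at jarPath contains an entry for the given class name. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // e.g. "org.apache.spark.streaming.twitter.TwitterUtils$"
        //   -> "org/apache/spark/streaming/twitter/TwitterUtils$.class"
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the application jar; args[1]: fully qualified class name
        System.out.println(containsClass(args[0], args[1]));
    }
}
```

Usage, for example: java JarContains SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar 'org.apache.spark.streaming.twitter.TwitterUtils$' (the single quotes keep the shell from expanding the $).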
is there any easier way to define a custom RDD in Java
Hi folks, is there an easier way to define a custom RDD in Java? I am wondering whether I have to define a new Java class that extends RDD from scratch. That is really hard work for developers! 2014-06-04 bluejoe2008
Re: Yay for 1.0.0! EC2 Still has problems.
On Wed, Jun 4, 2014 at 12:31 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Ah, sorry to hear you had more problems. Some thoughts on them: There will always be more problems, 'tis the nature of coding. :-) I try not to bother the list until I've smacked my head against them for a few hours, so it's only the most confusing stuff I pour out here. I'm actually progressing pretty well. (the streaming.Twitter ones especially) depend on there being a /mnt/spark and /mnt2/spark directory (I think for java tempfiles?) and those don't seem to exist out-of-the-box. I think this is a side-effect of the r3 instances not having those drives mounted. Our setup script would normally create these directories. What was the error? Oh, I went back to m1.large while those issues get sorted out. I decided I had enough problems without messing with that too. (seriously, why does Amazon do these things? It's like they _try_ to make the instances incompatible.) I forget the exact error, but it traced through createTempFile and it was fairly clear about the directory being missing. Things like bin/run-example SparkPi worked fine, but I'll bet twitter4j creates temp files, so bin/run-example streaming.TwitterPopularTags broke. What did you change log4j.properties to? It should be changed to say log4j.rootCategory=WARN, console but maybe another log4j.properties is somehow arriving on the classpath. This is definitely a common problem so we need to add some explicit docs on it. I seem to have this sorted out, don't ask me how. Once again I was probably editing things on the cluster master when I should have been editing the cluster controller, or vice versa. But, yeah, many of the examples just get lost in a sea of DAG INFO messages. Are you going through http://spark.apache.org/docs/latest/quick-start.html? You should be able to do just sbt package. 
Once you do that you don’t need to deploy your application’s JAR to the cluster, just pass it to spark-submit and it will automatically be sent over. Ah, that answers another question I just asked elsewhere... Yup, I re-read pretty much every documentation page daily. And I'm making my way through every video. Meanwhile I'm learning scala... Great Turing's Ghost, it's the dream language we've theorized about for years! I hadn't realized! Indeed, glad you’re enjoying it. Enjoying, not yet alas, I'm sure I'll get there. But I do understand the implications of a mixed functional-imperative language with closures and lambdas. That is serious voodoo. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: spark on yarn fail with IOException
I get a very similar stack trace and have no idea what could be causing it (see below). I've created a SO: http://stackoverflow.com/questions/24038908/spark-fails-on-big-jobs-with-java-io-ioexception-filesystem-closed 14/06/02 20:44:04 INFO client.AppClient$ClientActor: Executor updated: app-20140602203435-0020/6 is now FAILED (Command exited with code 137) 14/06/02 20:44:05 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140602203435-0020/6 removed: Command exited with code 137 14/06/02 20:44:05 INFO cluster.SparkDeploySchedulerBackend: Executor 6 disconnected, so removing it 14/06/02 20:44:05 ERROR scheduler.TaskSchedulerImpl: Lost executor 6 on ip-172-31-23-17.ec2.internal: Unknown executor exit code (137) (died from signal 9?) 14/06/02 20:44:05 INFO scheduler.TaskSetManager: Re-queueing tasks for 6 from TaskSet 2.0 14/06/02 20:44:05 WARN scheduler.TaskSetManager: Lost TID 358 (task 2.0:66) ... 14/06/02 21:08:11 INFO cluster.SparkDeploySchedulerBackend: Executor 16 disconnected, so removing it 14/06/02 21:08:11 ERROR scheduler.TaskSchedulerImpl: Lost executor 16 on ip-172-31-28-73.ec2.internal: remote Akka client disassociated 14/06/02 21:08:11 INFO scheduler.TaskSetManager: Re-queueing tasks for 16 from TaskSet 5.5 14/06/02 21:08:11 INFO scheduler.DAGScheduler: Executor lost: 16 (epoch 24) 14/06/02 21:08:11 INFO storage.BlockManagerMasterActor: Trying to remove executor 16 from BlockManagerMaster. 
14/06/02 21:08:11 INFO storage.BlockManagerMaster: Removed 16 successfully in removeExecutor 14/06/02 21:08:11 INFO scheduler.Stage: Stage 5 is now unavailable on executor 16 (207/234, false) 14/06/02 21:08:11 INFO client.AppClient$ClientActor: Executor updated: app-20140602203435-0020/16 is now FAILED (Command exited with code 137) 14/06/02 21:08:11 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140602203435-0020/16 removed: Command exited with code 137 14/06/02 21:08:11 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client 14/06/02 21:08:11 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection... 14/06/02 21:08:12 INFO scheduler.TaskSchedulerImpl: Ignoring update with state FAILED from TID 1364 because its task set is gone ... 14/06/02 21:08:12 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840) at java.io.DataInputStream.read(DataInputStream.java:149) at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159) at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164) at 
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at
Spark Usecase
Hello all. I have a newbie question. We have a use case where a huge amount of data will be coming in as streams, or micro-batches of streams, and we want to process these streams according to some business logic. We don't have to provide extremely low-latency guarantees, but batch M/R will still be too slow. The business logic is such that, at the time of emitting the data, we might have to hold on to some tuples until we get more information. This additional information will arrive in future batches of the stream. You could say this is a kind of word count use case where we have to aggregate and maintain state across batches of streams. One difference here is that we might have to maintain the state or data for a day or two until the rest of the data comes in, and only then can we complete our output.
1. Is such a use case supported in Spark and/or Spark Streaming?
2. Will we be able to persist partially aggregated data until the rest of the information comes in later? I mention persistence because, given that the delay can span a day or two, we don't want to keep the partial data in memory for that long.
I know this can be done in Storm, but I am really interested in Spark because of its close integration with Hadoop. We might not even want to use Spark Streaming (which is more of a direct comparison with Storm/Trident), given that our application does not have to be real-time to the split second. Feel free to direct me to any document or resource. Thanks a lot. Regards, Shahab
Re: Join : Giving incorrect result
Hi Ajay, would you mind putting together a minimal code snippet that reproduces this issue and pasting it here? On Wed, Jun 4, 2014 at 8:32 PM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing join of two RDDs which giving different results ( counting number of records ) each time I run this code on same input. The input files are large enough to be divided in two splits. When the program runs on two workers with single core assigned to these, output is consistent and looks correct. But when single worker is used with two or more than two cores, the result seems to be random. Every time, count of joined record is different. Does this sound like a defect or I need to take care of something while using join ? I am using spark-0.9.1. Regards Ajay
Java IO Stream Corrupted - Invalid Type AC?
Hi, I'm trying to run some Spark code on a cluster, but I keep running into a java.io.StreamCorruptedException: invalid type code: AC error. My task involves analyzing ~50GB of data (some operations involve sorting) and then writing the results out to a JSON file. I'm running the analysis on each of the data's ~10 columns and have never had a successful run. My program runs for a different amount of time on each attempt (between ~5 and 30 minutes), but it always terminates with this error. I have tried running the tool on just the first two columns of the data: sometimes I get the same error, and sometimes it runs successfully. We're running Spark 1.0.0, and unfortunately I don't know whether the code would have worked on prior versions of Spark. Does anyone else have any experience with this? Here is the full message: Exception in thread "main" [INFO] 04 Jun 2014 10:14:00 - org.apache.spark.Logging$class - Cancelling stage 107 org.apache.spark.SparkException: Job aborted due to stage failure: Task 107.2:171 failed 4 times, most recent failure: Exception failure in TID 31857 on host cluster: java.io.StreamCorruptedException: invalid type code: AC java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:87) org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:101) 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:100) org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
Re: SocketException when reading from S3 (s3n format)
I think by default a task can fail up to 4 times before Spark considers the job a failure. Are you seeing that happen? I believe that is configurable, but I don't know off the top of my head how to change it. I've seen this error before when reading data from a large number of files on S3, and it is typically harmless: Spark just retries the operation and proceeds normally. On Wed, Jun 4, 2014 at 4:05 AM, yuzeh delta1...@gmail.com wrote: I should add that I'm using Spark 0.9.1. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SocketException-when-reading-from-S3-s3n-format-tp6889p6890.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Yay for 1.0.0! EC2 Still has problems.
On Wed, Jun 4, 2014 at 9:35 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Oh, I went back to m1.large while those issues get sorted out. Random side note: Amazon is deprecating the m1 instances in favor of m3 instances, which have SSDs and more ECUs than their m1 counterparts. m3.2xlarge has 30GB of RAM and may be a good-enough substitute for the r3 instances for you for the time being. Nick
Re: is there any easier way to define a custom RDD in Java
Just curious, what do you want your custom RDD to do that the normal ones don't? On Wed, Jun 4, 2014 at 6:30 AM, bluejoe2008 bluejoe2...@gmail.com wrote: hi, folks, is there any easier way to define a custom RDD in Java? I am wondering if I have to define a new java class which extends RDD from scratch? It is really a hard job for developers! 2014-06-04 -- bluejoe2008
Re: Java IO Stream Corrupted - Invalid Type AC?
On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo mki...@oculusinfo.com wrote: I'm trying to run some Spark code on a cluster, but I keep running into a java.io.StreamCorruptedException: invalid type code: AC error. My task involves analyzing ~50GB of data (some operations involve sorting) and then writing the results out to a JSON file. I'm running the analysis on each of the data's ~10 columns and have never had a successful run. My program runs for a varying amount of time each time (roughly 5-30 minutes), but it always terminates with this error. I can tell you that this usually means that somewhere, something wrote objects to the same OutputStream through multiple ObjectOutputStreams. AC is a header value: it is the first byte of the header (AC ED) that every new ObjectOutputStream writes. I don't see obviously where or how that could happen here, but maybe it rings a bell for someone. This could happen, for example, if an OutputStream is reused across object serializations but a new ObjectOutputStream is opened each time.
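The failure mode described above can be reproduced in a simplified model. The sketch below is a pure-Python imitation of the Java serialization stream layout, not JDK or Spark code: it only handles short ASCII strings (TC_STRING records) and ignores back-references and block data. Each simulated "ObjectOutputStream" starts with the header bytes AC ED 00 05, so when two of them write into one byte stream, a single reader that has already consumed the first header meets 0xAC where it expects a type code. The helper names `write_stream` and `read_all` are hypothetical.

```python
import struct

STREAM_MAGIC = 0xACED    # first two header bytes of every new Java object stream
STREAM_VERSION = 0x0005  # next two header bytes
TC_STRING = 0x74         # type code that precedes a short String object


def write_stream(strings):
    """Imitate `new ObjectOutputStream(out)` followed by writeObject(s) for each
    (short, ASCII) string: one header, then one TC_STRING record per string."""
    buf = bytearray(struct.pack(">HH", STREAM_MAGIC, STREAM_VERSION))
    for s in strings:
        data = s.encode("ascii")  # ASCII is identical in Java's modified UTF-8
        buf += struct.pack(">BH", TC_STRING, len(data)) + data
    return bytes(buf)


def read_all(data):
    """Imitate one ObjectInputStream: check the header once, then read
    objects until the input is exhausted."""
    magic, version = struct.unpack_from(">HH", data, 0)
    if (magic, version) != (STREAM_MAGIC, STREAM_VERSION):
        raise ValueError("invalid stream header")
    pos, out = 4, []
    while pos < len(data):
        tc = data[pos]
        if tc != TC_STRING:
            # Same wording as the StreamCorruptedException in the report.
            raise ValueError("invalid type code: %02X" % tc)
        (length,) = struct.unpack_from(">H", data, pos + 1)
        out.append(data[pos + 3:pos + 3 + length].decode("ascii"))
        pos += 3 + length
    return out


print(read_all(write_stream(["a", "b"])))  # one writer: reads back fine
try:
    # Two writers appended to the same output: the second header sits mid-stream.
    read_all(write_stream(["a"]) + write_stream(["b"]))
except ValueError as err:
    print(err)
```

Reading a stream produced by a single writer succeeds; concatenating the output of two writers fails on the second header's first byte, which is the same symptom as the error in this thread.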
Re: Error related to serialisation in spark streaming
nilmish, To confirm your code is using Kryo, go to the web UI of your application (port 4040 by default) and look at the Environment tab. If your serializer settings are there, then things should be working properly. I'm not sure how to guard against typos in the setting, but you can at least use that web UI to confirm that the setting is making it to the application. Cheers, Andrew On Wed, Jun 4, 2014 at 3:48 AM, nilmish nilmish@gmail.com wrote: The error is resolved. I was using a comparator that was not serializable, which is why it was throwing the error. I have now switched to the Kryo serializer, as it is faster than the Java serializer. I have set the required config: conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "MyRegistrator"); and in the MyRegistrator class I have registered all the classes I am serializing. How can I confirm that my code is actually using the Kryo serializer and not the Java serializer now? PS: It seems like my code is still not using the Kryo serializer. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801p6904.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to change default storage levels
You can change the storage level of an individual RDD with .persist(StorageLevel.MEMORY_AND_DISK), but I don't think you can change the default persistence level for RDDs. Andrew On Wed, Jun 4, 2014 at 1:52 AM, Salih Kardan karda...@gmail.com wrote: Hi, I'm using Spark 0.9.1 and Shark 0.9.1. My dataset does not fit into the memory I have in my cluster setup, so I also want to use disk for caching. I guess MEMORY_ONLY is the default storage level in Spark. If that's the case, how could I change the storage level to MEMORY_AND_DISK in Spark? Thanks, Salih
Re: RDD with a Map
Thanks folks. I was trying to get the RDD[multimap], so collectAsMap is what I needed.

Best,
Amit

On Jun 4, 2014, at 6:53, Cheng Lian lian.cs@gmail.com wrote:

On Wed, Jun 4, 2014 at 5:56 AM, Amit Kumar kumarami...@gmail.com wrote:

Hi Folks, I am new to Spark, and this is probably a basic question. I have a file on HDFS:

1, one
1, uno
2, two
2, dos

I want to create a multi-map RDD RDD[Map[String,List[String]]]:

{1->[one,uno], 2->[two,dos]}

Actually, what you described is not a “multi-map RDD”; the type of this RDD should be something like RDD[(String, List[String])]. RDD[Map[String, List[String]]] indicates that each element within this RDD is itself a Map[String, List[String]], and I don’t think that is what you want, judging from the context.

First I read the file:

val identityData: RDD[String] = sc.textFile($path_to_the_file, 2).cache()

You don’t need to call .cache() here, since identityData is used only once, so the cached data won’t be used anywhere.

val identityDataList: RDD[List[String]] = identityData.map { line =>
  val splits = line.split(",")
  splits.toList
}

Turning each text line into a (String, String) pair would be much more useful, since then you can call functions like groupByKey, which are defined in PairRDDFunctions:

val identityDataPairs: RDD[(String, String)] = identityData.map { line =>
  val Array(key, value) = line.split(",")
  key -> value
}

Then I group them by the first element:

val grouped: RDD[(String, Iterable[List[String]])] = identityDataList.groupBy { element =>
  element(0)
}

Using groupByKey on pair RDDs is more convenient, as mentioned above:

val grouped: RDD[(String, Iterable[String])] = identityDataPairs.groupByKey()

Then I do the equivalent of mapValues on Scala collections to get rid of the first element:

val groupedWithValues: RDD[(String, List[String])] = grouped.flatMap[(String, List[String])] {
  case (key, list) => List((key, list.map { element => element(1) }.toList))
}

For this to actually materialize, I call collect:

val groupedAndCollected = groupedWithValues.collect()

I get an Array[(String, List[String])]. I am trying to figure out if there is a way for me to get a Map[String,List[String]] (a multimap), or to create an RDD[Map[String,List[String]]].

To get a Map[String, Iterable[String]], you can simply call collectAsMap, which is only defined on pair RDDs:

val groupedAndCollected = grouped.collectAsMap()

I am sure there is something simpler; I would appreciate advice. Many thanks, Amit

Finally, be careful if you are processing a large volume of data, since groupByKey is an expensive transformation, and collecting all the data to the driver side may simply cause an OOM if the data can’t fit in the driver node.

Best,
Cheng
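To show what the pair-RDD route produces without needing a cluster, here is a plain-Python sketch of the same parse, group-by-key and collect-as-map steps on Amit's sample lines. It is only an illustration: `parse_line` and `group_by_key` are hypothetical helpers, and unlike the Scala snippets above, `parse_line` trims the space after the comma. The roughly equivalent PySpark pipeline is noted in a comment; note that Spark does not guarantee the order of values inside each group, whereas this local version keeps input order.

```python
from collections import defaultdict


def parse_line(line):
    """Turn a line like "1, one" into a (key, value) pair, trimming spaces."""
    key, value = line.split(",", 1)
    return key.strip(), value.strip()


def group_by_key(pairs):
    """Local equivalent of groupByKey followed by collectAsMap: a dict from
    each key to the list of all its values (kept in input order here)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


# Roughly the same thing in PySpark (assumes an existing SparkContext `sc`,
# `path` is a placeholder):
#   sc.textFile(path).map(parse_line).groupByKey().mapValues(list).collectAsMap()
lines = ["1, one", "1, uno", "2, two", "2, dos"]
print(group_by_key(map(parse_line, lines)))
# {'1': ['one', 'uno'], '2': ['two', 'dos']}
```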
Re: RDD with a Map
Yes, an RDD as a map of String keys and List[String] values.

Amit

On Jun 4, 2014, at 2:46, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

Just a thought... Are you trying to use the RDD as a Map?

On 3 June 2014 23:14, Doris Xin doris.s@gmail.com wrote:

Hey Amit, You might want to check out PairRDDFunctions. For your use case in particular, you can load the file as an RDD[(String, String)] and then use the groupByKey() function in PairRDDFunctions to get an RDD[(String, Iterable[String])].

Doris

On Tue, Jun 3, 2014 at 2:56 PM, Amit Kumar kumarami...@gmail.com wrote:

Hi Folks, I am new to Spark, and this is probably a basic question. I have a file on HDFS:

1, one
1, uno
2, two
2, dos

I want to create a multi-map RDD RDD[Map[String,List[String]]]:

{1->[one,uno], 2->[two,dos]}

First I read the file:

val identityData: RDD[String] = sc.textFile($path_to_the_file, 2).cache()
val identityDataList: RDD[List[String]] = identityData.map { line =>
  val splits = line.split(",")
  splits.toList
}

Then I group them by the first element:

val grouped: RDD[(String, Iterable[List[String]])] = identityDataList.groupBy { element =>
  element(0)
}

Then I do the equivalent of mapValues on Scala collections to get rid of the first element:

val groupedWithValues: RDD[(String, List[String])] = grouped.flatMap[(String, List[String])] {
  case (key, list) => List((key, list.map { element => element(1) }.toList))
}

For this to actually materialize, I call collect:

val groupedAndCollected = groupedWithValues.collect()

I get an Array[(String, List[String])]. I am trying to figure out if there is a way for me to get a Map[String,List[String]] (a multimap), or to create an RDD[Map[String,List[String]]]. I am sure there is something simpler; I would appreciate advice.

Many thanks,
Amit

-- Kind regards, Oleg
Re: Can this be done in map-reduce technique (in parallel)
When you group by IP address in step 1 to get this:

(ip1,(lat1,lon1),(lat2,lon2))
(ip2,(lat3,lon3),(lat4,lon4))

how many lat/lon locations do you expect for each IP address? The average and the maximum are the interesting numbers.

Andrew

On Wed, Jun 4, 2014 at 5:29 AM, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

It is possible if you use a cartesian product to produce all possible pairs for each IP address and 2 stages of map-reduce:
- first by pairs of points, to find the total of each pair, and
- second by IP address, to find the pair for each IP address with the maximum count.

Oleg

On 4 June 2014 11:49, lmk lakshmi.muralikrish...@gmail.com wrote:

Hi, I am a new Spark user. Please let me know how to handle the following scenario. I have a data set with the following fields:
1. DeviceId
2. latitude
3. longitude
4. ip address
5. Datetime
6. Mobile application name

With the above data, I would like to perform the following steps:
1. Collect all lat and lon for each IP address:
   (ip1,(lat1,lon1),(lat2,lon2))
   (ip2,(lat3,lon3),(lat4,lon4))
2. For each IP:
   1. Find the distance between each lat/lon coordinate pair and all the other pairs under the same IP.
   2. Select those coordinates whose distances fall under a specific threshold (say 100m).
   3. Find the coordinate pair with the maximum occurrences.

In this case, how can I iterate and compare each coordinate pair with all the other pairs? Can this be done in a distributed manner, as this data set is going to have a few million records? Can we do this with map/reduce commands? Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-done-in-map-reduce-technique-in-parallel-tp6905.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Kind regards, Oleg
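To make lmk's per-IP steps concrete, here is a plain-Python sketch (no Spark) of one possible reading of them: group points by IP, compare every pair of points under the same IP using the haversine great-circle distance, and report the point with the most neighbours inside the threshold. The function names and this interpretation of step 3 are assumptions. In Spark, the grouping would roughly correspond to groupByKey on (ip, (lat, lon)) pairs, with the per-IP comparison done in mapValues; because that comparison is quadratic in the number of points per IP, Andrew's question about the average and maximum number of points per IP matters.

```python
import math
from collections import Counter, defaultdict
from itertools import combinations

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres


def haversine_m(p, q):
    """Great-circle distance in metres between two (lat, lon) points in degrees."""
    lat1, lon1 = map(math.radians, p)
    lat2, lon2 = map(math.radians, q)
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def densest_point_per_ip(records, threshold_m=100.0):
    """records: iterable of (ip, (lat, lon)).
    Returns {ip: ((lat, lon), neighbour_count)} for IPs with at least one close pair."""
    by_ip = defaultdict(list)  # step 1: collect all points for each IP
    for ip, point in records:
        by_ip[ip].append(point)

    result = {}
    for ip, points in by_ip.items():
        counts = Counter()  # identical coordinates are counted together
        for p, q in combinations(points, 2):  # step 2: every pair under the same IP
            if haversine_m(p, q) <= threshold_m:
                counts[p] += 1
                counts[q] += 1
        if counts:  # step 3: the point with the most close neighbours
            result[ip] = counts.most_common(1)[0]
    return result


records = [
    ("ip1", (52.5200, 13.4050)), ("ip1", (52.5203, 13.4052)),
    ("ip1", (52.5205, 13.4049)), ("ip1", (48.8566, 2.3522)),
    ("ip2", (40.0, -74.0)), ("ip2", (41.0, -74.0)),
]
print(densest_point_per_ip(records))
```

With these sample points, the three ip1 points within a few tens of metres of each other each get two neighbours, the distant ip1 point gets none, and ip2 is absent from the result because its two points are about 111 km apart.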
pyspark join crash
Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
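As a rough model of why the number of result partitions matters for memory, the sketch below assumes hash partitioning, where each record goes to partition hash(key) mod numPartitions and each reduce task handles only its own partition. Python's built-in hash() on small integers stands in for whatever hash function Spark actually uses, and the keys are hypothetical. It does not reproduce the crash; it only shows that with 10x fewer partitions, each reduce task has to hold about 10x more records.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Records each reduce partition receives when
    partition = hash(key) % num_partitions."""
    counts = Counter(hash(key) % num_partitions for key in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


keys = range(200_000)  # hypothetical integer join keys, one record per key
for n in (200, 2000):
    sizes = partition_sizes(keys, n)
    print(n, "partitions -> largest partition holds", max(sizes), "records")
# 200 partitions -> largest partition holds 1000 records
# 2000 partitions -> largest partition holds 100 records
```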
Re: Spark not working with mesos
Since $HADOOP_HOME is deprecated, try adding it to the Mesos configuration file: add `export MESOS_HADOOP_HOME=$HADOOP_HOME` to ~/.bashrc, and that should solve your error. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-not-working-with-mesos-tp6806p6939.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0
On Tue, Jun 3, 2014 at 8:46 PM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: Hi All, I've been experiencing a very strange error after upgrading from Spark 0.9 to 1.0: it seems that the saveAsTextFile function is throwing a java.lang.UnsupportedOperationException that I have never seen before.

In the stack trace you quoted, saveAsTextFile is not called. Is it really throwing an exception? Do you have the stack trace from the executor process? I think the exception originates from there, and the scheduler is just reporting it here.

Any hints appreciated.

scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 [duplicate 45]
14/06/03 16:46:23 ERROR actor.OneForOneStrategy:
java.lang.UnsupportedOperationException
        at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
        at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:183)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:176)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:176)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1058)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1045)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1045)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1045)
        at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:998)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:499)
        at org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1151)
        at org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1147)
        at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:295)
        at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:253)
        at akka.actor.ActorCell.handleFailure(ActorCell.scala:338)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:423)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Marek
Re: Better line number hints for logging?
Oh, this would be super useful for us too! Actually, wouldn't it be best if you could see the whole call stack in the UI, rather than just one line? (Of course you would have to click to expand it.) On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com wrote: OK, I will probably open a JIRA. On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can use RDD.setName to give it a name. There's also a creationSite field that is private[spark]; we may want to add a public setter for that later. If the name isn't enough and you'd like this, please open a JIRA issue for it. Matei On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote: I have created some extension methods for RDDs in RichRecordRDD, and these are working exceptionally well for me. However, when looking at the logs, it's impossible to tell what's going on, because all the line number hints point to RichRecordRDD.scala rather than the code that uses it. For example: INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at RichRecordRDD.scala:633), which is now runnable. Is there any way to set up my extension methods class so that the logs will print a more useful line number?
Re: Spark 1.0.0 fails if mesos.coarse set to true
I am also getting the exact error, with the exact logs when I run Spark 1.0.0 in coarse-grained mode. Coarse grained mode works perfectly with earlier versions that I tested - 0.9.1 and 0.9.0, but seems to have undergone some modification in spark 1.0.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-fails-if-mesos-coarse-set-to-true-tp6817p6942.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 1.0.0 fails if mesos.coarse set to true
Are you using spark-submit to run your application? On Wed, Jun 4, 2014 at 8:49 AM, ajatix a...@sigmoidanalytics.com wrote: I am also getting the exact error, with the exact logs when I run Spark 1.0.0 in coarse-grained mode. Coarse grained mode works perfectly with earlier versions that I tested - 0.9.1 and 0.9.0, but seems to have undergone some modification in spark 1.0.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-fails-if-mesos-coarse-set-to-true-tp6817p6942.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 1.0.0 fails if mesos.coarse set to true
I'm running a manually built cluster on EC2. I have Mesos (0.18.2) and HDFS (2.0.0-cdh4.5.0) installed on all slaves (3) and masters (3). I have spark-1.0.0 on one master, and the executor file is on HDFS for the slaves. Whenever I try to launch a Spark application on the cluster, it starts a task on each slave (I'm using the default configs), and they start FAILING with the error message 'Is spark installed on it?' -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-fails-if-mesos-coarse-set-to-true-tp6817p6945.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0
Actually, what the stack trace is showing is the result of an exception being thrown by the DAGScheduler's event processing actor. What happens is that the Supervisor tries to shut down Spark when an exception is thrown by that actor. As part of the shutdown procedure, the DAGScheduler tries to cancel any jobs running on the cluster, but the scheduler backend for Mesos doesn't yet implement killTask, so the shutdown procedure fails with an UnsupportedOperationException. In other words, the stack trace is all about the failure to shut down cleanly in response to some prior failure. What that prior, root-cause failure actually was is not clear to me from the stack trace or bug report, but at least the failure to shut down should be fixed in Spark 1.0.1 after PR 686 https://github.com/apache/spark/pull/686 is merged. Was this an application created with the Python API? There have been some similar bug reports associated with Python applications, but I'm not sure at this point that the problem actually resides in PySpark.

On Wed, Jun 4, 2014 at 8:38 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: On Tue, Jun 3, 2014 at 8:46 PM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: Hi All, I've been experiencing a very strange error after upgrading from Spark 0.9 to 1.0: it seems that the saveAsTextFile function is throwing a java.lang.UnsupportedOperationException that I have never seen before. In the stack trace you quoted, saveAsTextFile is not called. Is it really throwing an exception? Do you have the stack trace from the executor process? I think the exception originates from there, and the scheduler is just reporting it here. Any hints appreciated.
Re: Spark 1.0.0 fails if mesos.coarse set to true
Exactly the same story: it used to work with 0.9.1 and does not work anymore with 1.0.0. I ran tests using spark-shell as well as my application (so I tested turning on coarse mode both via an environment variable and explicitly via SparkContext properties). M. 2014-06-04 18:12 GMT+02:00 ajatix a...@sigmoidanalytics.com: I'm running a manually built cluster on EC2. I have Mesos (0.18.2) and HDFS (2.0.0-cdh4.5.0) installed on all slaves (3) and masters (3). I have spark-1.0.0 on one master, and the executor file is on HDFS for the slaves. Whenever I try to launch a Spark application on the cluster, it starts a task on each slave (I'm using the default configs), and they start FAILING with the error message 'Is spark installed on it?' -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-fails-if-mesos-coarse-set-to-true-tp6817p6945.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: using Log4j to log INFO level messages on workers
Hello Alex, Thanks for the link. Yes, creating a singleton object for logging outside the code that gets executed on the workers definitely works. The problem that I am facing, though, is related to the configuration of the logger: I don't see any log messages in the worker logs of the application. a) When I use println, I see the messages from the worker being logged in the main driver of the application. b) When I use the logger, I see logger messages from main() but not from the workers. Maybe I should upload an MWE (minimum working example) to demonstrate my point. Thanks, Shivani On Mon, Jun 2, 2014 at 10:33 PM, Alex Gaudio adgau...@gmail.com wrote: Hi, I had the same problem with PySpark. Here's how I resolved it. What I've found in Python (not sure about Scala) is that if the function being serialized was written in the same Python module as the main function, then logging fails. If the serialized function is in a separate module, then logging does not fail. I just created this gist to demo the situation and the (Python) solution. Is there a similar way to do this in Scala? https://gist.github.com/adgaudio/0191e14717af68bbba81 Alex On Mon, Jun 2, 2014 at 7:18 PM, Shivani Rao raoshiv...@gmail.com wrote: Hello Spark fans, I am trying to log messages from my Spark application. When the main() function attempts to log using log.info(), it works great, but when I tried the same command from the code that probably runs on the worker, I initially got a serialization error. To solve that, I created a new logger in the code that operates on the data, which solved the serialization issue, but now there is no output in the console or in the worker node logs. I don't see any application-level log messages in the Spark logs either. When I use println() instead, I do see console output being generated.
I tried the following, and none of them work:
a) passing log4j.properties by using -Dlog4j.properties on the java command line that launches the Spark application;
b) setting the properties within the worker by calling log.addAppender(new ConsoleAppender).
What am I missing? Thanks, Shivani -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA
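In the spirit of Alex's Python workaround, here is a minimal PySpark-flavoured sketch of one common pattern, assuming the problem is a logger captured in the task closure: look the logger up inside the function that runs on the workers, so nothing logger-related has to be pickled. The logger name `myapp.worker` and the function `process_partition` are hypothetical, and the function would be passed to something like `rdd.mapPartitions(process_partition)`. Where the messages end up depends on the deployment (typically the executor's stderr log rather than the driver console). This does not answer the Scala/log4j configuration question directly.

```python
import logging


def process_partition(rows):
    """Intended to run on an executor, e.g. via rdd.mapPartitions(process_partition).

    The logger is obtained here, inside the worker-side function, instead of
    being created on the driver and captured in the closure.
    """
    # basicConfig only installs a stderr handler if the root logger has none,
    # so calling it once per partition is harmless.
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("myapp.worker")  # hypothetical logger name
    count = 0
    for row in rows:
        count += 1
        yield row * 2  # placeholder for the real per-record work
    log.info("processed %d rows in this partition", count)


# Local check without Spark: call the function on a plain list.
print(list(process_partition([1, 2, 3])))  # [2, 4, 6]
```

Because the function is a generator, the summary message is logged only after the partition has been fully consumed.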
Re: Using mongo with PySpark
Thanks a lot, and sorry for the really late reply! (I didn't have my laptop.) This is working, but it's dreadfully slow and doesn't seem to run in parallel.

On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com wrote:

You need to use mapPartitions (or foreachPartition) to instantiate your client in each partition, as it is not serializable by the pickle library. Something like:

def mapper(iter):
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        json = convertToJSON(asc, indexMap)
        yield collec.insert(json)

def convertToJSON(string, indexMap):
    values = string.strip().split(',')
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

doc_ids = data.mapPartitions(mapper)

On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote:

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working

def convertToJSON(string, indexMap):
    values = string.strip().split(',')
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

jsons = data.map(mapper)

The last line does the mapping. I am very new to Spark; can you explain what explicit serialization, etc. is in the context of Spark?
The error I am getting:* *Traceback (most recent call last): File stdin, line 1, in module File /usr/local/spark-0.9.1/python/pyspark/rdd.py, line 712, in saveAsTextFile keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) File /usr/local/spark-0.9.1/python/pyspark/rdd.py, line 1178, in _jrdd pickled_command = CloudPickleSerializer().dumps(command) File /usr/local/spark-0.9.1/python/pyspark/serializers.py, line 275, in dumps def dumps(self, obj): return cloudpickle.dumps(obj, 2) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 801, in dumps cp.dump(obj) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 140, in dump return pickle.Pickler.dump(self, obj) File /usr/lib/python2.7/pickle.py, line 224, in dump self.save(obj) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 548, in save_tuple save(element) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 259, in save_function self.save_function_tuple(obj, [themodule]) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 316, in save_function_tuplesave(closure) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 600, in save_list self._batch_appends(iter(obj)) File /usr/lib/python2.7/pickle.py, line 633, in _batch_appends save(x) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 259, in save_function self.save_function_tuple(obj, [themodule]) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 316, in save_function_tuplesave(closure) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with 
explicit self File /usr/lib/python2.7/pickle.py, line 600, in save_list self._batch_appends(iter(obj)) File /usr/lib/python2.7/pickle.py, line 636, in _batch_appends save(tmp[0]) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 254, in save_function self.save_function_tuple(obj, modList) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 314, in save_function_tuplesave(f_globals) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 181, in save_dictpickle.Pickler.save_dict(self, obj) File /usr/lib/python2.7/pickle.py, line 649, in save_dict self._batch_setitems(obj.iteritems()) File /usr/lib/python2.7/pickle.py, line 681, in _batch_setitems save(v) File /usr/lib/python2.7/pickle.py, line 306, in saverv = reduce(self.proto) File /usr/local/lib/python2.7/dist-packages/pymongo/collection.py, line 1489, in __call__ self.__name.split(.)[-1])TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists. * On Sat, May 17, 2014 at 9:30 PM,
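Nick's per-partition pattern can be sketched without a cluster or a database. In the block below, `FakeCollection` is a hypothetical stand-in for a pymongo collection and `make_partition_writer` is an illustrative helper, not a Spark or pymongo API. The point it demonstrates is that the connection factory is called once per partition, on the worker, so the unpicklable client never has to travel through the closure.

```python
def convert_to_json(line, index_map):
    """Map one CSV line to a dict keyed by column name (same logic as convertToJSON)."""
    values = line.strip().split(",")
    return {index_map[i]: value for i, value in enumerate(values)}


def make_partition_writer(open_collection, index_map):
    """Return a function suitable for rdd.mapPartitions(...).

    open_collection is a zero-argument factory, called once per partition
    on the worker, so the database handle is never pickled on the driver.
    """
    def write_partition(lines):
        collection = open_collection()  # one client per partition, not per record
        for line in lines:
            yield collection.insert(convert_to_json(line, index_map))
    return write_partition


class FakeCollection:
    """Hypothetical stand-in for a pymongo collection; counts how often it is opened."""
    opened = 0

    def __init__(self):
        FakeCollection.opened += 1
        self.docs = []

    def insert(self, doc):
        self.docs.append(doc)
        return len(self.docs)  # fake document id


# Two simulated partitions of CSV lines.
index_map = {0: "id", 1: "title"}
partitions = [["1,foo", "2,bar"], ["3,baz"]]
writer = make_partition_writer(FakeCollection, index_map)
doc_ids = [doc_id for part in partitions for doc_id in writer(iter(part))]
print(doc_ids, FakeCollection.opened)  # [1, 2, 1] 2: one FakeCollection per partition
```

With PySpark the call would look roughly like `data.mapPartitions(make_partition_writer(lambda: MongoClient()['spark_test_db']['programs'], indexMap))`. Keep in mind that `mapPartitions` is lazy, so nothing is written until an action runs, and newer pymongo releases replace `insert` with `insert_one`/`insert_many`.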
Re: Spark 1.0.0 fails if mesos.coarse set to true
Hey, thanks a lot for reporting this. Do you mind making a JIRA with the details so we can track it? - Patrick On Wed, Jun 4, 2014 at 9:24 AM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: Exactly the same story: it used to work with 0.9.1 and does not work anymore with 1.0.0. I ran tests using spark-shell as well as my application (so I tested turning on coarse mode both via an env variable and via SparkContext properties explicitly). M. 2014-06-04 18:12 GMT+02:00 ajatix a...@sigmoidanalytics.com: I'm running a manually built cluster on EC2. I have Mesos (0.18.2) and HDFS (2.0.0-cdh4.5.0) installed on all slaves (3) and masters (3). I have spark-1.0.0 on one master and the executor file is on HDFS for the slaves. Whenever I try to launch a Spark application on the cluster, it starts a task on each slave (I'm using the default configs) and they start FAILING with the error message 'Is spark installed on it?' -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-fails-if-mesos-coarse-set-to-true-tp6817p6945.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: is there any easier way to define a custom RDD in Java
Hey There, This is only possible in Scala right now. However, this is almost never needed since the core API is fairly flexible. I have the same question as Andrew... what are you trying to do with your RDD? - Patrick On Wed, Jun 4, 2014 at 7:49 AM, Andrew Ash and...@andrewash.com wrote: Just curious, what do you want your custom RDD to do that the normal ones don't? On Wed, Jun 4, 2014 at 6:30 AM, bluejoe2008 bluejoe2...@gmail.com wrote: hi, folks, is there any easier way to define a custom RDD in Java? I am wondering if I have to define a new java class which extends RDD from scratch? It is really a hard job for developers! 2014-06-04 bluejoe2008
Re: error with cdh 5 spark installation
Hey Chirag, Those init scripts are part of the Cloudera Spark package (they are not in the Spark project itself) so you might try e-mailing their support lists directly. - Patrick On Wed, Jun 4, 2014 at 7:19 AM, chirag lakhani chirag.lakh...@gmail.com wrote: I recently spun up an AWS cluster with cdh 5 using Cloudera Manager. I am trying to install spark and simply used the install command, as stated in the CDH 5 documentation. sudo apt-get install spark-core spark-master spark-worker spark-python I get the following error Setting up spark-master (0.9.0+cdh5.0.1+33-1.cdh5.0.1.p0.25~precise-cdh5.0.1) ... * Starting Spark master (spark-master): invoke-rc.d: initscript spark-master, action start failed. dpkg: error processing spark-master (--configure): subprocess installed post-installation script returned error exit status 1 Errors were encountered while processing: spark-master Has anyone else encountered this? Does anyone have any suggestions of what to do about it? Chirag
Re: error with cdh 5 spark installation
Spark is already part of the distribution, and the core CDH5 parcel. You shouldn't need extra steps unless you're doing something special. It may be that this is the very cause of the error when trying to install over the existing services. On Wed, Jun 4, 2014 at 3:19 PM, chirag lakhani chirag.lakh...@gmail.com wrote: I recently spun up an AWS cluster with cdh 5 using Cloudera Manager. I am trying to install spark and simply used the install command, as stated in the CDH 5 documentation. sudo apt-get install spark-core spark-master spark-worker spark-python I get the following error Setting up spark-master (0.9.0+cdh5.0.1+33-1.cdh5.0.1.p0.25~precise-cdh5.0.1) ... * Starting Spark master (spark-master): invoke-rc.d: initscript spark-master, action start failed. dpkg: error processing spark-master (--configure): subprocess installed post-installation script returned error exit status 1 Errors were encountered while processing: spark-master Has anyone else encountered this? Does anyone have any suggestions of what to do about it? Chirag
Re: Can't seem to link external/twitter classes from my own app
Hey Jeremy, The issue is that you are using one of the external libraries, and these aren't actually packaged with Spark on the cluster, so you need to create an uber jar that includes them. You can look at the example here (I recently did this for a Kafka project and the idea is the same): https://github.com/pwendell/kafka-spark-example You'll want to make an uber jar that includes these packages (run sbt assembly) and then submit that jar to spark-submit. Also, I'd try running it locally first (if you aren't already) just to make the debugging simpler. - Patrick
On Wed, Jun 4, 2014 at 6:16 AM, Sean Owen so...@cloudera.com wrote: Ah sorry, this may be the thing I learned for the day. The issue is that classes from that particular artifact are missing, though. Worth interrogating the resulting .jar file with jar tf to see if it made it in?
On Wed, Jun 4, 2014 at 2:12 PM, Nick Pentreath nick.pentre...@gmail.com wrote: @Sean, the %% syntax in SBT should automatically add the Scala major version qualifier (_2.10, _2.11, etc.) for you, so that does appear to be correct syntax for the build. I seemed to run into this issue with some missing Jackson deps, and solved it by including the jar explicitly on the driver class path:

    bin/spark-submit --driver-class-path SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar

Seems redundant to me, since I thought that the JAR given as an argument is copied to the driver and made available. But this solved it for me, so perhaps give it a try?
On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10
On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World app working that I wrote myself. Now I'm trying to make a minimal streaming app based on the Twitter examples (running standalone right now while learning), and when running it like this:

    bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar

I'm getting this error:

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$

which I'm guessing is because I haven't put in a dependency to external/twitter in the .sbt, but _how_? I can't find any docs on it. Here's my build file so far:

    simple.sbt
    --
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"

    libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    --

I've tried a few obvious things like adding:

    libraryDependencies += "org.apache.spark" %% "spark-external" % "1.0.0"

    libraryDependencies += "org.apache.spark" %% "spark-external-twitter" % "1.0.0"

because, well, that would match the naming scheme implied so far, but it errors. Also, I just realized I don't completely understand whether: (a) the spark-submit command _sends_ the .jar to all the workers, or (b) the spark-submit command sends a _job_ to the workers, which are supposed to already have the jar file installed (or in HDFS), or (c) the Context is supposed to list the jars to be distributed (is that deprecated?). One part of the documentation says: "Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar." But another says: "application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes." I suppose both could be correct if you take a certain point of view. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
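Sean's suggestion of checking the jar with `jar tf` can also be scripted, because a jar is an ordinary zip archive. A minimal Python sketch; `jar_contains_class` is a hypothetical helper, not part of any Spark tooling.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar (a zip archive) contains the given class.

    class_name uses the Java binary-name form, for example
    'org.apache.spark.streaming.twitter.TwitterUtils$' (note the trailing $
    for the Scala companion object named in the NoClassDefFoundError).
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

For example, `jar_contains_class("SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar", "org.apache.spark.streaming.twitter.TwitterUtils$")` should return False for a plain `sbt package` jar, which would match Patrick's diagnosis. An assembly jar built with `sbt assembly` will usually have a different file name, so adjust the path accordingly.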
Re: Invalid Class Exception
I am building Spark by myself and I am using Java 7 to both build and run. I will try with Java 6. Thanks, Suman. On 6/3/2014 7:18 PM, Matei Zaharia wrote: What Java version do you have, and how did you get Spark (did you build it yourself by any chance or download a pre-built one)? If you build Spark yourself you need to do it with Java 6 — it’s a known issue because of the way Java 6 and 7 package JAR files. But I haven’t seen it result in this particular error. Matei On Jun 3, 2014, at 5:18 PM, Suman Somasundar suman.somasun...@oracle.com wrote: Hi all, I get the following exception when using Spark to run example k-means program. I am using Spark 1.0.0 and running the program locally. java.io.InvalidClassException: scala.Tuple2; invalid descriptor for field _1 at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:697) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:827) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1583) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:87) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:101) at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:100) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: java.lang.IllegalArgumentException: illegal signature at java.io.ObjectStreamField.<init>(ObjectStreamField.java:119) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:695) ... 26 more Does anyone know why this is happening? Thanks, Suman.
Re: Invalid Class Exception
I tried building with Java 6 and also tried the pre-built packages. I am still getting the same error. It works fine when I run it on a machine with Solaris OS and x86 architecture, but it does not work with Solaris OS and SPARC architecture. Any ideas why this would happen? Thanks, Suman. On 6/4/2014 10:48 AM, Suman Somasundar wrote: I am building Spark by myself and I am using Java 7 to both build and run. I will try with Java 6. Thanks, Suman. On 6/3/2014 7:18 PM, Matei Zaharia wrote: What Java version do you have, and how did you get Spark (did you build it yourself by any chance or download a pre-built one)? If you build Spark yourself you need to do it with Java 6; it’s a known issue because of the way Java 6 and 7 package JAR files. But I haven’t seen it result in this particular error. Matei On Jun 3, 2014, at 5:18 PM, Suman Somasundar suman.somasun...@oracle.com wrote: Hi all, I get the following exception when using Spark to run example k-means program. I am using Spark 1.0.0 and running the program locally.
java.io.InvalidClassException: scala.Tuple2; invalid descriptor for field _1 at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:697) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:827) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1583) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:87) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:101) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:100) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: java.lang.IllegalArgumentException: illegal signature at java.io.ObjectStreamField.<init>(ObjectStreamField.java:119) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:695) ... 26 more Anyone know why this is happening? Thanks, Suman.
Re: Trouble launching EC2 Cluster with Spark
Thank you! The regions advice solved the problem for my friend who was getting the "key pair does not exist" problem. I am still getting the error: ERROR:boto:400 Bad Request ERROR:boto:<?xml version="1.0" encoding="UTF-8"?> <Response><Errors><Error><Code>InvalidParameterValue</Code><Message>Invalid value 'null' for protocol. VPC security group rules must specify protocols explicitly.</Message></Error></Errors><RequestID>7ff92687-b95a-4a39-94cb-e2d00a6928fd</RequestID></Response> This sounds like it could have to do with the access settings of the security group, but I don't know how to change them. Any advice would be much appreciated! Sam - Original Message - From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org Sent: Wednesday, June 4, 2014 8:52:59 AM Subject: Re: Trouble launching EC2 Cluster with Spark One reason could be that the keys are in a different region. You need to create the keys in us-east-1 (North Virginia). Cheers k/ On Wed, Jun 4, 2014 at 7:45 AM, Sam Taylor Steyer sste...@stanford.edu wrote: Hi, I am trying to launch an EC2 cluster from Spark using the following command: ./spark-ec2 -k HackerPair -i [path]/HackerPair.pem -s 2 launch HackerCluster I set my access key ID and secret access key. I have been getting an error in the "setting up security groups..." phase: Invalid value 'null' for protocol. VPC security groups must specify protocols explicitly. My project partner gets one step further and then gets the error "The key pair 'JamesAndSamTest' does not exist." Any thoughts as to how we could fix these problems? Thanks a lot! Sam
Re: Trouble launching EC2 Cluster with Spark
chmod 600 path/FinalKey.pem Cheers k/ On Wed, Jun 4, 2014 at 12:49 PM, Sam Taylor Steyer sste...@stanford.edu wrote: Also, once my friend logged in to his cluster he received the error Permissions 0644 for 'FinalKey.pem' are too open. This sounds like the other problem described. How do we make the permissions more private? Thanks very much, Sam - Original Message - From: Sam Taylor Steyer sste...@stanford.edu To: user@spark.apache.org Sent: Wednesday, June 4, 2014 12:42:04 PM Subject: Re: Trouble launching EC2 Cluster with Spark Thanks you! The regions advice solved the problem for my friend who was getting the key pair does not exist problem. I am still getting the error: ERROR:boto:400 Bad Request ERROR:boto:?xml version=1.0 encoding=UTF-8? ResponseErrorsErrorCodeInvalidParameterValue/CodeMessageInvalid value 'null' for protocol. VPC security group rules must specify protocols explicitly./Message/Error/ErrorsRequestID7ff92687-b95a-4a39-94cb-e2d00a6928fd/RequestID/Response This sounds like it could have to do with the access settings of the security group, but I don't know how to change. Any advice would be much appreciated! Sam - Original Message - From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org Sent: Wednesday, June 4, 2014 8:52:59 AM Subject: Re: Trouble launching EC2 Cluster with Spark One reason could be that the keys are in a different region. Need to create the keys in us-east-1-North Virginia. Cheers k/ On Wed, Jun 4, 2014 at 7:45 AM, Sam Taylor Steyer sste...@stanford.edu wrote: Hi, I am trying to launch an EC2 cluster from spark using the following command: ./spark-ec2 -k HackerPair -i [path]/HackerPair.pem -s 2 launch HackerCluster I set my access key id and secret access key. I have been getting an error in the setting up security groups... phase: Invalid value 'null' for protocol. VPC security groups must specify protocols explicitly. 
My project partner gets one step further and then gets the error The key pair 'JamesAndSamTest' does not exist. Any thoughts as to how we could fix these problems? Thanks a lot! Sam
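For scripted cluster setups, the same `chmod 600` can be applied from Python. ssh refuses private keys that other users can read, which is what the "Permissions 0644 ... are too open" message is about. A minimal sketch; `restrict_key_permissions` is a hypothetical helper, and it is POSIX-only because on Windows `os.chmod` can only toggle the read-only flag.

```python
import os
import stat


def restrict_key_permissions(path):
    """Python equivalent of `chmod 600 path`: owner read/write, nothing for group/others.

    Returns the resulting permission bits so callers can check them.
    """
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
    return stat.S_IMODE(os.stat(path).st_mode)
```

On a POSIX system, `restrict_key_permissions("path/FinalKey.pem")` should return `0o600`, after which ssh (and therefore spark-ec2) should accept the key.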
Re: Trouble launching EC2 Cluster with Spark
Awesome, that worked. Thank you! - Original Message - From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org Sent: Wednesday, June 4, 2014 12:52:00 PM Subject: Re: Trouble launching EC2 Cluster with Spark chmod 600 path/FinalKey.pem Cheers k/ On Wed, Jun 4, 2014 at 12:49 PM, Sam Taylor Steyer sste...@stanford.edu wrote: Also, once my friend logged in to his cluster he received the error Permissions 0644 for 'FinalKey.pem' are too open. This sounds like the other problem described. How do we make the permissions more private? Thanks very much, Sam - Original Message - From: Sam Taylor Steyer sste...@stanford.edu To: user@spark.apache.org Sent: Wednesday, June 4, 2014 12:42:04 PM Subject: Re: Trouble launching EC2 Cluster with Spark Thanks you! The regions advice solved the problem for my friend who was getting the key pair does not exist problem. I am still getting the error: ERROR:boto:400 Bad Request ERROR:boto:?xml version=1.0 encoding=UTF-8? ResponseErrorsErrorCodeInvalidParameterValue/CodeMessageInvalid value 'null' for protocol. VPC security group rules must specify protocols explicitly./Message/Error/ErrorsRequestID7ff92687-b95a-4a39-94cb-e2d00a6928fd/RequestID/Response This sounds like it could have to do with the access settings of the security group, but I don't know how to change. Any advice would be much appreciated! Sam - Original Message - From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org Sent: Wednesday, June 4, 2014 8:52:59 AM Subject: Re: Trouble launching EC2 Cluster with Spark One reason could be that the keys are in a different region. Need to create the keys in us-east-1-North Virginia. Cheers k/ On Wed, Jun 4, 2014 at 7:45 AM, Sam Taylor Steyer sste...@stanford.edu wrote: Hi, I am trying to launch an EC2 cluster from spark using the following command: ./spark-ec2 -k HackerPair -i [path]/HackerPair.pem -s 2 launch HackerCluster I set my access key id and secret access key. 
I have been getting an error in the setting up security groups... phase: Invalid value 'null' for protocol. VPC security groups must specify protocols explicitly. My project partner gets one step further and then gets the error The key pair 'JamesAndSamTest' does not exist. Any thoughts as to how we could fix these problems? Thanks a lot! Sam
Re: Join : Giving incorrect result
Maybe your two workers have different assembly jar files? I just ran into a similar problem where my spark-shell was using a different jar file than my workers, and got really confusing results. On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing a join of two RDDs, which gives different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random: every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards Ajay
Re: access hdfs file name in map()
N/M.. I wrote a HadoopRDD subclass and appended one env field of the HadoopPartition to the value in the compute function. It worked pretty well. Thanks! On Jun 4, 2014 12:22 AM, Xu (Simon) Chen xche...@gmail.com wrote: I don't quite get it... mapPartitionsWithIndex takes a function that maps an integer index and an iterator to another iterator. How does that help with retrieving the HDFS file name? I am obviously missing some context. Thanks. On May 30, 2014 1:28 AM, Aaron Davidson ilike...@gmail.com wrote: Currently there is not a way to do this using textFile(). However, you could pretty straightforwardly define your own subclass of HadoopRDD [1] in order to get access to this information (likely using mapPartitionsWithIndex to look up the InputSplit for a particular partition). Note that sc.textFile() is just a convenience function to construct a new HadoopRDD [2]. [1] HadoopRDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L93 [2] sc.textFile(): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L456 On Thu, May 29, 2014 at 7:49 PM, Xu (Simon) Chen xche...@gmail.com wrote: Hello, A quick question about using Spark to parse text-format CSV files stored on HDFS. I have something very simple: sc.textFile("hdfs://test/path/*").map(line => line.split(",")).map(p => ("XXX", p(0), p(2))) Here, I want to replace XXX with a string, which is the current CSV filename for the line. This is needed since some information may be encoded in the file name, like the date. In Hive, I am able to define an external table and use INPUT__FILE__NAME as a column in queries. I wonder if Spark has something similar. Thanks! -Simon
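Simon's HadoopRDD subclass is the general route. For modestly sized files, another option, if your Spark version provides `SparkContext.wholeTextFiles` (it yields `(path, content)` pairs and reads each file whole into memory; check the API docs for your release), is to split each file yourself. The sketch below is plain Python so it runs without a cluster; `lines_with_filename` is a hypothetical helper that mirrors the `("XXX", p(0), p(2))` tuple from the question, and the file paths are made up.

```python
def lines_with_filename(pair):
    """Expand one (path, file_content) pair into (path, col0, col2) tuples.

    Intended for rdd.flatMap(lines_with_filename) on the output of
    sc.wholeTextFiles(...); here it is exercised on plain Python data.
    """
    path, content = pair
    for line in content.splitlines():
        if not line.strip():
            continue  # skip blank lines
        p = line.split(",")
        yield (path, p[0], p[2])


# Hypothetical (path, content) pairs, shaped like wholeTextFiles output.
files = [
    ("hdfs://test/path/2014-06-01.csv", "a,b,c\nd,e,f\n"),
    ("hdfs://test/path/2014-06-02.csv", "g,h,i\n"),
]
rows = [row for pair in files for row in lines_with_filename(pair)]
print(rows)
```

The trade-off is that each file becomes a single record, so very large files would be better served by the HadoopRDD approach described in the thread.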
Re: Join : Giving incorrect result
If this isn’t the problem, it would be great if you can post the code for the program. Matei On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote: Maybe your two workers have different assembly jar files? I just ran into a similar problem that my spark-shell is using a different jar file than my workers - got really confusing results. On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing join of two RDDs which giving different results ( counting number of records ) each time I run this code on same input. The input files are large enough to be divided in two splits. When the program runs on two workers with single core assigned to these, output is consistent and looks correct. But when single worker is used with two or more than two cores, the result seems to be random. Every time, count of joined record is different. Does this sound like a defect or I need to take care of something while using join ? I am using spark-0.9.1. Regards Ajay
reuse hadoop code in Spark
Hello, I am trying to use Spark in the following scenario: I have code written in Hadoop and now I am trying to migrate to Spark. The mappers and reducers are fairly complex, so I wonder if I can reuse the map() functions I already wrote in Hadoop (Java) and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question: can I use binaries as operators, like Hadoop Streaming? Thanks! Wei
Re: reuse hadoop code in Spark
Yes, you can write some glue in Spark to call these. Some functions to look at:
- SparkContext.hadoopRDD lets you create an input RDD from an existing JobConf configured by Hadoop (including InputFormat, paths, etc.)
- RDD.mapPartitions lets you operate on all the values in one partition (block) at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation
- RDD.pipe() can be used to call out to a script or binary, like Hadoop Streaming
A fair number of people have been running both Java and Hadoop Streaming apps like this. Matei On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote: Hello, I am trying to use spark in such a scenario: I have code written in Hadoop and now I try to migrate to Spark. The mappers and reducers are fairly complex. So I wonder if I can reuse the map() functions I already wrote in Hadoop (Java), and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question, can I use binary as operators, like Hadoop streaming? Thanks! Wei
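The mapPartitions and reduceByKey points above can be sketched in plain Python to show the shape of the glue. `partition_runner`, `word_count_mapper` and `reduce_by_key` are hypothetical names: the first adapts a Hadoop-style `map(key, value, emit)` callback to a per-partition iterator, and the last only imitates on one machine what `reduceByKey` computes. Porting existing Java Mappers would likely follow the same idea in Java, wrapping the existing map() call with a collector that buffers its output.

```python
import operator


def partition_runner(mapper):
    """Adapt a Hadoop-style mapper(key, value, emit) for rdd.mapPartitions.

    The returned function consumes one partition of (key, value) records,
    much as a Mapper consumes one input split, and yields every emitted pair.
    """
    def run(records):
        buffer = []

        def emit(k, v):
            buffer.append((k, v))

        for key, value in records:
            mapper(key, value, emit)
            yield from buffer  # hand this record's output downstream
            buffer.clear()
    return run


def word_count_mapper(offset, line, emit):
    """Hypothetical existing map() logic: emit (word, 1) for each word."""
    for word in line.split():
        emit(word, 1)


def reduce_by_key(pairs, func):
    """Single-machine imitation of reduceByKey, for illustration only."""
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return acc


# Two simulated partitions of (byte offset, line) records.
partitions = [[(0, "a b a")], [(0, "b c")]]
run = partition_runner(word_count_mapper)
mapped = [pair for part in partitions for pair in run(iter(part))]
counts = reduce_by_key(mapped, operator.add)
print(counts)
```

In PySpark the equivalent pipeline would read roughly `rdd.mapPartitions(partition_runner(word_count_mapper)).reduceByKey(operator.add)`; buffering per record keeps memory bounded by one record's output rather than the whole partition.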
Re: Better line number hints for logging?
That’s a good idea too; maybe we can change CallSiteInfo to do that. Matei On Jun 4, 2014, at 8:44 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Oh, this would be super useful for us too! Actually, wouldn't it be best if you could see the whole call stack on the UI, rather than just one line? (Of course you would have to click to expand it.) On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com wrote: OK, I will probably open a JIRA. On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can use RDD.setName to give it a name. There’s also a creationSite field that is private[spark]; we may want to add a public setter for that later. If the name isn’t enough and you’d like this, please open a JIRA issue for it. Matei On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote: I have created some extension methods for RDDs in RichRecordRDD and these are working exceptionally well for me. However, when looking at the logs, it's impossible to tell what's going on because all the line number hints point to RichRecordRDD.scala rather than the code that uses it. For example: INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at map at RichRecordRDD.scala:633), which is now runnable. Is there any way to set up my extension methods class so that the logs will print a more useful line number?
Re: pyspark join crash
In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks. I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon. Matei On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
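The effect of the partition count on per-task memory can be seen with quick arithmetic. With PySpark the count is passed to the join itself, e.g. `rdd1.join(rdd2, numPartitions=2000)`. The plain-Python sketch below assumes simple `hash(key) % num_partitions` placement (PySpark's own partitioning function may differ in detail) and uses perfectly uniform integer keys, so it shows the best case: each reduce task receives roughly N/P records, while a single very frequent key would still land entirely in one task.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count records per reduce partition under hash(key) % num_partitions."""
    return Counter(hash(k) % num_partitions for k in keys)


def largest_partition(keys, num_partitions):
    """Size of the biggest partition, i.e. what the busiest reduce task must hold."""
    return max(partition_sizes(keys, num_partitions).values())


keys = range(100_000)  # hypothetical, perfectly uniform integer keys
for n in (200, 2000):
    print(n, "partitions ->", largest_partition(keys, n), "records in the largest one")
```

Going from 200 to 2000 partitions cuts the largest partition from 500 to 50 records here, which mirrors why Brad's join succeeded with 2000 partitions but not with 200.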
Re: How can I dispose an Accumulator?
Will the broadcast variables be disposed automatically if the context is stopped, or do I still need to unpersist()? On Sat, May 31, 2014 at 1:20 PM, Patrick Wendell pwend...@gmail.com wrote: Hey There, You can remove an accumulator by just letting it go out of scope and it will be garbage collected. For broadcast variables we actually store extra information for it, so we provide hooks for users to remove the associated state. There is no such need for accumulators, though. - Patrick On Thu, May 29, 2014 at 2:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, How can I dispose an Accumulator? It has no method like 'unpersist()' which Broadcast provides. Thanks. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: SQLContext and HiveContext Query Performance
Hi, Just wondering if you can try this:
val obj = sql("select manufacturer, count(*) as examcount from pft group by manufacturer order by examcount desc")
obj.collect()
obj.queryExecution.executedPlan.executeCollect()
and time the third line alone. It could be that Spark SQL is taking some time to run the optimizer and generate physical plans, which slows down the query. Thanks, Zongheng On Wed, Jun 4, 2014 at 2:16 PM, ssb61 santoshbalma...@gmail.com wrote: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-and-HiveContext-Query-Performance-tp6948p6976.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How can I dispose an Accumulator?
All of these are disposed of automatically if you stop the context or exit the program. Matei On Jun 4, 2014, at 2:22 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: Will the broadcast variables be disposed automatically if the context is stopped, or do I still need to unpersist()? On Sat, May 31, 2014 at 1:20 PM, Patrick Wendell pwend...@gmail.com wrote: Hey There, You can remove an accumulator by just letting it go out of scope and it will be garbage collected. For broadcast variables we actually store extra information for it, so we provide hooks for users to remove the associated state. There is no such need for accumulators, though. - Patrick On Thu, May 29, 2014 at 2:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, How can I dispose an Accumulator? It has no method like 'unpersist()' which Broadcast provides. Thanks. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: pyspark join crash
Hi Matei, Thanks for the reply and for creating the JIRA. I hear what you're saying, although to be clear, I still want to point out that each reduce task seems to be loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task. I base this hypothesis on the following:
- My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled
- There are 130K records
- The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
- There are 3 cores per node (each running one reduce task at a time)
- Each node has 32G of memory
Note that I am attempting to join the dataset to itself, and I ran this experiment after caching the dataset in memory with serialization and compression enabled. Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G per machine on average. The behavior I observe is that the kernel kills jobs on many of the nodes at nearly the same time, right after the read phase starts; it seems likely this would happen on every node, except that the master begins detecting failures and stops the job. Indeed, I observe a large memory spike on each node. When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average. I began suspecting that entire blocks are loaded based on the logging from the workers (i.e. BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks). If it is definitely not the case that entire blocks are loaded from the writers, then there seems to be some significant overhead that is chewing through lots of memory (perhaps similar to the problem with Python broadcast variables chewing through memory https://spark-project.atlassian.net/browse/SPARK-1065). -Brad On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia matei.zaha...@gmail.com wrote: In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks. I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon. Matei On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
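Brad's back-of-the-envelope estimates can be re-derived mechanically. The sketch below only restates his arithmetic in Python using the figures from his message: a 100G dataset joined to itself, 130K records, 60M blocks, and 3 concurrent reduce tasks per node. The function names are hypothetical. The whole-block model is a worst case that assumes every needed record lives in a different block. The sketch says nothing about what Spark actually fetches.

```python
# Figures quoted in the message (uncompressed sizes, in GB).
DATASET_GB = 100          # whole dataset, joined to itself
RECORDS = 130_000
BLOCK_GB = 0.060          # average input partition/block size (60M)
CORES_PER_NODE = 3        # one reduce task per core at a time


def per_node_gb_records_only(output_partitions):
    """Memory per node if each reduce task fetches only the records it needs."""
    # Self-join: roughly 2x the dataset spread over the output partitions.
    per_task = 2 * DATASET_GB / output_partitions
    return per_task * CORES_PER_NODE


def per_node_gb_whole_blocks(output_partitions):
    """Worst case: every needed record sits in its own block and whole blocks are fetched."""
    records_per_task = RECORDS / output_partitions
    blocks_per_task = 2 * records_per_task  # both sides of the self-join
    return blocks_per_task * BLOCK_GB * CORES_PER_NODE


if __name__ == "__main__":
    print(f"records only, 200 partitions:  {per_node_gb_records_only(200):.1f} GB")
    print(f"whole blocks, 2000 partitions: {per_node_gb_whole_blocks(2000):.1f} GB")
```

These two calls reproduce the roughly 3G and 23G per-node figures in the message.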
Re: SQLContext and HiveContext Query Performance
I timed the third line and here are the stage timings:
collect at SparkPlan.scala:52 - 0.5 s
mapPartitions at Exchange.scala:58 - 0.7 s
RangePartitioner at Exchange.scala:62 - 0.7 s
RangePartitioner at Exchange.scala:62 - 0.5 s
mapPartitions at Exchange.scala:44 - 13 s
Thanks, Santosh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-and-HiveContext-Query-Performance-tp6948p6981.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
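A quick tally of Santosh's stage timings shows where the time goes. This is a small sketch. The numbers are transcribed from the message, and `summarize` is a hypothetical helper that returns the total, the slowest stage, and that stage's share of the total.

```python
# Stage timings in seconds, transcribed from the message.
STAGES = [
    ("collect at SparkPlan.scala:52", 0.5),
    ("mapPartitions at Exchange.scala:58", 0.7),
    ("RangePartitioner at Exchange.scala:62", 0.7),
    ("RangePartitioner at Exchange.scala:62", 0.5),
    ("mapPartitions at Exchange.scala:44", 13.0),
]


def summarize(stages):
    """Return (total seconds, slowest stage name, slowest stage's share of the total)."""
    total = sum(seconds for _, seconds in stages)
    name, slowest = max(stages, key=lambda stage: stage[1])
    return total, name, slowest / total


if __name__ == "__main__":
    total, name, share = summarize(STAGES)
    print(f"total {total:.1f} s; slowest: {name} ({share:.0%})")
```

Of the roughly 15.4 s, about 84% is spent in the mapPartitions stage at Exchange.scala:44, so that stage is where to look first.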
Cassandra examples don't work for me
Hi, I’m following the directions to run the cassandra example “org.apache.spark.examples.CassandraTest” and I get this error:
Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:113)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
    at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:370)
    at org.apache.spark.examples.CassandraTest$.main(CassandraTest.scala:100)
    at org.apache.spark.examples.CassandraTest.main(CassandraTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I’m running Cassandra version 2.0.6, and this comes from the spark-1.0.0-bin-hadoop2 distribution package. I am running the example with this command line:
bin/run-example org.apache.spark.examples.CassandraTest localhost localhost 9160
I suspect it’s because I’m running the wrong version of Cassandra, but I can’t find the correct version listed anywhere. I hope this is an easy issue to address. Much thanks, Tim
Re: Running a spark-submit compatible app in spark-shell
It took me a little while to get back to this but it works now!! I'm invoking the shell like this: spark-shell --jars target/scala-2.10/spark-etl_2.10-1.0.jar Once inside, I can invoke a method in my package to run the job: val result = etl.IP2IncomeJob.job(sc) On Tue, May 27, 2014 at 8:42 AM, Roger Hoover roger.hoo...@gmail.com wrote: Thanks, Andrew. I'll give it a try. On Mon, May 26, 2014 at 2:22 PM, Andrew Or and...@databricks.com wrote: Hi Roger, This was due to a bug in the Spark shell code, and is fixed in the latest master (and RC11). Here is the commit that fixed it: https://github.com/apache/spark/commit/8edbee7d1b4afc192d97ba192a5526affc464205. Try it now and it should work. :) Andrew 2014-05-26 10:35 GMT+02:00 Perttu Ranta-aho ranta...@iki.fi: Hi Roger, Were you able to solve this? -Perttu On Tue, Apr 29, 2014 at 8:11 AM, Roger Hoover roger.hoo...@gmail.com wrote: Patrick, Thank you for replying. That didn't seem to work either. I see the option parsed using verbose mode. Parsed arguments: ... driverExtraClassPath /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar But the jar still doesn't show up if I run :cp in the repl and the import still fails. scala> import etl._ <console>:7: error: not found: value etl import etl._ Not sure if this helps, but I noticed with Spark 0.9.1 that the import only seems to work when I add the -usejavacp option to the spark-shell command. I don't really understand why. With the latest code, I tried adding these options to the spark-shell command without success: -usejavacp -Dscala.usejavacp=true On Mon, Apr 28, 2014 at 6:30 PM, Patrick Wendell pwend...@gmail.com wrote: What about if you run ./bin/spark-shell --driver-class-path=/path/to/your/jar.jar I think either this or the --jars flag should work, but it's possible there is a bug with the --jars flag when calling the Repl.
On Mon, Apr 28, 2014 at 4:30 PM, Roger Hoover roger.hoo...@gmail.com wrote: A couple of issues: 1) the jar doesn't show up on the classpath even though SparkSubmit had it in the --jars options. I tested this by running :cp in spark-shell. 2) After adding it to the classpath using (:cp /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar), it still fails. When I do that in the scala repl, it works. BTW, I'm using the latest code from the master branch (8421034e793c0960373a0a1d694ce334ad36e747) On Mon, Apr 28, 2014 at 3:40 PM, Roger Hoover roger.hoo...@gmail.com wrote: Matei, thank you. That seemed to work but I'm not able to import a class from my jar. Using the verbose options, I can see that my jar should be included Parsed arguments: ... jars /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar And I see the class I want to load in the jar: jar -tf /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar | grep IP2IncomeJob etl/IP2IncomeJob$$anonfun$1.class etl/IP2IncomeJob$$anonfun$4.class etl/IP2IncomeJob$.class etl/IP2IncomeJob$$anonfun$splitOverlappingRange$1.class etl/IP2IncomeJob.class etl/IP2IncomeJob$$anonfun$3.class etl/IP2IncomeJob$$anonfun$2.class But the import fails: scala> import etl.IP2IncomeJob <console>:10: error: not found: value etl import etl.IP2IncomeJob Any ideas? On Sun, Apr 27, 2014 at 3:46 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Roger, You should be able to use the --jars argument of spark-shell to add JARs onto the classpath and then work with those classes in the shell. (A recent patch, https://github.com/apache/spark/pull/542, made spark-shell use the same command-line arguments as spark-submit). But this is a great question, we should test it out and see whether anything else would make development easier. SBT also has an interactive shell where you can run classes in your project, but unfortunately Spark can’t deal with closures typed directly in that shell the right way.
However, if you write your Spark logic in a method and just call that method from the SBT shell, that should work. Matei On Apr 27, 2014, at 3:14 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi, From the meetup talk about the 1.0 release, I saw that spark-submit will be the preferred way to launch apps going forward. How do you recommend launching such jobs in a development cycle? For example, how can I load an app that's expecting to be given to spark-submit into spark-shell? Also, can anyone recommend other tricks for rapid development? I'm new to Scala, sbt, etc. I think sbt can watch for changes in source files and compile them automatically. I want to be able to make code changes and quickly get into a spark-shell to play around with them. I appreciate any advice. Thanks, Roger
Re: error loading large files in PySpark 0.9.0
Hey Matei, Wanted to let you know this issue appears to be fixed in 1.0.0. Great work! -- Jeremy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p6985.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Can't seem to link external/twitter classes from my own app
Thanks Patrick! Uberjars. Cool. I'd actually heard of them. And thanks for the link to the example! I shall work through that today. I'm still learning sbt and its many options... the last new framework I learned was node.js, and I think I've been rather spoiled by npm. At least it's not maven. Please, oh please don't make me learn maven too. (The only people who seem to like it have Software Stockholm Syndrome: I know maven kidnapped me and beat me up, but if you spend long enough with it, you eventually start to sympathize and see its point of view.) On Thu, Jun 5, 2014 at 3:39 AM, Patrick Wendell pwend...@gmail.com wrote: Hey Jeremy, The issue is that you are using one of the external libraries and these aren't actually packaged with Spark on the cluster, so you need to create an uber jar that includes them. You can look at the example here (I recently did this for a kafka project and the idea is the same): https://github.com/pwendell/kafka-spark-example You'll want to make an uber jar that includes these packages (run sbt assembly) and then submit that jar to spark-submit. Also, I'd try running it locally first (if you aren't already) just to make the debugging simpler. - Patrick On Wed, Jun 4, 2014 at 6:16 AM, Sean Owen so...@cloudera.com wrote: Ah sorry, this may be the thing I learned for the day. The issue is that classes from that particular artifact are missing though. Worth interrogating the resulting .jar file with jar tf to see if it made it in? On Wed, Jun 4, 2014 at 2:12 PM, Nick Pentreath nick.pentre...@gmail.com wrote: @Sean, the %% syntax in SBT should automatically add the Scala major version qualifier (_2.10, _2.11 etc) for you, so that does appear to be correct syntax for the build.
I seemed to run into this issue with some missing Jackson deps, and solved it by including the jar explicitly on the driver class path: bin/spark-submit --driver-class-path SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar Seems redundant to me since I thought that the JAR as argument is copied to driver and made available. But this solved it for me so perhaps give it a try? On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10 On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World App working that I wrote myself. Now I'm trying to make a minimal streaming app based on the twitter examples, (running standalone right now while learning) and when running it like this: bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar I'm getting this error: Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ Which I'm guessing is because I haven't put in a dependency to external/twitter in the .sbt, but _how_? I can't find any docs on it. 
Here's my build file so far:
simple.sbt
--
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
--
I've tried a few obvious things like adding:
libraryDependencies += "org.apache.spark" %% "spark-external" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-external-twitter" % "1.0.0"
because, well, that would match the naming scheme implied so far, but it errors. Also, I just realized I don't completely understand whether: (a) the spark-submit command _sends_ the .jar to all the workers, or (b) the spark-submit command sends a _job_ to the workers, which are supposed to already have the jar file installed (or in HDFS), or (c) the Context is supposed to list the jars to be distributed (is that deprecated?). One part of the documentation says: "Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar." but another says: "application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes." I suppose both could
Re: pyspark join crash
I think the problem is that once unpacked in Python, the objects take considerably more space, as they are stored as Python objects in a Python dictionary. Take a look at python/pyspark/join.py and combineByKey in python/pyspark/rdd.py. We should probably try to store these in serialized form. I’m not sure whether there’s a great way to inspect a Python process’s memory, but looking at what consumes memory in a reducer process would be useful. Matei On Jun 4, 2014, at 2:34 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Matei, Thanks for the reply and creating the JIRA. I hear what you're saying, although to be clear I want to still state that it seems like each reduce task is loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task. I base this hypothesis on the following: -My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled -There are 130K records -The initial RDD contains 1677 partitions, averaging 60M (uncompressed) -There are 3 cores per node (each running one reduce task at a time) -Each node has 32G of memory Note that I am attempting to join the dataset to itself and I ran this experiment after caching the dataset in memory with serialization and compression enabled. Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G from each machine on average. The behavior I observe is that the kernel kills jobs in many of the nodes at nearly the exact same time right after the read phase starts; it seems likely this would occur in each node except the master begins detecting failures and stops the job (and I observe memory spiking on all machines). Indeed, I observe a large memory spike at each node. 
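To get a feel for the gap Matei describes between serialized data and unpacked Python objects, here is a rough, self-contained measurement sketch. It groups some synthetic records into a dict of lists (loosely mimicking what a reduce-side join builds; the data is made up) and compares an approximate recursive `sys.getsizeof` total with the size of the same structure pickled. It assumes pickle as the serialized form. `deep_size` and `compare` are hypothetical helpers. `sys.getsizeof` ignores allocator overhead, so treat the ratio as indicative only.

```python
import pickle
import sys


def deep_size(obj, seen=None):
    """Rough recursive sys.getsizeof for dicts, lists, tuples and leaf objects."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return 0  # shared objects (e.g. small cached ints) are counted once
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple)):
        size += sum(deep_size(item, seen) for item in obj)
    return size


def compare(records):
    """Return (approx. in-memory bytes, pickled bytes) for records grouped by key."""
    grouped = {}
    for key, value in records:
        grouped.setdefault(key, []).append(value)
    pickled = pickle.dumps(grouped, protocol=pickle.HIGHEST_PROTOCOL)
    return deep_size(grouped), len(pickled)


if __name__ == "__main__":
    synthetic = [(i % 1000, "value-%d" % i) for i in range(10000)]
    in_memory, serialized = compare(synthetic)
    print(f"in memory ~{in_memory} bytes, pickled {serialized} bytes, "
          f"ratio ~{in_memory / serialized:.1f}x")
```

Running the same measurement on a sample of real reducer input would say more than synthetic strings do. The absolute ratio depends heavily on record shape.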
When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average. I began suspecting the behavior of loading entire blocks based on the logging from the workers (i.e. BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks). If it is definitely not the case that entire blocks are loaded from the writers, then it would seem like there is some significant overhead which is chewing threw lots of memory (perhaps similar to the problem with python broadcast variables chewing through memory https://spark-project.atlassian.net/browse/SPARK-1065). -Brad On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia matei.zaha...@gmail.com wrote: In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks. I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon. Matei On Jun 4, 2014, at 8:23 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message Job aborted due to stage failure: Master removed our application: FAILED. The crash always occurs at the beginning of the shuffle phase. 
Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog). Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request. best, -Brad
Spark assembly error.
When I run sbt/sbt assembly, I get the following exception. Is anyone else experiencing a similar problem?
..
[info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016 ...
[info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}assembly...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Resolving org.eclipse.jetty#jetty-server;8.1.14.v20131031 ...
[info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}examples...
[info] Resolving com.typesafe.genjavadoc#genjavadoc-plugin_2.10.4;0.5 ...
*[error] impossible to get artifacts when data has not been loaded. IvyNode = org.slf4j#slf4j-api;1.6.1*
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala:43: constructor TaskAttemptID in class TaskAttemptID is deprecated: see corresponding Javadoc for more information.
[warn] new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
[warn] ^
[warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:490: constructor Job in class Job is deprecated: see corresponding Javadoc for more information.
[warn] val job = new NewHadoopJob(hadoopConfiguration)
[warn] ^
[warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:623: constructor Job in class Job is deprecated: see corresponding Javadoc for more information.
[warn] val job = new NewHadoopJob(conf)
Re: custom receiver in java
Yes, thanks for updating this old thread! We heard our community's demands and added support for Java receivers! TD On Wed, Jun 4, 2014 at 12:15 PM, lbustelo g...@bustelos.com wrote: Note that what TD was referring to above is already in 1.0.0: http://spark.apache.org/docs/1.0.0/streaming-custom-receivers.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/custom-receiver-in-java-tp3575p6962.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark assembly error.
Nevermind, it turns out that this is a problem for the Pivotal Hadoop that we are trying to compile against. On Wed, Jun 4, 2014 at 4:16 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: When I run sbt/sbt assembly, I get the following exception. Is anyone else experiencing a similar problem? .. [info] Resolving org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016 ... [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}assembly... [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [info] Resolving org.eclipse.jetty#jetty-server;8.1.14.v20131031 ... [info] Updating {file:/Users/Sung/Projects/spark_06_04_14/}examples... [info] Resolving com.typesafe.genjavadoc#genjavadoc-plugin_2.10.4;0.5 ... *[error] impossible to get artifacts when data has not been loaded. IvyNode = org.slf4j#slf4j-api;1.6.1* [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala:43: constructor TaskAttemptID in class TaskAttemptID is deprecated: see corresponding Javadoc for more information. [warn] new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) [warn] ^ [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:490: constructor Job in class Job is deprecated: see corresponding Javadoc for more information. [warn] val job = new NewHadoopJob(hadoopConfiguration) [warn] ^ [warn] /Users/Sung/Projects/spark_06_04_14/core/src/main/scala/org/apache/spark/SparkContext.scala:623: constructor Job in class Job is deprecated: see corresponding Javadoc for more information. [warn] val job = new NewHadoopJob(conf)
Re: Why Scala?
So Python is used in many of the Spark Ecosystem products, but not Streaming at this point. Is there a roadmap to include Python APIs in Spark Streaming? Any time frame on this? Thanks! John On Thu, May 29, 2014 at 4:19 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Quite a few people ask this question and the answer is pretty simple. When we started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, which is JVM-based, and we wanted a concise programming interface similar to Microsoft’s DryadLINQ (the first language-integrated big data framework I know of, that begat things like FlumeJava and Crunch). On the JVM, the only language that would offer that kind of API was Scala, due to its ability to capture functions and ship them across the network. Scala’s static typing also made it much easier to control performance compared to, say, Jython or Groovy. In terms of usage, however, we see substantial usage of our other languages (Java and Python), and we’re continuing to invest in both. In a user survey we did last fall, about 25% of users used Java and 30% used Python, and I imagine these numbers are growing. With lambda expressions now added to Java 8 (http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, which is very exciting to us in terms of ease of use. Matei On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote: HN is a cesspool safely ignored. On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I recently discovered Hacker News and started reading through older posts about Scala https://hn.algolia.com/?q=scala#!/story/forever/0/scala. It looks like the language is fairly controversial on there, and it got me thinking. Scala appears to be the preferred language to work with in Spark, and Spark itself is written in Scala, right?
I know that often times a successful project evolves gradually out of something small, and that the choice of programming language may not always have been made consciously at the outset. But pretending that it was, why is Scala the preferred language of Spark? Nick -- View this message in context: Why Scala? http://apache-spark-user-list.1001560.n3.nabble.com/Why-Scala-tp6536.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Why Scala?
We are definitely investigating a Python API for Streaming, but no announced deadline at this point. Matei On Jun 4, 2014, at 5:02 PM, John Omernik j...@omernik.com wrote: So Python is used in many of the Spark Ecosystem products, but not Streaming at this point. Is there a roadmap to include Python APIs in Spark Streaming? Anytime frame on this? Thanks! John On Thu, May 29, 2014 at 4:19 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Quite a few people ask this question and the answer is pretty simple. When we started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, which is JVM-based, and we wanted a concise programming interface similar to Microsoft’s DryadLINQ (the first language-integrated big data framework I know of, that begat things like FlumeJava and Crunch). On the JVM, the only language that would offer that kind of API was Scala, due to its ability to capture functions and ship them across the network. Scala’s static typing also made it much easier to control performance compared to, say, Jython or Groovy. In terms of usage, however, we see substantial usage of our other languages (Java and Python), and we’re continuing to invest in both. In a user survey we did last fall, about 25% of users used Java and 30% used Python, and I imagine these numbers are growing. With lambda expressions now added to Java 8 (http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, which is very exciting to us in terms of ease of use. Matei On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote: HN is a cesspool safely ignored. On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I recently discovered Hacker News and started reading through older posts about Scala. It looks like the language is fairly controversial on there, and it got me thinking. 
Scala appears to be the preferred language to work with in Spark, and Spark itself is written in Scala, right? I know that often times a successful project evolves gradually out of something small, and that the choice of programming language may not always have been made consciously at the outset. But pretending that it was, why is Scala the preferred language of Spark? Nick
Re: Why Scala?
Thank you for the response. If it helps at all: I demoed the Spark platform for our data science team today. The idea of moving code from batch testing, to Machine Learning systems, GraphX, and then to near-real time models with streaming was cheered by the team as an efficiency they would love. That said, most folks on our team are Python junkies, and they love that Spark seems to be committing to Python, and would REALLY love to see Python in Streaming; it would feel complete for them from a platform standpoint. It is still awesome using Scala, and many will learn that, but full Python integration/support, if possible, would be a home run. On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: We are definitely investigating a Python API for Streaming, but no announced deadline at this point. Matei
Re: compile spark 1.0.0 error
If I compile Spark against CDH 4.6 with YARN support enabled, can it run on CDH 4.4? On Wed, Jun 4, 2014 at 5:59 PM, Sean Owen so...@cloudera.com wrote: I am not sure if it is exposed in the SBT build, but you may need the equivalent of the 'yarn-alpha' profile from the Maven build. This older build of CDH predates the newer YARN APIs. See also https://groups.google.com/forum/#!msg/spark-users/T1soH67C5M4/CmGYV8kfRkcJ Or, use a later CDH. In fact 4.6+ has a Spark parcel for you already. On Wed, Jun 4, 2014 at 10:13 AM, ch huang justlo...@gmail.com wrote: hi, maillist: I tried to compile Spark but it failed. Here is my compile command and the compile output:
# SPARK_HADOOP_VERSION=2.0.0-cdh4.4.0 SPARK_YARN=true sbt/sbt assembly
[warn] 18 warnings found
[info] Compiling 53 Scala sources and 1 Java source to /home/admserver/spark-1.0.0/sql/catalyst/target/scala-2.10/classes...
[info] Compiling 68 Scala sources and 2 Java sources to /home/admserver/spark-1.0.0/streaming/target/scala-2.10/classes...
[info] Compiling 62 Scala sources and 1 Java source to /home/admserver/spark-1.0.0/mllib/target/scala-2.10/classes...
[info] Compiling 14 Scala sources to /home/admserver/spark-1.0.0/yarn/alpha/target/scala-2.10/classes...
[error] /home/admserver/spark-1.0.0/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala:36: object AMResponse is not a member of package org.apache.hadoop.yarn.api.records
[error] import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
[error] ^
[error] /home/admserver/spark-1.0.0/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala:110: value getAMResponse is not a member of org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
[error] val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
[error] ^
[error] two errors found
[error] (yarn-alpha/compile:compile) Compilation failed
[error] Total time: 1815 s, completed Jun 4, 2014 5:07:56 PM
Logistic Regression MLLib Slow
Hi All, I am new to Spark and I am trying to run LogisticRegression (with SGD) using MLlib on a beefy single machine with about 128GB RAM. The dataset has about 80M rows with only 4 features, so it barely occupies 2GB on disk. I am running the code using all 8 cores with 20G memory using spark-submit --executor-memory 20G --master local[8] logistic_regression.py It seems to take about 3.5 hours without caching and over 5 hours with caching. What is the recommended use for Spark on a beefy single machine? Any suggestions will help! Regards, Krishna
Code sample:
import sys
import time
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD

sc = SparkContext(appName="logistic_regression")

# Dataset
d = sys.argv[1]
data = sc.textFile(d)

# Load and parse the data: label first, then the features, comma-separated
def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[0], values[1:])

_parsedData = data.map(parsePoint)
parsedData = _parsedData.cache()
results = {}

# Spark
start_time = time.time()
# Build the model
niters = 10
spark_model = LogisticRegressionWithSGD.train(parsedData, iterations=niters)

# Evaluate the model on training data (index the (label, prediction) tuple instead of unpacking it in the lambda signature)
labelsAndPreds = parsedData.map(lambda p: (p.label, spark_model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(parsedData.count())
Re: Re: is there any easier way to define a custom RDD in Java
I want to use Spark to handle data from non-SQL databases (an RDF triple store, for example). However, I am not familiar with Scala, so I want to know how to create an RdfTriplesRDD quickly. 2014-06-05 bluejoe2008 From: Patrick Wendell Date: 2014-06-05 01:25 To: user Subject: Re: is there any easier way to define a custom RDD in Java Hey There, This is only possible in Scala right now. However, this is almost never needed since the core API is fairly flexible. I have the same question as Andrew... what are you trying to do with your RDD? - Patrick On Wed, Jun 4, 2014 at 7:49 AM, Andrew Ash and...@andrewash.com wrote: Just curious, what do you want your custom RDD to do that the normal ones don't? On Wed, Jun 4, 2014 at 6:30 AM, bluejoe2008 bluejoe2...@gmail.com wrote: hi, folks, is there any easier way to define a custom RDD in Java? Do I have to define a new Java class that extends RDD from scratch? It is really a hard job for developers! 2014-06-04 bluejoe2008
Using log4j.xml
Has anyone tried to use a log4j.xml instead of a log4j.properties with Spark 0.9.1? I'm trying to run Spark Streaming on YARN and I've set the environment variable SPARK_LOG4J_CONF to a log4j.xml file instead of a log4j.properties file, but Spark seems to be using the default log4j.properties:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/var/hadoop/1/yarn/local/filecache/12/spark-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/06/05 00:36:04 INFO ApplicationMaster: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Thanks, Mike
Re: Logistic Regression MLLib Slow
Are you using the logistic_regression.py in examples/src/main/python or examples/src/main/python/mllib? The first one is an example of writing logistic regression by hand and won’t be as efficient as the MLlib one. I suggest trying the MLlib one. You may also want to check how many iterations it runs — by default I think it runs 100, which may be more than you need. Matei On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote: Hi All., I am new to Spark and I am trying to run LogisticRegression (with SGD) using MLLib on a beefy single machine with about 128GB RAM.
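To make the iteration-count point concrete, here is a minimal pure-Python sketch of full-batch gradient descent for logistic regression, written in the same spirit as a hand-written example script (it is not that script and not MLlib's implementation). Each iteration makes one complete pass over the data, so runtime grows roughly linearly with the number of iterations. The function names, the step size, and the toy data are assumptions for illustration only.

```python
import math

def train_logistic(points, iterations, step=0.1):
    """Full-batch gradient descent for logistic regression (illustrative sketch).

    points: list of (label, features) with label in {-1.0, +1.0};
    include a constant 1.0 feature if you want a bias term.
    Every iteration touches every point once.
    """
    dim = len(points[0][1])
    w = [0.0] * dim
    for _ in range(iterations):
        grad = [0.0] * dim
        for y, x in points:  # one full pass over the data per iteration
            margin = y * sum(wi * xi for wi, xi in zip(w, x))
            # d/dw of log(1 + exp(-y w.x)) is (sigmoid(margin) - 1) * y * x
            scale = (1.0 / (1.0 + math.exp(-margin)) - 1.0) * y
            for j in range(dim):
                grad[j] += scale * x[j]
        w = [wi - step * gj / len(points) for wi, gj in zip(w, grad)]
    return w

def predict(w, x):
    """Return +1.0 or -1.0 depending on the side of the decision boundary."""
    return 1.0 if sum(wi * xi for wi, xi in zip(w, x)) > 0 else -1.0
```

For example, on a symmetric toy set where the label is the sign of the single feature, `train_logistic(pts, iterations=10)` already separates the points; asking for 100 iterations would simply repeat the full pass ten times as often.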
Re: Logistic Regression MLLib Slow
Ah, is the file gzipped by any chance? We can’t decompress gzipped files in parallel, so they get processed by a single task. It may also be worth looking at the application UI (http://localhost:4040) to see 1) whether all the data fits in memory in the Storage tab (maybe it somehow becomes larger, though it seems unlikely that it would exceed 20 GB) and 2) how many parallel tasks run in each iteration. Matei On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote: I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am running only 10 iterations. The MLlib version of logistic regression doesn't seem to use all the cores on my machine. Regards, Krishna
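Why a gzipped input ends up as a single task can be seen in a small standard-library experiment (a sketch, not Spark or Hadoop code). A plain-text file can be cut at an arbitrary byte offset, and a reader can resynchronise at the next newline, which is roughly how line-oriented split readers handle input splits. A gzip stream, by contrast, can only be decoded from its beginning. The helper names and the sample data are hypothetical.

```python
import gzip
import zlib

def lines_from_offset(data: bytes, offset: int) -> list:
    """Read complete lines from a plain-text buffer starting at `offset`.

    Like a line-oriented split reader, skip the partial line at the cut point
    unless the split starts at the beginning of the data or right after a newline.
    """
    if offset > 0 and data[offset - 1:offset] != b"\n":
        nl = data.find(b"\n", offset)
        if nl == -1:
            return []
        offset = nl + 1
    return data[offset:].splitlines()

def gunzip_from_offset(blob: bytes, offset: int):
    """Try to decompress a gzip blob starting at `offset`; return None on failure."""
    try:
        return gzip.decompress(blob[offset:])
    except (OSError, EOFError, zlib.error):
        return None

# Hypothetical CSV in a label,f1,f2,f3,f4 layout like the one in the thread.
text = b"".join(b"%d,%d,%d,%d,%d\n" % (i % 2, i, i + 1, i + 2, i + 3) for i in range(10000))
blob = gzip.compress(text)
```

Reading `text` from its midpoint still yields whole records, while `gunzip_from_offset(blob, len(blob) // 2)` fails, so a second task has nothing it can start on without first decoding the earlier half.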
Re: Why Scala?
I'm still a Spark newbie, but I have a heavy background in languages and compilers... so take this with a barrel of salt... Scala, to me, is the heart and soul of Spark. Couldn't work without it. Procedural languages like Python, Java, and all the rest are lovely when you have a couple of processors, but that doesn't scale. (pun intended) It's the same reason they had to invent a slew of 'Shader' languages for GPU programming. In fact, that's how I see Scala, as the CUDA or GLSL of cluster computing. Now, Scala isn't perfect. It could learn a thing or two from OCCAM about interprocess communication. (And from node.js about package management.) But functional programming becomes essential for highly-parallel code because the primary difference is that functional declares _what_ you want to do, and procedural declares _how_ you want to do it. Since you rarely know the shape of the cluster/graph ahead of time, functional programming becomes the superior paradigm, especially for the outermost parts of the program that interface with the scheduler. Python might be fine for the granular fragments, but you would have to export all those independent functions somehow, and define the scheduling and connective structure (the DAG) elsewhere, in yet another language or library. To fit neatly into GraphX, Python would probably have to be warped in the same way that GLSL is a stricter subset of C. You'd probably lose everything you like about the language in order to make it seamless. I'm pretty agnostic about the whole Spark stack and its components (e.g., every time I run sbt/sbt assembly, Stuart Feldman dies a little inside and I get time to write another long email), but Scala is the one thing that gives it legs. I wish the rest of Spark was more like it (i.e., 'no ceremony'). Scala might seem 'weird', but that's because it directly exposes parallelism, and the ways to cope with it. I've done enough distributed programming that the advantages are obvious, for that domain. 
You're not being asked to re-wire your thinking for Scala's benefit, but to solve the underlying problem. (But you are still being asked to turn your thinking sideways, I will admit.) People love Python because it 'fit' its intended domain perfectly. That doesn't mean you'll love it just as much for embedded hardware, or GPU shader development, or Telecoms, or Spark. Then again, give me another week with the language, and see what I'm screaming about then ;-) On Thu, Jun 5, 2014 at 10:21 AM, John Omernik j...@omernik.com wrote: Thank you for the response. If it helps at all: I demoed the Spark platform for our data science team today.
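The "declare what, not how" argument can be illustrated with a small pure-Python sketch (an illustration of the reasoning, not Spark code). If the per-record work is a pure function and the combining step is associative and commutative, the input can be chopped into any number of partitions, processed independently, and merged in any order with the same result. That property is what lets a scheduler pick the layout at run time. The function names, partition counts, and sample text are arbitrary assumptions.

```python
import random
from collections import Counter
from functools import reduce

def count_partition(lines):
    """Pure per-partition work: no shared mutable state."""
    return Counter(word for line in lines for word in line.split())

def merge(a, b):
    """Associative, commutative combine step."""
    return a + b

def word_count(lines, num_partitions, seed=0):
    # Split the input into contiguous chunks, then merge the partial counts in
    # a shuffled order to mimic tasks finishing in an unpredictable order.
    size = max(1, -(-len(lines) // num_partitions))  # ceiling division
    parts = [lines[i:i + size] for i in range(0, len(lines), size)]
    partials = [count_partition(p) for p in parts]
    random.Random(seed).shuffle(partials)
    return reduce(merge, partials, Counter())
```

Whatever partition count or merge order is chosen, the totals match the single-partition result; a procedural loop that mutates one shared counter does not have that freedom built in.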
mismatched hdfs protocol
hi, all. When my Spark program accessed HDFS files, an error happened: Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4. It seems the client was trying to connect to Hadoop 2 via an old Hadoop protocol, so my question is: how do I specify the version of Hadoop for the connection? Thank you! bluejoe 2014-06-05
Re: Logistic Regression MLLib Slow
80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't take that long, even on a single executor. Besides what Matei suggested, could you also verify the executor memory in the Executors tab at http://localhost:4040? It is very likely the executors do not have enough memory. In that case, caching may be slower than reading directly from disk. -Xiangrui On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Ah, is the file gzipped by any chance?
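Xiangrui's 2.5GB figure is the raw size of the feature matrix stored as 8-byte doubles. A quick back-of-the-envelope check follows, assuming dense float64 values and ignoring per-object and serialization overhead, which adds to these figures. The constant and function names are illustrative only.

```python
ROWS = 80_000_000
FEATURES = 4
BYTES_PER_DOUBLE = 8  # float64

def dense_size_bytes(rows, cols, bytes_per_value=BYTES_PER_DOUBLE):
    """Raw size of a dense rows x cols matrix of fixed-width values."""
    return rows * cols * bytes_per_value

features_only = dense_size_bytes(ROWS, FEATURES)   # 2.56e9 bytes
with_label = dense_size_bytes(ROWS, FEATURES + 1)  # 3.2e9 bytes, label column included

print(f"features only: {features_only / 1e9:.2f} GB ({features_only / 2**30:.2f} GiB)")
print(f"with label:    {with_label / 1e9:.2f} GB ({with_label / 2**30:.2f} GiB)")
```

This prints about 2.56 GB (2.38 GiB) and 3.20 GB (2.98 GiB), well under the 20G requested, so the data should fit comfortably if the JVM actually receives that much memory.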
Re: Logistic Regression MLLib Slow
Hi Krishna, Specifying executor memory in local mode has no effect, because all of the threads run inside the same JVM. You can either try --driver-memory 60g or start a standalone server. Best, Xiangrui On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng men...@gmail.com wrote: 80M by 4 should be about 2.5GB uncompressed.
Re: Spark Usecase
Shahab, Interesting question. A couple of points (based on the information in your e-mail): 1. One can support the use case in Spark as a set of transformations on a WIP RDD over a span of time, with the final transformation outputting to a processed RDD. - Spark Streaming would be a good data ingestion mechanism - Look at the system as a pipeline that spans a time window - Depending on the cardinality, you would need a correlation id to transform the pipeline as you get more data 2. Having said that, you do have to understand what value Spark provides, then design the topology to support that. - For example, you could potentially keep all the WIP in HBase and the final transformations in a Spark RDD. - Or maybe you keep all the WIP in Spark and the final processed records in HBase. There is nothing wrong with keeping WIP in Spark if response time to process the incoming data set is important. 3. Naturally, start with a set of ideas, make a few assumptions and do an end-to-end POC. That will clear up many of the questions and firm up the design. HTH. Cheers k/ On Wed, Jun 4, 2014 at 6:57 AM, Shahab Yunus shahab.yu...@gmail.com wrote: Hello All. I have a newbie question. We have a use case where a huge amount of data will be coming in streams or micro-batches of streams and we want to process these streams according to some business logic. We don't have to provide extremely low latency guarantees, but batch M/R will still be slow. Now the business logic is such that at the time of emitting the data, we might have to hold on to some tuples until we get more information. This 'more' information will essentially be coming in future streams. You can say that this is a kind of *word count* use case where we have to *aggregate and maintain state across batches of streams.* One thing different here is that we might have to *maintain the state or data for a day or two* until the rest of the data comes in and then we can complete our output. 
1- The question is: is such a use case supported in Spark and/or Spark Streaming? 2- Will we be able to persist partially aggregated data until the rest of the information comes in later in time? I am mentioning *persistence* here because, given that the delay can span a day or two, we won't want to keep the partial data in memory for so long. I know this can be done in Storm but I am really interested in Spark because of its close integration with Hadoop. We might not even want to use Spark Streaming (which is more of a direct comparison with Storm/Trident) given that our application does not have to be real-time to the split second. Feel free to direct me to any document or resource. Thanks a lot. Regards, Shahab
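Points 1 and 2 of Krishna Sankar's reply (holding partial, per-key state across micro-batches until the rest of the data arrives, keyed by a correlation id) can be sketched in plain Python. This is a hypothetical, framework-free illustration of the pattern. In Spark Streaming the analogous built-in is `updateStateByKey` on a pair DStream, whose update function receives the new values for a key together with its previous state. The record shape, the `required` field set, and the two-day timeout are assumptions. In a real deployment the state would be checkpointed or kept in an external store such as HBase rather than in a local dict.

```python
from dataclasses import dataclass, field

TTL_SECONDS = 2 * 24 * 3600  # hypothetical: give up on a key after two days

@dataclass
class Partial:
    first_seen: float
    fields: dict = field(default_factory=dict)

def process_batch(state, batch, now, required):
    """Fold one micro-batch into per-key state.

    state:    dict correlation_id -> Partial (carried across batches)
    batch:    list of (correlation_id, field_name, value)
    required: set of field names a record needs before it can be emitted
    Returns (completed, expired), each a list of (correlation_id, fields).
    """
    for cid, name, value in batch:
        partial = state.setdefault(cid, Partial(first_seen=now))
        partial.fields[name] = value

    completed, expired = [], []
    for cid in list(state):
        partial = state[cid]
        if required.issubset(partial.fields):
            completed.append((cid, state.pop(cid).fields))
        elif now - partial.first_seen > TTL_SECONDS:
            expired.append((cid, state.pop(cid).fields))
    return completed, expired
```

A key stays in `state` across calls until all required fields have arrived (then it is emitted) or until the timeout passes (then it is handed back as expired for separate handling).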
Re: Logistic Regression MLLib Slow
I will try both and get back to you soon! Thanks for all your help! Regards, Krishna On Wed, Jun 4, 2014 at 7:56 PM, Xiangrui Meng men...@gmail.com wrote: Hi Krishna, Specifying executor memory in local mode has no effect, because all of the threads run inside the same JVM. You can either try --driver-memory 60g or start a standalone server. Best, Xiangrui
Re: Trouble launching EC2 Cluster with Spark
Hey Sam, You mentioned two problems here. Did your VPC error message get fixed, or only the key permissions problem? I noticed we had someone report a similar issue with the VPC stuff a long time back (but there is no real resolution there): https://spark-project.atlassian.net/browse/SPARK-1166 If that's still an issue, one thing to try is just changing the name of the cluster. We create groups that are identified with the cluster name, and there might be something that just got screwed up with the original group creation and AWS isn't happy. - Patrick On Wed, Jun 4, 2014 at 12:55 PM, Sam Taylor Steyer sste...@stanford.edu wrote: Awesome, that worked. Thank you! - Original Message - From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org Sent: Wednesday, June 4, 2014 12:52:00 PM Subject: Re: Trouble launching EC2 Cluster with Spark chmod 600 path/FinalKey.pem Cheers k/ On Wed, Jun 4, 2014 at 12:49 PM, Sam Taylor Steyer sste...@stanford.edu wrote: Also, once my friend logged in to his cluster he received the error Permissions 0644 for 'FinalKey.pem' are too open. This sounds like the other problem described. How do we make the permissions more private? Thanks very much, Sam - Original Message - From: Sam Taylor Steyer sste...@stanford.edu To: user@spark.apache.org Sent: Wednesday, June 4, 2014 12:42:04 PM Subject: Re: Trouble launching EC2 Cluster with Spark Thank you! The regions advice solved the problem for my friend who was getting the key pair does not exist problem. I am still getting the error: ERROR:boto:400 Bad Request ERROR:boto:<?xml version="1.0" encoding="UTF-8"?> <Response><Errors><Error><Code>InvalidParameterValue</Code><Message>Invalid value 'null' for protocol. VPC security group rules must specify protocols explicitly.</Message></Error></Errors><RequestID>7ff92687-b95a-4a39-94cb-e2d00a6928fd</RequestID></Response> This sounds like it could have to do with the access settings of the security group, but I don't know how to change them. Any advice would be much appreciated! 
Sam - Original Message - From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org Sent: Wednesday, June 4, 2014 8:52:59 AM Subject: Re: Trouble launching EC2 Cluster with Spark One reason could be that the keys are in a different region. You need to create the keys in us-east-1 (N. Virginia). Cheers k/ On Wed, Jun 4, 2014 at 7:45 AM, Sam Taylor Steyer sste...@stanford.edu wrote: Hi, I am trying to launch an EC2 cluster from Spark using the following command: ./spark-ec2 -k HackerPair -i [path]/HackerPair.pem -s 2 launch HackerCluster I set my access key ID and secret access key. I have been getting an error in the "setting up security groups..." phase: Invalid value 'null' for protocol. VPC security groups must specify protocols explicitly. My project partner gets one step further and then gets the error The key pair 'JamesAndSamTest' does not exist. Any thoughts as to how we could fix these problems? Thanks a lot! Sam
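Krishna's `chmod 600` fix can also be checked or applied from a script, which may help when automating cluster launches. Below is a minimal sketch using only the Python standard library, assuming a POSIX system. The helper names are hypothetical, and the key path in the usage note is the placeholder from the thread. SSH refuses private keys that grant any permission to group or others, which is what the "Permissions 0644 ... are too open" message is about.

```python
import os
import stat

def key_is_too_open(path):
    """True if group or others have any permission bits on the key file."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return bool(mode & (stat.S_IRWXG | stat.S_IRWXO))

def lock_down_key(path):
    """Equivalent of `chmod 600 path`: owner read/write only. Returns the new mode."""
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
    return stat.S_IMODE(os.stat(path).st_mode)
```

Usage might look like `if key_is_too_open("path/FinalKey.pem"): lock_down_key("path/FinalKey.pem")` before passing the key to `./spark-ec2 -i`.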