Re: Execution error during ALS execution in Spark
I have some suggestions you may try: 1) Call persist() on the input RDD; this can save a lot of running time. 2) From the UI you can see the cluster spends much of its time in the shuffle stage; this can be tuned through configuration parameters such as "spark.shuffle.memoryFraction" and "spark.memory.fraction". Good luck.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Execution-error-during-ALS-execution-in-spark-tp26644p26652.html
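For what it's worth, a minimal sketch of both suggestions (the application name, input path and ratings variable are hypothetical; spark.shuffle.memoryFraction applies to the legacy memory manager, while spark.memory.fraction is its counterpart on newer Spark versions with unified memory management):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("als-tuning-sketch")
  .set("spark.shuffle.memoryFraction", "0.4") // legacy default is 0.2
val sc = new SparkContext(conf)

// 1) persist the input RDD so the ALS iterations do not re-read and re-parse it
val ratings = sc.textFile("hdfs:///path/to/ratings")
ratings.persist(StorageLevel.MEMORY_AND_DISK)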
Small error in LDA code @Xiangrui Meng
Hi: There is a small error in the source code of LDA.scala at line 180:

def setBeta(beta: Double): this.type = setBeta(beta)

The method calls itself, which causes a java.lang.StackOverflowError. It's easy to see the error.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LDA-code-little-error-Xiangrui-Meng-tp22621.html
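A guess at the intended fix, since setBeta is documented as an alias for setTopicConcentration: the setter presumably should delegate to the aliased method instead of calling itself.

// current code at LDA.scala:180 - the method recurses into itself and overflows the stack
def setBeta(beta: Double): this.type = setBeta(beta)

// presumed intent - delegate to the setter it aliases
def setBeta(beta: Double): this.type = setTopicConcentration(beta)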
Re: MLlib/ALS: java.lang.OutOfMemoryError: Java heap space
I am not sure this can help you. I have 57 million ratings, about 4 million users and 4k items. I used 7-14 total executor cores and 13g executor memory; the cluster has 4 nodes, each with 4 cores and 16g max memory. I found the following settings may help avoid this problem:

conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6

I also have to keep the rank value under 40, otherwise the problem occurs.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-java-lang-OutOfMemoryError-Java-heap-space-tp20584p20755.html
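Pulling the settings above into a complete, minimal ALS run (a sketch only; the input path, parsing and ALS parameters are made up, with the rank kept under 40 as noted):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val conf = new SparkConf().setAppName("als-oom-tuning")
conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6
val sc = new SparkContext(conf)

// hypothetical input: one "user item rating" triple per line
val ratings = sc.textFile("hdfs:///path/to/ratings").map { line =>
  val Array(u, i, r) = line.trim.split(' ')
  Rating(u.toInt, i.toInt, r.toDouble)
}.persist()

val model = ALS.train(ratings, rank = 30, iterations = 10, lambda = 0.01)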
toArray and first get different results from a one-element RDD
Hi, Recently I have run into some problems with RDD behavior, specifically RDD.first and RDD.toArray when the RDD has only one element. I get different results from different methods on a one-element RDD, where the results should be the same. I will give more detail after the code. My code is as follows:

// get an RDD with just one row, RDD[(Long, Array[Byte])]
val alsresult = sc.sequenceFile(args(0) + "/als", classOf[LongWritable], classOf[BytesWritable]).map { case (uid, sessions) =>
  sessions.setCapacity(sessions.getLength)
  (uid.get(), sessions.getBytes)
}.filter { line =>
  line._1 == userindex.value // specified from the arguments
}

// the log information that really surprised me
logger.info("alsInformation:%d".format(alsresult.count()))
alsresult.toArray().foreach(e => logger.info("alstoarray:%d\t%s".format(e._1, e._2.mkString(" "))))
alsresult.take(1).foreach(e => logger.info("take1result:%d\t%s".format(e._1, e._2.mkString(" "))))
logger.info("firstInformation:%d\t%s".format(alsresult.first()._1, alsresult.first()._2.mkString(" ")))
alsresult.collect().foreach(e => logger.info("alscollectresult:%d\t%s".format(e._1, e._2.mkString(" "))))
alsresult.take(3).foreach(e => logger.info("alstake3result:%d\t%s".format(e._1, e._2.mkString(" ")))) // 3 is bigger than rdd.count()

I get an RDD that has just one element, but the different methods return different elements. The printed information is as follows (one run per column):

argument userindex: 33057172   281168553814772   3209314
alsInformation:   1 | 1 | 1 | 1
alstoarray:       1612242 0 22 47 37 6 19... | 3337442 16 32 0 22 13 49... | 3697319 16 32 0 22 13 49... | 3 0 22 47 37 6 19...
take1result:      1612242 21 24 3 56 16 27... | 3337442 16 52 31 42 29 36... | 3697319 39 21 34 56 3 37... | 3 34 10 18 28 38 11...
firstInformation: 1612242 21 24 3 56 16 27... | 3337442 16 52 31 42 29 36... | 3697319 39 21 34 56 3 37... | 3 34 10 18 28 38 11...
alscollectresult: 1612242 0 22 47 37 6 19... | 3337442 16 32 0 22 13 49... | 3697319 16 32 0 22 13 49... | 3 0 22 47 37 6 19...
alstake3result:   1612242 0 22 47 37 6 19... | 3337442 16 32 0 22 13 49... | 3697319 16 32 0 22 13 49... | 3 0 22 47 37 6 19...

I filter the RDD and guarantee that RDD.count() equals 1. I would expect different userindex.value arguments to give different alsresult contents, yet RDD.toArray, RDD.collect and RDD.take(3) return the same result for different arguments, while for the same argument toArray, take(1) and take(3) return different results. It really surprised me. The arguments were chosen at random. Can anyone explain this or give me some reference? Thanks
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/toArray-first-get-the-different-result-from-one-element-RDD-tp20734.html
Re: toArray and first get different results from a one-element RDD
I got the key point. The problem is in sc.sequenceFile: according to the API description, the resulting RDD will create many references to the same object. So I changed sessions.getBytes to sessions.getBytes.clone, and it seems to work. Thanks.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/toArray-first-get-the-different-result-from-one-element-RDD-tp20734p20739.html
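For reference, a minimal sketch of that fix applied to the map from the original post (the clone of the byte array is the only change): sequenceFile reuses the same Writable instances for every record, so the bytes must be copied before they are stored in the RDD.

val alsresult = sc.sequenceFile(args(0) + "/als", classOf[LongWritable], classOf[BytesWritable]).map { case (uid, sessions) =>
  sessions.setCapacity(sessions.getLength)
  // copy the backing array; otherwise every row ends up pointing at the same reused buffer
  (uid.get(), sessions.getBytes.clone())
}.filter { line =>
  line._1 == userindex.value
}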
Re: Help with processing multiple RDDs
I think you can try to set a lower spark.storage.memoryFraction, for example 0.4:

conf.set("spark.storage.memoryFraction", "0.4") // default is 0.6

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-processing-multiple-RDDs-tp18628p18659.html
Index file created by MapFile can't be read
Hi, Recently I wanted to save a big RDD[(K,V)] in the form of an index and data file, so I decided to use a Hadoop MapFile. I tried some examples like this one: https://gist.github.com/airawat/6538748 The code runs fine and generates an index file and a data file. I can use the command hadoop fs -text /spark/out2/mapFile/data to open the data file, but when I run hadoop fs -text /spark/out2/mapFile/index I can't see the index content; there is only some information in the console:

14/11/10 16:11:04 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
14/11/10 16:11:04 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
14/11/10 16:11:04 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
14/11/10 16:11:04 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
14/11/10 16:11:04 INFO compress.CodecPool: Got brand-new decompressor [.deflate]

and the command hadoop fs -ls /spark/out2/mapFile/ shows:

-rw-r--r--   3 spark hdfs      24002 2014-11-10 15:19 /spark/out2/mapFile/data
-rw-r--r--   3 spark hdfs        136 2014-11-10 15:19 /spark/out2/mapFile/index

I think "INFO compress.CodecPool: Got brand-new decompressor [.deflate]" shouldn't prevent the data in the index file from being shown. It really confuses me. My code is as follows:

def try_Map_File(writePath: String) = {
  val uri = writePath + "/mapFile"
  val data = Array("One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks",
    "Seven, eight, lay them straight", "Nine, ten, a big fat hen")
  val con = new SparkConf()
  con.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
  val sc = new SparkContext(con)
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(URI.create(uri), conf)
  val key = new IntWritable()
  val value = new Text()
  var writer: MapFile.Writer = null
  try {
    val writer = new MapFile.Writer(conf, fs, uri, key.getClass, value.getClass)
    writer.setIndexInterval(64)
    for (i <- Range(0, 512)) {
      key.set(i + 1)
      value.set(data(i % data.length))
      writer.append(key, value)
    }
  } finally {
    IOUtils.closeStream(writer)
  }
}

Can anyone give me some idea, or another method to use instead of MapFile?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/index-File-create-by-mapFile-can-t-read-tp18471.html
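In case it helps, a small sketch (not from the original post) of reading the MapFile back programmatically with MapFile.Reader instead of hadoop fs -text; the path is the one from the post, and only standard Hadoop I/O classes are used:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{IntWritable, MapFile, Text}

val dir = "/spark/out2/mapFile"
val hconf = new Configuration()
val hfs = FileSystem.get(URI.create(dir), hconf)
val reader = new MapFile.Reader(hfs, dir, hconf)
try {
  val k = new IntWritable()
  val v = new Text()
  // scan all entries in key order; MapFile.Reader.get(key, value) is also available for random lookups
  while (reader.next(k, v)) {
    println(k.get() + "\t" + v.toString)
  }
} finally {
  reader.close()
}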
Re: To generate IndexedRowMatrix from a RowMatrix
You should supply more information about your input data. For example, I generate an IndexedRowMatrix from ALS-style input data; my code looks like this:

val inputData = sc.textFile(fname).map { line =>
  val parts = line.trim.split(' ')
  (parts(0).toLong, parts(1).toInt, parts(2).toDouble)
}
val ncol = inputData.map(_._2).max() + 1
val nrows = inputData.map(_._1).max() + 1
logInfo(s"rows:$nrows,columns:$ncol")
val dataRows = inputData.groupBy(_._1).map[IndexedRow] { row =>
  val (indices, values) = row._2.map(e => (e._2, e._3)).unzip
  new IndexedRow(row._1, new SparseVector(ncol, indices.toArray, values.toArray))
}
val svd = new IndexedRowMatrix(dataRows.persist(), nrows, ncol).computeSVD(rank, computeU = true)

If your input data has no index information, I think you should worry about the ordering of the rows in your RowMatrix; your matrix multiplication should not rely on the assumption that the RowMatrix rows are ordered.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/To-generate-IndexedRowMatrix-from-an-RowMatrix-tp18490p18541.html
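If the rows are already sitting in a RowMatrix and you are willing to treat the current row order as the index (see the caveat above), one possible approach is to attach indices with zipWithIndex; this sketch is an assumption on my part, not code from the thread:

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

def toIndexed(rowMat: RowMatrix): IndexedRowMatrix = {
  // zipWithIndex numbers the rows in their current partition order
  val indexedRows = rowMat.rows.zipWithIndex.map { case (vector, idx) =>
    IndexedRow(idx, vector)
  }
  new IndexedRowMatrix(indexedRows, rowMat.numRows(), rowMat.numCols().toInt)
}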
How to avoid using snappy compression with saveAsSequenceFile?
Hi: After updating Spark to version 1.1.0, I experienced a snappy error which was posted here: http://apache-spark-user-list.1001560.n3.nabble.com/Update-gcc-version-Still-snappy-error-tt15137.html I avoided that problem with the code conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec"). I run the ALS and SVD algorithms on a huge 5,500,000 x 5,000 sparse matrix, and I want to save some of the results in a binary format to save space. Then I found that saveAsSequenceFile hits the same problem, and I don't know how to avoid it this time. There is some problem with my environment that I just can't solve. Can anyone give me some idea about this problem, or about how to avoid using snappy with saveAsSequenceFile? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-use-snappy-compression-when-saveAsSequenceFile-tp17350.html
Re: How to avoid using snappy compression with saveAsSequenceFile?
Here is the error log; I have abstracted it as follows:

INFO [binaryTest---main]: before first
WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-0]: Lost task 0.0 in stage 0.0 (TID 0, spark-dev136): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
    org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236)
    org.xerial.snappy.Snappy.<clinit>(Snappy.java:48)
    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
    org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
    org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
    java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2288)
WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-1]: Lost task 0.1 in stage 0.0 (TID 2, spark-dev136): java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
    org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
    org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
    java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2288)
    java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2301)
    java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2772)
    java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:778)
    java.io.ObjectInputStream.<init>(ObjectInputStream.java:278)
    org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
ERROR [org.apache.spark.scheduler.TaskSchedulerImpl---sparkDriver-akka.actor.default-dispatcher-17]: Lost executor 1 on spark-dev136: remote Akka client disassociated
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, spark-dev134): java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
    org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
    org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
    java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2288)

That is the error log in the console. My test code is as follows; I can run it correctly on my notebook. I know there is something wrong with the Spark cluster; I want to avoid using snappy compression, which would avoid this problem.

val conf = new SparkConf().setAppName("binary")
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
val sc = new SparkContext()
val arr = Array("One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks",
  "Seven, eight, lay them straight", "Nine, ten, a big fat hen")
val pairs = arr.indices zip arr
//implicit def int2IntWritable(fint: Int): IntWritable = new IntWritable()
//implicit def string2Writable(fstring: String): Text = new Text()
val rdd = sc.makeRDD(pairs)
logInfo("before first")
println(rdd.first())
logInfo("after first")
val seq = new SequenceFileRDDFunctions(rdd)
seq.saveAsSequenceFile(args(0))

Thanks
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-use-snappy-compression-when-saveAsSequenceFile-tp17350p17424.html
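A couple of sketches of how compression of the saved sequence files can be controlled explicitly (an aside, not from the thread: in the snippet above the SparkConf is built but never passed to the SparkContext, so the LZ4 setting cannot take effect; the codec class and Hadoop property below are standard APIs, while the output paths are made up):

import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val conf = new SparkConf().setAppName("binary")
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
val sc = new SparkContext(conf) // pass the conf so the codec setting is actually used

val arr = Array("One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks",
  "Seven, eight, lay them straight", "Nine, ten, a big fat hen")
val rdd = sc.makeRDD(arr.indices zip arr)

// Option 1: ask saveAsSequenceFile for an explicit, non-snappy codec
rdd.saveAsSequenceFile("/tmp/seq-deflate", Some(classOf[DefaultCodec]))

// Option 2: disable Hadoop output compression altogether (old-API property name)
sc.hadoopConfiguration.set("mapred.output.compress", "false")
rdd.saveAsSequenceFile("/tmp/seq-plain")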
Confusing order of rows in SVD matrix?
Hi: I want to use SVD in my work. I tried some examples and have some confusion. The input is the 4x3 matrix below:

2 0 0
0 3 2
0 3 1
2 0 3

My input text file, which corresponds to the matrix (row column value), is:

0 0 2
1 1 3
1 2 2
2 2 1
2 1 3
3 0 2
3 2 3

After running the SVD algorithm, I tried to recompute the input matrix as U*S*V.T, but I found that the rows of the matrix were not in the expected order. I printed it out:

2 0 0
0 3 1
0 3 2
2 0 3

Row 2 was exchanged with row 3, which confuses me. Can anyone explain? My code is:

val inputData = sc.textFile(fname).map { line =>
  val parts = line.trim.split(' ')
  (parts(0).toLong, parts(1).toInt, parts(2).toDouble)
}
val dataRows = inputData.groupBy(_._1).map[(Long, Vector)] { row =>
  val (indices, values) = row._2.map(e => (e._2, e._3)).unzip
  (row._1, new SparseVector(ncol, indices.toArray, values.toArray))
}
val data = dataRows.take(dataRows.count().toInt).map(e => e._1 + e._2.toArray.mkString(";"))
logInfo("")
logInfo(data(0))
logInfo(data(1))
logInfo(data(2))
logInfo(data(3))

Here is the print code, and this matrix is the same as the recomputed matrix U*S*V.T, but the order of the rows has changed, which I can't understand. I want to get the same matrix as in the input file. How can I guarantee this? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-confusion-order-of-rows-in-SVD-matrix-tp15337.html
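In case it is useful, a sketch (an addition, not from the original post) of keeping the row index attached by building an IndexedRowMatrix, so the original file order can be recovered after the groupBy; inputData, ncol and rank are assumed to be defined as in the code above:

import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val indexedRows = inputData.groupBy(_._1).map { row =>
  val (indices, values) = row._2.map(e => (e._2, e._3)).unzip
  IndexedRow(row._1, new SparseVector(ncol, indices.toArray, values.toArray))
}
val mat = new IndexedRowMatrix(indexedRows)
val svd = mat.computeSVD(rank, computeU = true) // each row of U keeps its index

// print the rows in the original file order by sorting on the index
mat.rows.sortBy(_.index).collect().foreach { r =>
  println(r.index + " " + r.vector.toArray.mkString(";"))
}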
Updated gcc version, still snappy error
I updated Spark from version 1.0.2 to 1.1.0 and experienced a snappy version issue with the new Spark-1.1.0. After updating the glibc version, another issue occurred. I abstract the log as follows:

14/09/25 11:29:18 WARN [org.apache.hadoop.util.NativeCodeLoader---main]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/25 11:29:19 WARN [org.apache.hadoop.hdfs.DomainSocketFactory---main]: The short-circuit local reads feature is disabled because libhadoop cannot be loaded.
WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-0]: Lost task 0.0 in stage 1.0 (TID 1, spark-dev134): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
    org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236)
    org.xerial.snappy.Snappy.<clinit>(Snappy.java:48)
    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-3]: Lost task 4.0 in stage 1.0 (TID 4, spark-dev134): java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
    org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
    org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
14/09/25 11:29:24 ERROR [org.apache.spark.network.ConnectionManager---handle-read-write-executor-3]: Corresponding SendingConnection to ConnectionManagerId(spark-dev135,38649) not found
14/09/25 11:29:24 INFO [org.apache.spark.scheduler.DAGScheduler---main]: Failed to run count at SessionSVD2.scala:23
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 9, spark-dev135): ExecutorLostFailure (executor lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

1) I tried to print JAVA_LIBRARY_PATH; the native-hadoop library is not on the path. I set System.setProperty("JAVA_LIBRARY_PATH", "hadoop_home/lib/native/"), which only takes effect in System.getenv() but not in System.getProperty("JAVA_LIBRARY_PATH"). hadoop_home/lib/native/ contains the libhadoop and libsnappy.so files, which I want to include on the path.
2) I found that in /tmp there are many snappy-uuid files; each time I submit a job it creates a snappy-uuid file. Before I updated the glibc version, a colleague updated the snappy version; I think this is the reason why it can find the snappy file but not libhadoop.
Are there any ideas? Thanks
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Update-gcc-version-Still-snappy-error-tp15137.html
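One thing that might be worth trying (an assumption on my part, not something from this thread): pass the native library directory to Spark through its library-path settings rather than System.setProperty, since java.library.path is normally fixed when a JVM starts. The directory below is hypothetical.

import org.apache.spark.SparkConf

val nativeDir = "/opt/hadoop/lib/native" // directory holding libhadoop.so and libsnappy.so

val conf = new SparkConf()
  // executors are launched after the conf is read, so this reaches their java.library.path
  .set("spark.executor.extraLibraryPath", nativeDir)
  // for the driver this usually has to be supplied at launch time instead,
  // e.g. spark-submit --driver-library-path or spark-defaults.conf,
  // because the driver JVM is already running when this conf is built
  .set("spark.driver.extraLibraryPath", nativeDir)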