Re: parallelize for a large Seq is extremely slow.
I think the real problem is "spark.akka.frameSize". It is too small to pass the data, so every executor fails; with no executors left, the task hangs.
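If the frame size really is the limit, it can be raised when the SparkConf is built. A minimal sketch against the 0.9-era API; the 128 MB value here is only an illustrative guess, not a figure from this thread:

import org.apache.spark.{SparkConf, SparkContext}

// spark.akka.frameSize is specified in MB and bounds how large a message
// (task results, statuses, etc.) the driver and executors may exchange.
val conf = new SparkConf()
  .setAppName("word filter mapping")
  .set("spark.akka.frameSize", "128")
val sc = new SparkContext(conf)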
Re: parallelize for a large Seq is extremely slow.
It's my fault! I uploaded the wrong jar when I changed the number of partitions, and now it just works fine :) The size of word_mapping is 2444185. So does serializing a large object really take this long? I don't think two million entries is very large, because serializing a collection of that size locally typically costs less than a second. Thanks for the help :)
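As a quick local sanity check of that claim, here is a rough sketch that times plain Java serialization of a stand-in collection of the same size (the element type is only an assumption about word_mapping's contents):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerTiming {
  def main(args: Array[String]): Unit = {
    // Stand-in for word_mapping: ~2.44 million (String, Int) pairs.
    val sample = (0 until 2444185).map(i => (s"word$i", i))
    val start = System.nanoTime()
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(sample)              // serialize the whole collection at once
    out.close()
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"serialized ${bytes.size()} bytes in $elapsedMs%.0f ms")
  }
}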
Re: parallelize for a large Seq is extremely slow.
How many values are in that sequence, i.e. what is its size? You can also profile your program while it’s running to see where it’s spending time. The easiest way is to get a single stack trace with jstack. Maybe some of the serialization methods for this data are super inefficient, or toSeq on a map is inefficient. You could try word_mapping.value.toArray. I’m also wondering whether something earlier in the program is slow and this is just not obvious from the output.

Matei

On Apr 27, 2014, at 9:47 AM, Earthson wrote:

> That's not working. I don't think it is just slow; it never ends (after 30+ hours, I killed it).
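A minimal sketch of the toArray variant suggested here, reusing the names and output path from the code elsewhere in this thread:

// Materialize the broadcast map as an Array of pairs before parallelizing;
// a plain Array avoids any lazily built Seq view and serializes predictably.
val pairs: Array[(String, Int)] = word_mapping.value.toArray
spark.parallelize(pairs).saveAsTextFile("hdfs://ns1/nlp/word_mapping")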
Re: parallelize for a large Seq is extremely slow.
That's not working. I don't think it is just slow; it never ends (after 30+ hours, I killed it).
Re: parallelize for a large Seq is extremely slow.
Could it be that the default number of partitions used by parallelize() is too small in this case? Try something like spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it should already be 30, but perhaps that's not the case in YARN mode...)

On Fri, Apr 25, 2014 at 11:38 PM, Earthson wrote:

> parallelize is still so slow.
>
> package com.semi.nlp
>
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[Map[String,Int]])
>     kryo.register(classOf[Map[String,Long]])
>     kryo.register(classOf[Seq[(String,Long)]])
>     kryo.register(classOf[Seq[(String,Int)]])
>   }
> }
>
> object WFilter2 {
>   def initspark(name:String) = {
>     val conf = new SparkConf()
>       .setMaster("yarn-standalone")
>       .setAppName(name)
>       .setSparkHome(System.getenv("SPARK_HOME"))
>       .setJars(SparkContext.jarOfClass(this.getClass))
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryoserializer.buffer.mb", "256")
>       .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
>       .set("spark.cores.max", "30")
>     new SparkContext(conf)
>   }
>
>   def main(args: Array[String]) {
>     val spark = initspark("word filter mapping")
>     val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
>     val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>     val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
>     val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
>     val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
>     def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
>     val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>     spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>     mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>     spark.stop()
>   }
> }
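For reference, a minimal sketch of that suggestion with the default made visible (spark and word_mapping are the names used in the quoted code):

// parallelize() uses sc.defaultParallelism slices unless told otherwise;
// printing it shows whether YARN mode is giving fewer slices than expected.
println(s"defaultParallelism = ${spark.defaultParallelism}")
spark.parallelize(word_mapping.value.toSeq, 60)   // explicit slice count
  .saveAsTextFile("hdfs://ns1/nlp/word_mapping")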
Re: parallelize for a large Seq is extremely slow.
parallelize is still so slow.

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String,Int]])
    kryo.register(classOf[Map[String,Long]])
    kryo.register(classOf[Seq[(String,Long)]])
    kryo.register(classOf[Seq[(String,Int)]])
  }
}

object WFilter2 {
  def initspark(name:String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "256")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
      .set("spark.cores.max", "30")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
    val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
    val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
    def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
    val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}
Re: parallelize for a large Seq is extremely slow.
Using reduceByKey(_+_).countByKey instead of plain countByKey seems to be fast.
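A minimal sketch of the two variants side by side (file is the RDD of lines from the code elsewhere in this thread):

// Original approach: countByKey over the raw (word, 1) pairs.
val tf_plain = file.flatMap(_.split("\t")).map((_, 1)).countByKey()

// Approach from this message: reduce first, then countByKey.
// Caveat: countByKey counts records per key, and after reduceByKey each key
// occurs exactly once; collectAsMap() on the reduced RDD yields the summed counts.
val tf_reduced = file.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).countByKey()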
Re: parallelize for a large Seq is extremely slow.
This error came up just because I killed my app :( Is there something wrong? The reduceByKey operation is extremely slow (slower than with the default serializer).
Re: parallelize for a large Seq is extremely slow.
(Unknown Source)
	at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91)
	... 10 more
Re: parallelize for a large Seq is extremely slow.
Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException (com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1)
	com.esotericsoftware.kryo.io.Output.require(Output.java:138)
	com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
	com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
	com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
	com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
	com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
	com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
	com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
	com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
	com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
	org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
	java.lang.Thread.run(Thread.java:722)

~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String,Int]])
  }
}

object WFilter2 {
  def initspark(name:String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    val stopset = Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
    val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
    val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
    def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
    val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}
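The "Buffer overflow. Available: 0" suggests Kryo's output buffer is too small for the broadcast map; a minimal sketch of enlarging it (256 MB matches the value used in the revised code elsewhere in this thread):

// spark.kryoserializer.buffer.mb (in MB) sizes the buffer Kryo serializes into;
// it has to be large enough to hold the ~2 million entry Map[String,Int].
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
  .set("spark.kryoserializer.buffer.mb", "256")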
Re: parallelize for a large Seq is extremely slow.
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see http://spark.apache.org/docs/0.9.1/tuning.html); it should be considerably faster.

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu wrote:

> spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>
> This line is too slow. There are about 2 million elements in word_mapping.
>
> Is there a good style for writing a large collection to HDFS?
>
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
>
> object WFilter {
>   def main(args: Array[String]) {
>     val spark = new SparkContext("yarn-standalone", "word filter", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
>     val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
>     val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>     val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
>     val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
>     val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
>     def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
>     val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>     spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>     mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>     spark.stop()
>   }
> }
>
> many thx :)
>
> --
> ~
> Perfection is achieved
> not when there is nothing more to add
> but when there is nothing left to take away
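A minimal sketch of that change against the 0.9-era SparkConf API (the registrator class name is the one used elsewhere in this thread):

// Use Kryo instead of Java serialization for shipped data, and register
// the map classes up front so Kryo does not write full class names each time.
val conf = new SparkConf()
  .setAppName("word filter")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
val spark = new SparkContext(conf)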
parallelize for a large Seq is extremely slow.
spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

This line is too slow. There are about 2 million elements in word_mapping.

*Is there a good style for writing a large collection to HDFS?*

> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
>
> object WFilter {
>   def main(args: Array[String]) {
>     val spark = new SparkContext("yarn-standalone", "word filter", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
>     val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
>     val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>     val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).countByKey
>     val df_map = spark broadcast file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey
>     val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
>     def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
>     val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>     spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>     mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>     spark.stop()
>   }
> }

many thx :)

--
~
Perfection is achieved
not when there is nothing more to add
but when there is nothing left to take away
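For reference, a minimal sketch pulling together the suggestions from this thread (Kryo serialization plus an explicit slice count for parallelize); 256 and 60 are only example values:

val conf = new SparkConf()
  .setMaster("yarn-standalone")
  .setAppName("word filter")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "256")   // room for the ~2M-entry map
val sc = new SparkContext(conf)

// Split the ~2 million (word, id) pairs across the cluster explicitly
// instead of relying on the default slice count.
sc.parallelize(word_mapping.value.toSeq, 60)
  .saveAsTextFile("hdfs://ns1/nlp/word_mapping")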