Could it be that the default number of partitions used by parallelize() is too small in this case? Try something like spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it should already be 30, but perhaps that's not the case in YARN mode...)
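Roughly what I mean, as a minimal standalone sketch (the partition count of 60 is just a guess to tune for your cluster, and I'm assuming spark.default.parallelism is the setting that governs the default when no count is passed):

    import org.apache.spark.{SparkConf, SparkContext}

    object ParallelizePartitionsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("parallelize partition count demo")
          // Assumption: raising spark.default.parallelism changes the default
          // partition count of parallelize() when no explicit count is given.
          .set("spark.default.parallelism", "60")
        val sc = new SparkContext(conf)

        // Dummy stand-in for word_mapping.value.toSeq from your code.
        val data: Seq[(String, Int)] = (1 to 1000000).map(i => ("word" + i, i))

        // Passing the count explicitly overrides whatever the default is.
        val rdd = sc.parallelize(data, 60)
        println("partitions = " + rdd.partitions.length) // should print 60

        sc.stop()
      }
    }

If the explicit count fixes it, that would confirm the default was what made the parallelize-and-save step so slow.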
On Fri, Apr 25, 2014 at 11:38 PM, Earthson <earthson...@gmail.com> wrote:
> parallelize is still so slow.
>
> ~~~~~~~~
>
> package com.semi.nlp
>
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyRegistrator extends KryoRegistrator {
>     override def registerClasses(kryo: Kryo) {
>         kryo.register(classOf[Map[String,Int]])
>         kryo.register(classOf[Map[String,Long]])
>         kryo.register(classOf[Seq[(String,Long)]])
>         kryo.register(classOf[Seq[(String,Int)]])
>     }
> }
>
> object WFilter2 {
>     def initspark(name:String) = {
>         val conf = new SparkConf()
>                     .setMaster("yarn-standalone")
>                     .setAppName(name)
>                     .setSparkHome(System.getenv("SPARK_HOME"))
>                     .setJars(SparkContext.jarOfClass(this.getClass))
>                     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>                     //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
>                     .set("spark.kryoserializer.buffer.mb", "256")
>                     .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
>                     .set("spark.cores.max", "30")
>         new SparkContext(conf)
>     }
>
>     def main(args: Array[String]) {
>         val spark = initspark("word filter mapping")
>         val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
>         val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>         val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
>         val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
>         val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
>         def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
>         val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>         spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>         mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>         spark.stop()
>     }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4871.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.