Re: parallelize for a large Seq is extremely slow.

2014-04-29 Thread Earthson
I think the real problem is "spark.akka.frameSize". It is too small to pass the
data, so every executor fails; with no executors left, the task just hangs.
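
A minimal sketch of raising the frame size, assuming the Spark 0.9.x-era property
name spark.akka.frameSize (the value is in MB and defaulted to 10 in this era; the
128 below is illustrative, not a figure from this thread):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative only: raise the Akka frame size so large serialized tasks and
    // task results fit in a single message instead of failing the executor.
    val conf = new SparkConf()
      .setAppName("word filter mapping")
      .set("spark.akka.frameSize", "128") // MB; the 0.9.x default was 10
    val sc = new SparkContext(conf)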





Re: parallelize for a large Seq is extremely slow.

2014-04-27 Thread Earthson
It was my fault! I had uploaded the wrong jar when I changed the number of
partitions, and now it just works fine :)

The size of word_mapping is 2444185.

So does serialization really take that long for a large object? I don't think
two million entries is very large, because the cost of doing this locally for a
collection of that size is typically less than one second.

Thanks for the help:)
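
For comparison, a rough local sketch (purely illustrative, not from the thread) of
what serializing roughly 2.4 million (String, Int) pairs with plain Java
serialization costs on the driver:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Build a dummy mapping of about the same size and time its serialization.
    val mapping: Seq[(String, Int)] = (0 until 2444185).map(i => ("word" + i, i))
    val start = System.currentTimeMillis()
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(mapping)
    out.close()
    println("serialized " + bytes.size + " bytes in " +
      (System.currentTimeMillis() - start) + " ms")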





Re: parallelize for a large Seq is extremely slow.

2014-04-27 Thread Matei Zaharia
How many values are in that sequence? I.e. what is its size?

You can also profile your program while it's running to see where it's spending
time. The easiest way is to get a single stack trace with jstack <pid>.
Maybe some of the serialization methods for this data are super inefficient, or
toSeq on a map is inefficient. You could try word_mapping.value.toArray. I'm
also wondering if something earlier in the program is slow and this is just not
obvious from the output.
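
A sketch of the toArray variant, reusing the names from the code later in the
thread (the element type is an assumption based on that code):

    // word_mapping is a Broadcast[Map[String, Int]] in the code below; toArray
    // avoids going through toSeq on the map before parallelizing.
    val pairs: Array[(String, Int)] = word_mapping.value.toArray
    spark.parallelize(pairs).saveAsTextFile("hdfs://ns1/nlp/word_mapping")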

Matei

On Apr 27, 2014, at 9:47 AM, Earthson wrote:

> That doesn't work. I don't think it is just slow; it never ends (I killed it
> after 30+ hours).
> 
> 
> 



Re: parallelize for a large Seq is extremely slow.

2014-04-27 Thread Earthson
That doesn't work. I don't think it is just slow; it never ends (I killed it
after 30+ hours).





Re: parallelize for a large Seq is extremely slow.

2014-04-26 Thread Aaron Davidson
Could it be that the default number of partitions used by parallelize() is too
small in this case? Try something like
spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it
should already be 30, but perhaps that's not the case in YARN mode...)
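
A sketch of that suggestion, using the names from the code quoted below (the 60 is
Aaron's illustrative figure, not a number derived from measurements):

    // Pass an explicit slice count instead of relying on the default parallelism,
    // which may be lower than expected under YARN.
    spark.parallelize(word_mapping.value.toSeq, 60)
      .saveAsTextFile("hdfs://ns1/nlp/word_mapping")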


On Fri, Apr 25, 2014 at 11:38 PM, Earthson wrote:

> parallelize is still so slow.
>
> 
>
> package com.semi.nlp
>
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[Map[String,Int]])
>     kryo.register(classOf[Map[String,Long]])
>     kryo.register(classOf[Seq[(String,Long)]])
>     kryo.register(classOf[Seq[(String,Int)]])
>   }
> }
>
> object WFilter2 {
>   def initspark(name:String) = {
>     val conf = new SparkConf()
>       .setMaster("yarn-standalone")
>       .setAppName(name)
>       .setSparkHome(System.getenv("SPARK_HOME"))
>       .setJars(SparkContext.jarOfClass(this.getClass))
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryoserializer.buffer.mb", "256")
>       .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
>       .set("spark.cores.max", "30")
>     new SparkContext(conf)
>   }
>
>   def main(args: Array[String]) {
>     val spark = initspark("word filter mapping")
>     val stopset = spark.broadcast(
>       Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet)
>     val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>     val tf_map = spark.broadcast(
>       file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey)
>     val df_map = spark.broadcast(
>       file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey)
>     val word_mapping = spark.broadcast(
>       Map(df_map.value.keys.zipWithIndex.toBuffer:_*))
>     def w_filter(w:String) =
>       if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
>     val mapped =
>       file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>
>     spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>     mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>     spark.stop()
>   }
> }
>
>
>


Re: parallelize for a large Seq is extremely slow.

2014-04-25 Thread Earthson
parallelize is still so slow. 



package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String,Int]])
    kryo.register(classOf[Map[String,Long]])
    kryo.register(classOf[Seq[(String,Long)]])
    kryo.register(classOf[Seq[(String,Int)]])
  }
}

object WFilter2 {
  def initspark(name:String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "256")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
      .set("spark.cores.max", "30")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    val stopset = spark.broadcast(
      Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet)
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    val tf_map = spark.broadcast(
      file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey)
    val df_map = spark.broadcast(
      file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey)
    val word_mapping = spark.broadcast(
      Map(df_map.value.keys.zipWithIndex.toBuffer:_*))
    def w_filter(w:String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
    val mapped =
      file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))

    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}





Re: parallelize for a large Seq is extremely slow.

2014-04-25 Thread Earthson
Using reduceByKey(_+_).countByKey instead of a bare countByKey seems to be fast.
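
That matches how countByKey worked in this era: it builds per-partition maps over
the raw pairs and merges them on the driver, so pre-aggregating with reduceByKey
moves most of the work onto the executors. One caveat, though: countByKey counts
occurrences of each key, so after reduceByKey every word collapses to a count of
1; if the real term frequencies are needed, collectAsMap on the reduced RDD keeps
them. A sketch, assuming the variable names from the code elsewhere in the thread:

    // Executors aggregate first; the driver only receives one pair per distinct word.
    val tf_counts: scala.collection.Map[String, Int] =
      file.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).collectAsMap()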





Re: parallelize for a large Seq is extremely slow.

2014-04-25 Thread Earthson
This error came up just because I killed my app :(

Is there something wrong? The reduceByKey operation is extremely slow (slower
than with the default serializer).





Re: parallelize for a large Seq is extremely slow.

2014-04-25 Thread Earthson
... (Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91)
... 10 more





Re: parallelize for a large Seq is extremely slow.

2014-04-24 Thread Earthson
Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException
(com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 1)
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)

~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String,Int]])
  }
}

object WFilter2 {
  def initspark(name:String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    val stopset =
      Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    val tf_map = spark.broadcast(
      file.flatMap(_.split("\t")).map((_,1)).countByKey)
    val df_map = spark.broadcast(
      file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey)
    val word_mapping = spark.broadcast(
      Map(df_map.value.keys.zipWithIndex.toBuffer:_*))
    def w_filter(w:String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
    val mapped =
      file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))

    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}
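
The "Buffer overflow. Available: 0, required: 1" above is the typical symptom of
Kryo's output buffer being too small for the object being serialized (here the
multi-million-entry map). A minimal sketch of raising it, assuming the 0.9.x-era
property name; the 256 matches the value Earthson adds in the 2014-04-25 version
of this code:

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
      // Kryo's buffer size in MB; the default (2 MB in this era) cannot hold
      // the serialized word_mapping, hence the overflow.
      .set("spark.kryoserializer.buffer.mb", "256")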





Re: parallelize for a large Seq is extremely slow.

2014-04-24 Thread Matei Zaharia
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see
http://spark.apache.org/docs/0.9.1/tuning.html); it should be considerably
faster.

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu wrote:

> spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
> 
> this line is too slow. There are about 2 million elements in word_mapping.
> 
> Is there a good style for writing a large collection to hdfs?
> 
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> object WFilter {
>   def main(args: Array[String]) {
>     val spark = new SparkContext("yarn-standalone", "word filter",
>       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
>     val stopset =
>       Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
>     val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>     val tf_map = spark.broadcast(
>       file.flatMap(_.split("\t")).map((_,1)).countByKey)
>     val df_map = spark.broadcast(
>       file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey)
>     val word_mapping = spark.broadcast(
>       Map(df_map.value.keys.zipWithIndex.toBuffer:_*))
>     def w_filter(w:String) =
>       if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
>     val mapped =
>       file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>
>     spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>     mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>     spark.stop()
>   }
> }
> 
> many thx:) 
> 
> -- 
> 
> ~
> Perfection is achieved 
> not when there is nothing more to add
>  but when there is nothing left to take away



parallelize for a large Seq is extremely slow.

2014-04-24 Thread Earthson Lu
spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

this line is too slow. There are about 2 million elements in word_mapping.

*Is there a good style for writing a large collection to hdfs?*

import org.apache.spark._
import SparkContext._
import scala.io.Source

object WFilter {
  def main(args: Array[String]) {
    val spark = new SparkContext("yarn-standalone", "word filter",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val stopset =
      Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    val tf_map = spark.broadcast(
      file.flatMap(_.split("\t")).map((_,1)).countByKey)
    val df_map = spark.broadcast(
      file.map(x=>Set(x.split("\t"):_*)).flatMap(_.map(_->1)).countByKey)
    val word_mapping = spark.broadcast(
      Map(df_map.value.keys.zipWithIndex.toBuffer:_*))
    def w_filter(w:String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
    val mapped =
      file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))

    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}


many thx:)

-- 

~
Perfection is achieved
not when there is nothing more to add
 but when there is nothing left to take away
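

On the original question (a good style for writing a large driver-side collection
to HDFS): one alternative worth sketching is to stream the entries straight from
the driver through the Hadoop FileSystem API instead of shipping the whole map to
executors via parallelize. The sketch below is illustrative only; it reuses
word_mapping and the output path from the thread and assumes the SparkContext is
in scope as spark:

    import java.io.PrintWriter
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Write the ~2.4M (word, id) pairs directly from the driver, one per line,
    // avoiding serializing the whole collection into tasks.
    val path = new Path("hdfs://ns1/nlp/word_mapping/part-00000")
    val fs = path.getFileSystem(spark.hadoopConfiguration)
    val out = new PrintWriter(fs.create(path))
    try {
      word_mapping.value.foreach { case (word, id) => out.println(word + "\t" + id) }
    } finally {
      out.close()
    }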