On Thu, Mar 19, 2015 at 5:16 AM, sergunok <ser...@gmail.com> wrote:
> Hi,
>
> I am trying to vectorize, on a YARN cluster, a corpus of texts (about 500K
> texts in 13 files, 100GB in total) located in HDFS.
>
> This process has already taken about 20 hours on a 3-node cluster with 6
> cores and 20GB RAM on each node.
> In my opinion it's too long :-)
>
> I started the task with the following command:
> spark-submit --master yarn --num-executors 9 --executor-memory 5GB \
>     --executor-cores=2 --driver-memory 5GB weight.py

Only 5G * 2 * 6 * 0.6 = 36G will be available for caching, which is less
than the 100GB of input (you would need more than that), so you get no
benefit from cache(); just remove it.

> weight.py:
> from pyspark import SparkConf, SparkContext
> from pyspark.mllib.feature import HashingTF
> from pyspark.mllib.feature import IDF
> from pyspark.mllib.feature import Normalizer
>
> conf = SparkConf() \
>     .set("spark.hadoop.validateOutputSpecs", "false") \
>     .set("spark.yarn.executor.memoryOverhead", "900")
> sc = SparkContext(conf=conf)
>
> # reading files from directory 'in/texts.txt' in HDFS
> texts = sc.textFile('in/texts.txt') \
>     .map(lambda line: line.split())
>
> hashingTF = HashingTF()

By default you will have 2^20 (about a million) features. Do you really
need that many? Maybe 10k is enough for English.

> tf = hashingTF.transform(texts)
>
> tf.cache()

Remove the cache().

> idf = IDF(minDocFreq=100).fit(tf)
> tfidf = idf.transform(tf)
>
> n = Normalizer()
>
> normalized = n.transform(tfidf)
>
> def x2((vec, num)):
>     triples = []
>     for id, weight in zip(vec.indices, vec.values):
>         triples.append((num, id, weight))
>     return triples
>
> # I use zipWithIndex to enumerate documents
> normalized.zipWithIndex() \
>     .flatMap(x2) \
>     .map(lambda t: '{}\t{}\t{}'.format(t[0], t[1], t[2])) \
>     .saveAsTextFile('out/weights.txt')

zipWithIndex() is expensive: it triggers an extra job to count the items in
each partition. You could use zipWithUniqueId() instead (the ids are then
not contiguous, but still unique).

> 1) What could be the bottleneck?
> Unfortunately I don't have access to the web UI.
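To make the feature-count point concrete: HashingTF just hashes each term
into a bucket in [0, numFeatures), so a smaller numFeatures only costs you
more hash collisions. Here is a minimal plain-Python sketch of the hashing
trick (it uses a toy polynomial hash, not Spark's actual hash function, so
the bucket indices will differ from Spark's):

```python
# Sketch of the hashing trick behind HashingTF, no Spark required.
from collections import defaultdict

def term_index(term, num_features):
    # Toy deterministic hash: map a term to a bucket in [0, num_features).
    # (Spark uses its own hash function; this is only an illustration.)
    h = 0
    for ch in term:
        h = (h * 31 + ord(ch)) % (2 ** 31)
    return h % num_features

def hashing_tf(doc_terms, num_features=10000):
    # Term-frequency vector as a sparse dict: bucket index -> count.
    vec = defaultdict(int)
    for term in doc_terms:
        vec[term_index(term, num_features)] += 1
    return dict(vec)

doc = "the quick brown fox jumps over the lazy dog".split()
tf = hashing_tf(doc, num_features=10000)
# "the" appears twice, so its bucket has count 2; every other term here
# lands in its own bucket with count 1.
```

With only 10k buckets, distinct terms will occasionally share a bucket; for
typical English vocabularies that loss is usually acceptable compared to
carrying a million-dimensional feature space.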
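To illustrate the zipWithUniqueId() suggestion, here is a plain-Python
sketch of its documented id scheme: the k-th item in partition p, with n
partitions total, gets id k*n + p. Each partition can therefore assign ids
independently, without the extra counting job that zipWithIndex() runs, at
the cost of gaps in the id sequence:

```python
# Sketch of the id scheme used by RDD.zipWithUniqueId(), no Spark required.
def zip_with_unique_id(partitions):
    # The k-th item in partition p (of n partitions) gets id k*n + p:
    # unique across partitions, but not contiguous.
    n = len(partitions)
    return [
        (item, k * n + p)
        for p, part in enumerate(partitions)
        for k, item in enumerate(part)
    ]

partitions = [["a", "b"], ["c"], ["d", "e", "f"]]
pairs = zip_with_unique_id(partitions)
# ids: partition 0 -> 0, 3; partition 1 -> 1; partition 2 -> 2, 5, 8
```

If the output format only needs a stable document identifier (as in the
(num, id, weight) triples above), these gapped ids work just as well as
contiguous ones.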
> In the log file I see stages: 0, 1, 2, 3
> Stage 0 "MapPartitionsRDD[6] at mapPartitionsWithIndex at
> RDDFunctions.scala:108" with 584 tasks completed very quickly
> Stage 1 "MappedRDD[8] at values at RDDFunctions.scala:110" (23 tasks) was
> quick too
> Stage 2 "zipWithIndex" (584 tasks) was long (17 hours)
> Stage 3 "saveAsTextFile" (584 tasks) too (still executing after about 2 hours)
>
> I don't understand the boundaries of Stages 0 and 1,
> and I don't understand why I see numbers like 584 or 23 tasks on the stages.
>
> 2) On a previous run of this task I saw a lot of "executor lost" errors from
> the YARN scheduler. Later I added .set("spark.yarn.executor.memoryOverhead",
> "900") in the code, and now I see only a few such messages. Could this be a
> reason for the poor performance?
>
> Please advise!
>
> Any explanations appreciated!
>
> Serg.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/calculating-TF-IDF-for-large-100GB-dataset-problems-tp22144.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
---------------------------------------------------------------------