Re: Processing of text file in large gzip archive
> 1. I don't think textFile is capable of unpacking a .gz file. You need to use hadoopFile or newAPIHadoopFile for this.

Sorry, that's incorrect: textFile works fine on .gz files. What it can't do is compute splits on gzipped files, so if you have a single file, you'll get a single partition. Processing 30 GB of gzipped data should not take that long, at least with the Scala API. I'm not sure about Python, especially under 1.2.1.
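The splits limitation follows from the gzip format itself: decompression has to start at the beginning of the stream, so there is no record boundary a second reader could seek to. A quick standard-library sketch of this (no Spark needed; the in-memory "file" here is made up for illustration):

```python
import gzip
import zlib

# Build a small gzip "file" in memory: 10,000 lines of text.
raw = b"\n".join(b"line %d" % i for i in range(10_000))
gz = gzip.compress(raw)

# Reading from the start works: gzip is a sequential stream.
assert gzip.decompress(gz) == raw

# But decompression cannot begin at an arbitrary midpoint, which is
# why Spark cannot split one .gz file into multiple input partitions.
try:
    # wbits=MAX_WBITS|16 tells zlib to expect a gzip header, which the
    # midpoint slice will not have.
    zlib.decompressobj(wbits=zlib.MAX_WBITS | 16).decompress(gz[len(gz) // 2:])
    midpoint_ok = True
except zlib.error:
    midpoint_ok = False
```

Formats like bzip2 are block-based and therefore splittable by Hadoop input formats, which is one reason they are often preferred for large inputs.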
Re: Processing of text file in large gzip archive
You probably want to update this line as follows:

    lines = sc.textFile('file.gz').repartition(sc.defaultParallelism * 3)

For more details on why, see this answer: http://stackoverflow.com/a/27631722/877069

Nick

On Mon, Mar 16, 2015 at 6:50 AM Marius Soutier mps@gmail.com wrote:

> > 1. I don't think textFile is capable of unpacking a .gz file. You need to use hadoopFile or newAPIHadoopFile for this.
>
> Sorry, that's incorrect: textFile works fine on .gz files. What it can't do is compute splits on gzipped files, so if you have a single file, you'll get a single partition. Processing 30 GB of gzipped data should not take that long, at least with the Scala API. I'm not sure about Python, especially under 1.2.1.
Processing of text file in large gzip archive
I have a 30 GB gzip file in HDFS (originally a text file where each line represents a text document) and Spark 1.2.0 on a YARN cluster with 3 worker nodes, each with 64 GB RAM and 4 cores. The replication factor for my file is 3.

I tried to implement a simple pyspark script to parse this file and represent it as tf-idf. Something like:

    lines = sc.textFile('file.gz')
    docs = lines.map(lambda line: line.split(' '))
    hashingTF = HashingTF()
    tf = hashingTF.transform(docs)
    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
    tfidf.map(lambda v: ' '.join([u'{}:{}'.format(i, x) for i, x in zip(v.indices, v.values)])) \
         .saveAsTextFile('tfidf.txt')

I started the script with:

    spark-submit --master yarn --num-executors 24 script.py

No comment on why I selected 24 executors - that was just a first try. I saw in the output that all 24 executors, with corresponding block managers of 0.5 GB each, were started on the 3 nodes, but the output stops at these messages:

    INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on node3:36765 (size: 49.7 KB, free: 530.0 MB)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node3:36765 (size: 21.6 KB, free: 529.9 MB)

I have been waiting for about an hour now and don't see any changes. (Unfortunately I cannot monitor the cluster via the Web UI.)

My main questions: Is this, generally speaking, a normal processing time for such a volume of data on such a cluster? Is it OK that the output stops at "Added broadcast..."? Is it OK to read a gzip archive via sc.textFile(..) in code, or is it better to unpack it beforehand (for performance purposes)? How can I monitor a Spark task via the command line? Please advise about tuning.

Thanks!
Sergey.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Processing-of-text-file-in-large-gzip-archive-tp22073.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
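For reference, the transformation the pipeline above performs can be checked on a tiny sample in plain Python, without a cluster. This is only a sketch of the idea: the feature count and the crc32 hash here are illustrative stand-ins (MLlib's HashingTF uses its own hash function and a much larger default dimension, so exact indices will differ), but the IDF formula matches the MLlib-documented one, IDF(t) = log((m + 1) / (df(t) + 1)):

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # illustrative; MLlib's HashingTF default is far larger

def hashing_tf(terms):
    """Map a list of terms to {feature_index: term_frequency}.
    crc32 stands in for MLlib's hash; collisions simply add up."""
    return dict(Counter(zlib.crc32(t.encode()) % NUM_FEATURES for t in terms))

def fit_idf(tf_vectors):
    """IDF(t) = log((m + 1) / (df(t) + 1)), m = number of documents."""
    m = len(tf_vectors)
    df = Counter(i for vec in tf_vectors for i in vec)
    return {i: math.log((m + 1) / (df[i] + 1)) for i in df}

# Three toy "documents", split on spaces like the pyspark script does.
docs = [line.split(' ') for line in ["a b a", "b c", "c c d"]]
tfs = [hashing_tf(d) for d in docs]
idf = fit_idf(tfs)
tfidf = [{i: tf * idf[i] for i, tf in vec.items()} for vec in tfs]
```

On 30 GB this work is all in the distributed transforms; the point of the sketch is just to sanity-check the expected output format (index:value pairs per document) on data small enough to inspect.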
Re: Processing of text file in large gzip archive
1. I don't think textFile is capable of unpacking a .gz file. You need to use hadoopFile or newAPIHadoopFile for this.

2. Instead of map, do a mapPartitions.

3. You need to open the driver UI and see what's really taking the time. If that is running on a remote machine and you are not able to access it locally, then create an ssh tunnel:

    ssh -L 4040:127.0.0.1:4040 user@remotemachine

Thanks
Best Regards

On Mon, Mar 16, 2015 at 1:39 PM, sergunok ser...@gmail.com wrote:

> I have a 30 GB gzip file in HDFS (originally a text file where each line
> represents a text document) and Spark 1.2.0 on a YARN cluster with 3
> worker nodes, each with 64 GB RAM and 4 cores.
> [...]
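On point 2 above, the difference between map and mapPartitions can be sketched in plain Python (illustrative only, not Spark's implementation): map invokes the function once per element, while mapPartitions invokes it once per partition with an iterator, so any per-call setup cost is paid once per partition instead of once per record:

```python
# Model an RDD as a list of partitions, each a list of records.
def map_partitions(partitions, f):
    """Apply f once per partition; f takes and yields an iterator."""
    return [list(f(iter(part))) for part in partitions]

calls = {"setup": 0}

def parse_with_setup(lines):
    # Expensive one-time work (e.g. compiling a regex, opening a
    # connection) happens once per partition here, not once per line.
    calls["setup"] += 1
    for line in lines:
        yield line.split(' ')

partitions = [["a b", "c d"], ["e f"]]
out = map_partitions(partitions, parse_with_setup)
# setup ran once per partition (2 times), not once per record (3 times)
```

Whether this helps depends on the function: for a bare split(' ') there is little setup to amortize, but for parsers with real initialization cost it can matter.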