Re: Re: Re: Re: Re: how to distributed run a bash shell in spark
Hi Akhil and all,

My previous code has a problem: all the executors loop over and run the same command, which is not what I expect. Previous code:

    val shellcompare = List("run", "sort.sh")
    val shellcompareRDD = sc.makeRDD(shellcompare)
    val result = List("aggregate", "result")
    val resultRDD = sc.makeRDD(result)
    for (j <- 1 to 21) {
      resultRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j + ".txt /opt/data/shellcompare/samplechr" + j + ".txt /opt/data/shellcompare/result" + j + ".txt 600").collect()
    }

What I want is for the executors to run different commands concurrently. So I modified the code like below:

    val resultRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21))
    resultRDD.map { i =>
      resultRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + i + ".txt /opt/data/shellcompare/samplechr" + i + ".txt /opt/data/shellcompare/result" + i + ".txt 600")
    }.collect()

However, I got the exception below:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 57, slave1): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Any advice on this? Thanks. [A possible workaround is sketched after the quoted thread below.]

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-26 14:46

If you open up the driver UI (running on port 4040), you can see multiple tasks per stage, which will be happening concurrently. If it is a single task and you want to increase the parallelism, then you can simply do a repartition.

Thanks
Best Regards

On Tue, May 26, 2015 at 8:27 AM, luohui20...@sina.com wrote:

I am trying to run some shell scripts in my Spark app, hoping they run more concurrently in my Spark cluster. However, I am not sure whether my code will run concurrently on my executors. Diving into my code, you can see that I am trying to:

1. Split both db and sample into 21 small files each, generating 42 files in total. By splitting db I get chr1, chr2, ..., chr21, and by splitting sample I get samplechr1, samplechr2, ..., samplechr21.
2. Merge those split files from HDFS and save them to a local path. The merged files will be chr1.txt, chr2.txt, ..., chr21.txt and samplechr1.txt, samplechr2.txt, ..., samplechr21.txt.
3. Run modify.sh to clean the data, that is, to delete some characters that are not useful.
4. Run shellcompare.sh to compare chr1.txt and samplechr1.txt and get result1.txt, looping from 1 to 21 so that all 42 files are compared and I get 21 files: result1.txt, result2.txt, ..., result21.txt.

Sorry for not adding comments to my code.

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 22:41

Can you tell us what exactly you are trying to achieve?

Thanks
Best Regards

On Mon, May 25, 2015 at 5:00 PM, luohui20...@sina.com wrote:

Thanks, madhu and Akhil. I modified my code like below; however, I think it is not so distributed. Do you have a better idea to run this app more efficiently and in a more distributed way?
So I add some comments with my understanding:

    import org.apache.spark._
    import www.celloud.com.model._

    object GeneCompare3 {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("GeneCompare")
          .setMaster("spark://master:7077")
          .set("spark.executor.memory", "6g")
          .set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
        val sc = new SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._

        val db = sc.textFile("/data/db.txt")
          .map(_.split("\t"))
          .map(x => Database(x(0), x(1).trim().toInt, x(2).trim().toInt, x(3).trim().toInt, x(4).trim().toInt))
          .toDF()
        val sample = sc.textFile("/data/sample.txt")
          .map(_.split("\t"))
          .map(x => Sample(x(0), x(1).trim().toInt, x(2), x(3).trim().toInt, x(4).trim().toInt, x(5)))
          .toDF()

        // using Akhil Das's idea: 1to21.txt is a file with 21 lines, each line a single number
        //val test = sc.textFile("1to21.txt").foreach { i =>
        //  // running on the driver
        //  db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
        //  // running on the executors
        //  db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
        //  sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
        //  // running on the executors
        //  db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt")
        //}.collect()

        // using madhu's method: running on the driver
        for (i <- 1 to 21) {
          db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
          db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt").collect()
          sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
          db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt").collect()
        }

        // running on the driver
        val runmodifyshell = List("run", "modify.sh")
        val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
        val pipeModify = runmodifyshellRDD.pipe("sh /opt/data/shellcompare/modify.sh")
        pipeModify.collect()

        // running on the driver
        val shellcompare = List("run", "sort.sh")
        val shellcompareRDD = sc.makeRDD(shellcompare)
        val result = List("aggregate", "result")
        val resultRDD = sc.makeRDD(result)
        for (j <- 1 to 21) {
          resultRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j + ".txt /opt/data/shellcompare/samplechr" + j + ".txt /opt/data/shellcompare/result" + j + ".txt 600").collect()
        }
      }
    }
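For the SPARK-5063 question above: the failure comes from referencing resultRDD inside its own map closure, not from the shell command itself. Below is a minimal sketch of one way to launch the 21 commands concurrently without nesting RDD operations, assuming scala.sys.process is acceptable and that /opt/sh/bin/sort.sh and the /opt/data/shellcompare files are readable on every worker. The object name and the exit-code handling are illustrative, not from the thread.

    // Sketch: run one shell command per element, directly inside the closure.
    // No RDD is referenced inside map(), so SPARK-5063 does not apply.
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.sys.process._

    object SortPerChromosome {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("SortPerChromosome"))

        // 21 elements in 21 partitions: each chromosome index becomes its own task,
        // so the 21 commands can run concurrently across the executors.
        val exitCodes = sc.parallelize(1 to 21, 21).map { i =>
          val cmd = Seq("sh", "/opt/sh/bin/sort.sh",
            s"/opt/data/shellcompare/chr$i.txt",
            s"/opt/data/shellcompare/samplechr$i.txt",
            s"/opt/data/shellcompare/result$i.txt",
            "600")
          i -> cmd.!  // runs on whichever executor owns element i; returns the exit code
        }.collect()

        exitCodes.foreach { case (i, code) => println(s"chr$i exited with code $code") }
        sc.stop()
      }
    }

Asking parallelize for 21 slices is the same parallelism knob Akhil's repartition suggestion points at: one element per partition means one task per command. Note that each result file is written on whichever worker ran that task, so a later step still has to collect or merge them.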
Re: Re: Re: Re: Re: how to distributed run a bash shell in spark
Thanks Akhil. I checked the job UI again: my app is running concurrently on all the executors, but some of the tasks got an I/O exception. I will continue inspecting this. [A common fix is sketched after this message.]

    java.io.IOException: Failed to create local dir in /tmp/spark-b66c7c95-9242-4454-b900-9be9301c4c26/spark-e4c3e926-4bae-4893-85cb-635984d2ede7/spark-9bf34589-cea4-4736-a573-5347d983c134/blockmgr-0c82aca2-eaf1-43ae-8c55-cc4c1777e8f1/1b

Thank you.

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-26 14:46

If you open up the driver UI (running on port 4040), you can see multiple tasks per stage, which will be happening concurrently. If it is a single task and you want to increase the parallelism, then you can simply do a repartition.

Thanks
Best Regards
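On the java.io.IOException above: "Failed to create local dir" points at the scratch space Spark uses for block-manager and shuffle files, which is controlled by spark.local.dir and defaults to /tmp. A minimal sketch of moving it to another disk; the /data/spark-tmp path is a placeholder assumption and must exist and be writable by the Spark user on every worker:

    import org.apache.spark.SparkConf

    // spark.local.dir is where executors create the spark-*/blockmgr-* directories
    // seen in the stack trace. If /tmp is full, unwritable, or cleaned by the OS,
    // pointing it at a roomier disk is a common fix.
    val conf = new SparkConf()
      .setAppName("GeneCompare")
      .set("spark.local.dir", "/data/spark-tmp")

The same setting can also go in conf/spark-defaults.conf; on a standalone cluster the SPARK_LOCAL_DIRS environment variable in spark-env.sh, if set, overrides it.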