Re: Re: Re: Re: Re: how to distributed run a bash shell in spark

2015-05-27 Thread luohui20001
Hi Akhil and all,
My previous code has some problems: all the executors are looping and running the 
same command, which is not what I am expecting. Previous code:
val shellcompare = List("run", "sort.sh")
val shellcompareRDD = sc.makeRDD(shellcompare)
val result = List("aggregate", "result")
val resultRDD = sc.makeRDD(result)
for (j <- 1 to 21) {
  // every task pipes its partition through the very same command for each j
  resultRDD.pipe("sh /opt/sh/bin/sort.sh " +
    "/opt/data/shellcompare/chr" + j + ".txt " +
    "/opt/data/shellcompare/samplechr" + j + ".txt " +
    "/opt/data/shellcompare/result" + j + ".txt 600").collect()
}
What I want is all the executors running different commands concurrently, so I 
modified the code like below:

val resultRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21))
resultRDD.map { i =>
  // calls an RDD transformation inside another transformation
  resultRDD.pipe("sh /opt/sh/bin/sort.sh " +
    "/opt/data/shellcompare/chr" + i + ".txt " +
    "/opt/data/shellcompare/samplechr" + i + ".txt " +
    "/opt/data/shellcompare/result" + i + ".txt 600")
}.collect()

However I got the exception below:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in 
stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 
57, slave1): org.apache.spark.SparkException: RDD transformations and actions 
can only be invoked by the driver, not inside of other transformations; for 
example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map 
transformation. For more information, see SPARK-5063.
Any advice on this? Thanks.
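One way around SPARK-5063 is to avoid touching an RDD inside map() at all and 
instead launch each shell command with scala.sys.process. Below is a minimal 
sketch, not the thread's confirmed solution, assuming sort.sh and the 
/opt/data/shellcompare paths above exist on every worker node:

import scala.sys.process._

// One element (and one partition) per chromosome; the script is started
// locally on whichever executor owns that element, so the 21 commands run
// concurrently across the cluster.
val chromosomes = sc.parallelize(1 to 21, 21)
val exitCodes = chromosomes.map { i =>
  Seq("sh", "/opt/sh/bin/sort.sh",
    "/opt/data/shellcompare/chr" + i + ".txt",
    "/opt/data/shellcompare/samplechr" + i + ".txt",
    "/opt/data/shellcompare/result" + i + ".txt",
    "600").!
}.collect()

The collected exit codes can then be checked on the driver to see whether any of 
the comparisons failed.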



 

Thanks & Best regards!
San.Luo

Re: Re: Re: Re: Re: how to distributed run a bash shell in spark

2015-05-26 Thread luohui20001
Thanks Akhil. I checked the job UI again; my app is running concurrently in all 
the executors, but some of the tasks got an I/O exception. I will continue 
inspecting this.
java.io.IOException: Failed to create local dir in 
/tmp/spark-b66c7c95-9242-4454-b900-9be9301c4c26/spark-e4c3e926-4bae-4893-85cb-635984d2ede7/spark-9bf34589-cea4-4736-a573-5347d983c134/blockmgr-0c82aca2-eaf1-43ae-8c55-cc4c1777e8f1/1b.
Thank you.
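A common remedy for this kind of error (my assumption, not something confirmed 
in this thread) is to point spark.local.dir at a directory that exists, is 
writable by the Spark user, and has free space on every worker, instead of the 
default location under /tmp:

import org.apache.spark.SparkConf

// Hedged sketch: /data/spark-tmp is an example path, not one used in this
// thread; it must exist and be writable on every worker node.
val conf = new SparkConf()
  .setAppName("GeneCompare")
  .set("spark.local.dir", "/data/spark-tmp")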



 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-26 14:46

If you open up the driver UI (running on 4040), you can see multiple tasks per 
stage which will be happening concurrently. If it is a single task, and you 
want to increase the parallelism, then you can simply do a re-partition.

Thanks
Best Regards
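A minimal sketch of the re-partition idea (general Spark usage, not code from 
this thread); inputRDD is a hypothetical name for whatever RDD feeds the piped 
command:

// repartition() shuffles the data into more partitions, which means more
// tasks per stage and therefore more commands running at the same time.
val inputRDD = sc.parallelize(1 to 21, 1)   // starts with a single partition
val widerRDD = inputRDD.repartition(21)     // now 21 tasks can run in parallel
println(widerRDD.partitions.length)         // 21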

On Tue, May 26, 2015 at 8:27 AM,  luohui20...@sina.com wrote:
I am indeed trying to run some shell scripts in my Spark app, hoping they run more 
concurrently in my Spark cluster. However, I am not sure whether my code will run 
concurrently on my executors. Diving into my code, you can see that I am trying to:
1. Split both db and sample into 21 small files each, which generates 42 files in 
total. By splitting db I will get chr1, chr2, ..., chr21, and by splitting sample 
I will get samplechr1, samplechr2, ..., samplechr21.
2. Merge those split files from HDFS and save them to a local path. The merged 
files will be chr1.txt, chr2.txt, ..., chr21.txt and 
samplechr1.txt, samplechr2.txt, ..., samplechr21.txt.
3. Run modify.sh to clean the data, that is, delete some characters that are not 
useful.
4. Run shellcompare.sh to compare chr1.txt and samplechr1.txt and get result1.txt, 
looping from 1 to 21 so that those 42 files are compared and I get 21 files: 
result1.txt, result2.txt, ..., result21.txt.
Sorry for not adding comments to my code.
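As background for the merging and comparing steps, here is a small illustration 
of how RDD.pipe() behaves in general (standard Spark behaviour, not code from 
this thread): each partition's elements are written to the external command's 
stdin, and the command's stdout lines become the new RDD, so the command is run 
once per partition regardless of what the elements contain.

// "cat" simply echoes each partition's lines back, so the piped RDD holds the
// same two strings; a command that ignores stdin would still run twice here.
val words = sc.parallelize(Seq("run", "sort.sh"), 2)
val piped = words.pipe("cat")
piped.collect().foreach(println)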




 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 22:41

Can you tell us what exactly you are trying to achieve?

Thanks
Best Regards

On Mon, May 25, 2015 at 5:00 PM,  luohui20...@sina.com wrote:
Thanks, madhu and Akhil.
I modified my code like below; however, I think it is not very distributed. Do you 
have a better idea to run this app more efficiently and in a more distributed way?
I have added some comments with my understanding:
import org.apache.spark._
import www.celloud.com.model._

object GeneCompare3 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GeneCompare")
      .setMaster("spark://master:7077")
      .set("spark.executor.memory", "6g")
      .set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val db = sc.textFile("/data/db.txt").map(_.split("\t")).map(x => Database(x(0),
      x(1).trim().toInt,
      x(2).trim().toInt,
      x(3).trim().toInt,
      x(4).trim().toInt)).toDF()
    val sample = sc.textFile("/data/sample.txt").map(_.split("\t")).map(x => Sample(x(0),
      x(1).trim().toInt,
      x(2),
      x(3).trim().toInt,
      x(4).trim().toInt,
      x(5))).toDF()

    // using Akhil Das's idea: 1to21.txt is a file with 21 lines, each line a single number
    //val test = sc.textFile("1to21.txt").foreach { i =>   //running on driver manager
    //  db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)   //running on driver executor
    //  db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
    //  sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
    //  db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt")   //running on driver executor
    //}.collect()   //running on driver manager

    // using madhu's method   //running on driver manager
    for (i <- 1 to 21) {
      db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
      db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt").collect()
      sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
      db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt").collect()
    }

    //running on driver manager
    val runmodifyshell = List("run", "modify.sh")
    val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
    val pipeModify = runmodifyshellRDD.pipe("sh /opt/data/shellcompare/modify.sh")
    pipeModify.collect()

    //running on driver manager
    val shellcompare = List("run", "sort.sh")
    val shellcompareRDD = sc.makeRDD(shellcompare)
    val result =