[ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-----------------------------
    Description: 
Environment:
1. Zeppelin 0.6.2 in *SCOPE* mode
2. Spark 1.6.2
3. HDP 2.4 for HDFS and YARN

Run Scala code like the following:
{noformat}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
              
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
              .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{noformat}
---
and, concurrently, this code:
{noformat}
var df = sql("select b1,b2 from xxxx.xxxxx")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
    .setOutputCol(inputCols(i)+"_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

        val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
        val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
          .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
          .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

        import org.apache.spark.sql.types.StructType
        val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
        saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
sql("alter table xxxx.xxxxxxxx set lifecycle 1")
{noformat}

Run both snippets on Zeppelin, in two different notebooks, at the same time.

The following exception then appears in the executor log:
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
        at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
        at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
        at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}

Additional log output from the executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as 
bytes in memory (estimated size 3.0 KB, free 909.7 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
30 took 80 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as values 
in memory (estimated size 5.4 KB, free 915.0 KB)
17/05/26 16:39:44 INFO 
parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: 
ParquetInputSplit{part: 
hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet
 start: 0 end: 922 length: 922 hosts: []}
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 23
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as 
bytes in memory (estimated size 25.0 KB, free 940.0 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
23 took 6 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as values 
in memory (estimated size 352.7 KB, free 1292.6 KB)
17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 36.0 
(TID 598)
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
        at 
$line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}

{color:red}
These two exceptions never appear together, and neither appears when the two
snippets are run separately.
{color}

{color:red}
If you remove the zipWithIndex part from either snippet, the exception does not occur.
{color}

It looks as though, when the driver builds the DAG for zipWithIndex on two RDDs at
the same time, something that should be synchronized or locked is not.

As far as I know, if you only use spark-shell, spark-submit, or the Spark Thrift
server, you cannot reproduce a situation like this one.

So, is it wrong to use the Spark shell this way, with multiple users sharing the
same context and running similar jobs? In other words, Zeppelin SCOPE mode is not
going to be stable if exceptions like these cannot be dealt with.


  was:
1、zeppelin 0.6.2  in *SCOPE* mode 
2、spark 1.6.2 
3、HDP 2.4 for HDFS YARN 

trigger scala code like :
{noformat}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
              
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
              .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{noformat}
---
and code :
{noformat}
var df = sql("select b1,b2 from xxxx.xxxxx")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
    .setOutputCol(inputCols(i)+"_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

        val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
        val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
          .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
          .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

        import org.apache.spark.sql.types.StructType
        val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
        saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
sql("alter table xxxx.xxxxxxxx set lifecycle 1")
{noformat}

on zeppelin with two different notebook at same time. 

Found this exception log in  executor :
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
        at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
        at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
        at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}

some log from executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as 
bytes in memory (estimated size 3.0 KB, free 909.7 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
30 took 80 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as values 
in memory (estimated size 5.4 KB, free 915.0 KB)
17/05/26 16:39:44 INFO 
parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: 
ParquetInputSplit{part: 
hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet
 start: 0 end: 922 length: 922 hosts: []}
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 23
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as 
bytes in memory (estimated size 25.0 KB, free 940.0 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
23 took 6 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as values 
in memory (estimated size 352.7 KB, free 1292.6 KB)
17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 36.0 
(TID 598)
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
        at 
$line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}


these two exception never show in pairs, and will never show when you run two 
code separately

If you delete the zipWithIndex part in any code ,you can not get the exception. 

Feels like when driver making DAG for zipWithIndex with two RDD at same time , 
some where should be synchronized or locked. 

So far as I know , if you just use spark-shell, or spark-submit or spark-thrift 
, you can not  recreate some situations like this case do. 

So. Is this a wrong way to use spark-shell like this , multi-user with same 
context , doing similar job? In other words ,  zeppelin SCOPE mode is not going 
to be a steady mode , if we can not deal with exceptions like these 



> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>
> 1、zeppelin 0.6.2  in *SCOPE* mode 
> 2、spark 1.6.2 
> 3、HDP 2.4 for HDFS YARN 
> trigger scala code like :
> {noformat}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>               
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>               .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {noformat}
> ---
> and code :
> {noformat}
> var df = sql("select b1,b2 from xxxx.xxxxx")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
>     .setOutputCol(inputCols(i)+"_binary")
>     .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
>         val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
>         val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>           .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>           .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
>         import org.apache.spark.sql.types.StructType
>         val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
>         saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
> sql("alter table xxxx.xxxxxxxx set lifecycle 1")
> {noformat}
> Run both snippets on Zeppelin, in two different notebooks, at the same time.
> The following exception then appears in the executor log:
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: 
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
>         at 
> $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
>         at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
>         at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
>         at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
>         at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> OR 
> {quote}
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
> org.apache.spark.mllib.linalg.DenseVector
>         at 
> $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> some log from executor:
> {quote}
> 17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
> 598)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
> variable 30
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored 
> as bytes in memory (estimated size 3.0 KB, free 909.7 KB)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
> 30 took 80 ms
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as 
> values in memory (estimated size 5.4 KB, free 915.0 KB)
> 17/05/26 16:39:44 INFO 
> parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: 
> ParquetInputSplit{part: 
> hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet
>  start: 0 end: 922 length: 922 hosts: []}
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
> variable 23
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored 
> as bytes in memory (estimated size 25.0 KB, free 940.0 KB)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
> 23 took 6 ms
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as 
> values in memory (estimated size 352.7 KB, free 1292.6 KB)
> 17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
> 17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 
> 36.0 (TID 598)
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
> org.apache.spark.mllib.linalg.DenseVector
>       at 
> $line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {quote}
> {color:red}
> These two exceptions never appear together, and neither appears when the two
> snippets are run separately.
> {color}
> {color:red}
> If you remove the zipWithIndex part from either snippet, the exception does
> not occur.
> {color}
> It looks as though, when the driver builds the DAG for zipWithIndex on two RDDs
> at the same time, something that should be synchronized or locked is not.
> As far as I know, if you only use spark-shell, spark-submit, or the Spark
> Thrift server, you cannot reproduce a situation like this one.
> So, is it wrong to use the Spark shell this way, with multiple users sharing
> the same context and running similar jobs? In other words, Zeppelin SCOPE mode
> is not going to be stable if exceptions like these cannot be dealt with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to