[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
poseidon updated SPARK-20896:
-----------------------------
    Description: 
1. Zeppelin 0.6.2 in *scoped* mode
2. Spark 1.6.2
3. HDP 2.4 for HDFS and YARN

Trigger Scala code like the following:
{noformat}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x: Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)
import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
val rows = dfV.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)}
{noformat}

---

and this code:
{noformat}
var df = sql("select b1,b2 from xxxx.xxxxx")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")
var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
    .setOutputCol(inputCols(i)+"_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame
val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)}
import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
sql("alter table xxxx.xxxxxxxx set lifecycle 1")
{noformat}

Run these two snippets on Zeppelin in two different notebooks at the same time.
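The first snippet uses an assembler that is not defined in the snippet itself; presumably it is a VectorAssembler over the three input columns, roughly like the following (a hypothetical reconstruction of the missing setup, not taken from the original notebooks). The second snippet similarly assumes an import of org.apache.spark.ml.feature.Binarizer.
{noformat}
// Hypothetical setup assumed by the first snippet (not in the original report):
// imports for the types it references and a VectorAssembler over b1, b2, b3.
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.Row

val assembler = new VectorAssembler()
  .setInputCols(Array("b1", "b2", "b3"))
  .setOutputCol("features")
{noformat}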
Found this exception in the executor log:
{quote}
l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
	at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
	at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
	at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{quote}

OR

{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
	at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{quote}

Some log output from the executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as bytes in memory (estimated size 3.0 KB, free 909.7 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 30 took 80 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as values in memory (estimated size 5.4 KB, free 915.0 KB)
17/05/26 16:39:44 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet start: 0 end: 922 length: 922 hosts: []}
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 23
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 25.0 KB, free 940.0 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 23 took 6 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 352.7 KB, free 1292.6 KB)
17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 36.0 (TID 598)
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
	at $line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{quote}

These two exceptions never show up in pairs, and they never appear when the two snippets are run separately. If the zipWithIndex part is removed from either snippet, the exception cannot be reproduced.

It feels as if the driver needs some synchronization or locking when it builds the DAG for zipWithIndex on two RDDs at the same time.

As far as I know, this situation cannot be recreated with plain spark-shell, spark-submit, or spark-thrift.

So, is it wrong to use the shell this way, with multiple users sharing the same context and running similar jobs? In other words, Zeppelin's scoped mode will not be a stable mode if exceptions like these cannot be handled.
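For reference, the zipWithIndex/join pattern in the second snippet (appending the column from dfAppendBin to saveDFBin) could in principle be replaced by RDD.zip. This is only a sketch of that idea, not a confirmed fix for the exception, and it assumes both DataFrames end up with the same number of partitions and the same number of rows per partition:
{noformat}
// Sketch only: append dfAppendBin's rows to saveDFBin without zipWithIndex.
// Assumes identical partitioning (same partition count and same number of
// rows per partition); otherwise RDD.zip fails at runtime.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val zippedRows = saveDFBin.rdd.zip(dfAppendBin.rdd).map {
  case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)
}
val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
val combined = sqlContext.createDataFrame(zippedRows, rowSchema)
{noformat}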
> Spark executor gets java.lang.ClassCastException when two jobs are triggered at the same time
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org