[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
poseidon closed SPARK-20896.
----------------------------
       Resolution: Fixed
    Fix Version/s: 1.6.4
Target Version/s: 1.6.2

Not an issue.

> Spark executor gets java.lang.ClassCastException when triggering two jobs at the same time
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>             Fix For: 1.6.4
>
>
> Environment:
> 1. Zeppelin 0.6.2 in *SCOPE* mode
> 2. Spark 1.6.2
> 3. HDP 2.4 for HDFS and YARN
>
> Trigger Scala code like this:
> {noformat}
> // imports needed to run this snippet (not in the original paste):
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
> import org.apache.spark.mllib.stat.Statistics
> import org.apache.spark.sql.Row
> // assumes an assembler defined earlier in the notebook, e.g.:
> // val assembler = new VectorAssembler().setInputCols(Array("b1", "b2", "b3")).setOutputCol("features")
>
> var tmpDataFrame = sql("select b1,b2,b3 from xxx.xxxxx")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map { x: Row => x.getAs[Vector](0) }
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map { case Array(b1, b2, b3) => (b1, b2, b3) }.toDF()
> // note: redefining `rows` is legal in the REPL; it shadows the earlier value
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   .join(sc.parallelize(Array("b1", "b2", "b3")).zipWithIndex.map(_.swap))
>   .values.map { case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x) }
> {noformat}
>
> and this code:
> {noformat}
> // imports needed to run this snippet (not in the original paste):
> import org.apache.spark.ml.feature.Binarizer
> import org.apache.spark.sql.Row
>
> var df = sql("select b1,b2 from xxxx.xxxxx")
> var i = 0
> var threshold = Array(2.0, 3.0)
> var inputCols = Array("b1", "b2")
> var tmpDataFrame = df
> for (col <- inputCols) {
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
>     .setOutputCol(inputCols(i) + "_binary")
>     .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i + 1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map { case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq) }
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> // save the result to a table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
> sql("alter table xxxx.xxxxxxxx set lifecycle 1")
> {noformat}
> in Zeppelin, in two different notebooks at the same time.
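> For reference, the failing pattern can be distilled into a compiled, self-contained sketch. This is hypothetical and not from the report: two threads share one SparkContext and concurrently run zipWithIndex jobs whose closures expect different element types (DenseVector vs. Tuple2). The reported crash appears tied to REPL-generated closure classes (the $line...$anonfun$1 frames below), so this compiled variant mirrors the pattern but may well not reproduce the race:
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
>
> object ConcurrentZipWithIndexSketch {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setAppName("concurrent-zipWithIndex").setMaster("local[4]"))
>
>     // Job 1: elements are mllib Vectors; the closure casts them to DenseVector.
>     val job1 = new Thread(new Runnable {
>       def run(): Unit = {
>         val vectors = sc.parallelize(Seq[Vector](
>           Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0)))
>         vectors.zipWithIndex().map(_.swap)
>           .mapValues(v => v.asInstanceOf[DenseVector].toArray.sum)
>           .collect()
>       }
>     })
>
>     // Job 2: elements are Tuple2s; the closure pattern-matches on pairs.
>     val job2 = new Thread(new Runnable {
>       def run(): Unit = {
>         val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
>         pairs.zipWithIndex().map(_.swap)
>           .mapValues { case (k, n) => k + n.toString }
>           .collect()
>       }
>     })
>
>     // Submit both jobs at the same time, as the two notebooks do.
>     job1.start(); job2.start()
>     job1.join(); job2.join()
>     sc.stop()
>   }
> }
> {noformat}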
> Found this exception log in the executor:
> {quote}
> l1.dtdream.com): java.lang.ClassCastException:
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
>     at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
>     at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
>     at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {quote}
> or:
> {quote}
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
>     at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {quote}
> Some log lines from the executor:
> {quote}
> 17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as bytes in memory (estimated size 3.0 KB, free 909.7 KB)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 30 took 80 ms
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as values in memory (estimated size 5.4 KB, free 915.0 KB)
> 17/05/26 16:39:44 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet start: 0 end: 922 length: 922 hosts: []}
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 23
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 25.0 KB, free 940.0 KB)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 23 took 6 ms
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 352.7 KB, free 1292.6 KB)
> 17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
> 17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 36.0 (TID 598)
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
>     at $line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {quote}
> {color:red}
> These two exceptions never appear in pairs, and neither appears when the two snippets run separately.
> {color}
> {color:red}
> If you remove the zipWithIndex step from either snippet, the exception cannot be reproduced.
> {color}
> It feels as if, when the driver builds the DAGs for zipWithIndex on two RDDs at the same time, something ought to be synchronized or locked.
> As far as I know, you cannot recreate this situation with plain spark-shell, spark-submit, or spark-thrift.
> So: is it wrong to use the shell this way, with multiple users sharing the same context and running similar jobs? In other words, Zeppelin's SCOPE mode will not be a stable mode if we cannot deal with exceptions like these.
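> If the race really is in concurrent job construction over a shared SparkContext, one pragmatic mitigation is to serialize the conflicting actions. A minimal sketch of a hypothetical workaround follows (not an official fix; it assumes both notebooks can see the same lock object, e.g. from a jar on the shared interpreter classpath):
> {noformat}
> // Hypothetical workaround: a JVM-wide lock so two notebooks sharing one
> // SparkContext never build and run their zipWithIndex jobs concurrently.
> object SharedContextJobLock
>
> def runExclusively[T](body: => T): T = SharedContextJobLock.synchronized(body)
>
> // Each notebook then wraps its final action, for example:
> // runExclusively {
> //   saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
> // }
> {noformat}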