[jira] [Updated] (SPARK-20896) Spark executor gets java.lang.ClassCastException when two jobs are triggered at the same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-20896:
-----------------------------
Description:

1. Zeppelin 0.6.2, interpreter in *scoped* mode
2. Spark 1.6.2
3. HDP 2.4 for HDFS and YARN

One notebook triggers Scala code like this:

{noformat}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.Row

var tmpDataFrame = sql("select b1,b2,b3 from xxx.x")
// Assumed setup; the report elides how the assembler is built
val assembler = new VectorAssembler()
  .setInputCols(Array("b1", "b2", "b3"))
  .setOutputCol("features")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map { x: Row => x.getAs[Vector](0) }

// Spearman correlation matrix over the three columns
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
// Reshape the flat (column-major) matrix array back into rows
val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map { case Array(b1, b2, b3) => (b1, b2, b3) }.toDF()

// Append a label column by keying both sides with zipWithIndex and joining
val labeledRows = dfV.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(Array("b1", "b2", "b3")).zipWithIndex.map(_.swap))
  .values.map { case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x) }
{noformat}

----

A second notebook runs this code (table names elided as in the original report):

{noformat}
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.StructType

var df = sql("select b1,b2 from .x")
var i = 0
val threshold = Array(2.0, 3.0)
val inputCols = Array("b1", "b2")
var tmpDataFrame = df
// Binarize each input column against its threshold, then drop the original
for (col <- inputCols) {
  val binarizer: Binarizer = new Binarizer()
    .setInputCol(col)
    .setOutputCol(inputCols(i) + "_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i + 1
}
var saveDFBin = tmpDataFrame

// Append column b3 row-by-row via a zipWithIndex join
val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map { case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq) }

val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)

// Save the result to a table
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{noformat}

Both notebooks run in Zeppelin at the same time.
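Both snippets append columns with the same zipWithIndex-join pattern, and the first stack trace below fails inside {{ZippedWithIndexRDD}}. For reference, here is a minimal, self-contained sketch of that pattern; the tiny DataFrames and column names are hypothetical stand-ins for the report's elided tables:

{noformat}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Align two DataFrames row-by-row and concatenate their columns.
// Assumes a running sc/sqlContext, as in the Zeppelin notebooks above.
val left = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1.0, 2.0), Row(3.0, 4.0))),
  StructType(Seq(StructField("b1", DoubleType), StructField("b2", DoubleType))))
val right = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(5.0), Row(6.0))),
  StructType(Seq(StructField("b3", DoubleType))))

// Key each row by its index, join on the index, then drop the key.
// Note: zipWithIndex itself launches a Spark job to compute per-partition
// offsets, which is why ZippedWithIndexRDD shows up in the traces below.
val joined = left.rdd.zipWithIndex.map(_.swap)
  .join(right.rdd.zipWithIndex.map(_.swap))
  .values
  .map { case (r1: Row, r2: Row) => Row.fromSeq(r1.toSeq ++ r2.toSeq) }

val merged = sqlContext.createDataFrame(
  joined, StructType(left.schema.fields ++ right.schema.fields))
{noformat}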
Found this exception log on an executor:

{quote}
l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
	at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
	at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
	at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{quote}

or:

{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
	at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{quote}

These two exceptions never appear in pairs.

Some log lines from the executor:

{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as byte
{quote}
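The two casts mirror each other, and the {{$line127359816836...}} and {{$line34684895436...}} prefixes in the traces come from two different REPL sessions. That is consistent with one session's deserialized closure being applied to elements produced for the other session: one closure expects the pairs produced by {{zipWithIndex}}, the other expects feature vectors. A hypothetical illustration of just the type mismatch (not the actual Spark/Zeppelin internals):

{noformat}
import org.apache.spark.mllib.linalg.{DenseVector, Vector}

// Closure shapes matching the two notebooks' map functions:
val fromNotebookA: Any => (Long, Any) = _.asInstanceOf[(Long, Any)] // expects zipWithIndex pairs
val fromNotebookB: Any => Vector      = _.asInstanceOf[DenseVector] // expects feature vectors

val vectorElement: Any = new DenseVector(Array(1.0, 2.0))
val tupleElement: Any  = (0L, vectorElement)

// Applying the wrong session's closure reproduces the traces above:
// fromNotebookA(vectorElement)
//   => java.lang.ClassCastException: ...DenseVector cannot be cast to scala.Tuple2
// fromNotebookB(tupleElement)
//   => java.lang.ClassCastException: scala.Tuple2 cannot be cast to ...DenseVector
{noformat}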
[jira] [Updated] (SPARK-20896) Spark executor gets java.lang.ClassCastException when two jobs are triggered at the same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-20896:
-----------------------------
Attachment: (was: token_err.log)

> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
> Issue Type: Bug
> Components: MLlib
> Affects Versions: 1.6.1
> Reporter: poseidon
[jira] [Updated] (SPARK-20896) Spark executor gets java.lang.ClassCastException when two jobs are triggered at the same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-20896:
-----------------------------
Attachment: token_err.log