[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-20896:
-----------------------------
    Description: 
1. Zeppelin 0.6.2
2. Spark 1.6.2
3. HDP 2.4 for HDFS/YARN

We run Scala code like the following:
{quote}
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
import org.apache.spark.mllib.stat.Statistics

var tmpDataFrame = sql("select b1,b2,b3 from xxx.xxxxx")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map { x: Row => x.getAs[Vector](0) }
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

// turn the correlation matrix back into per-column vectors
val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map { case Array(b1, b2, b3) => (b1, b2, b3) }.toDF()

// append a column label to each row (note: redefines `rows`, which works in the REPL)
val rows = dfV.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(Array("b1", "b2", "b3")).zipWithIndex.map(_.swap))
  .values.map { case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x) }
{quote}
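The {{assembler}} used above is not defined in the snippet; presumably it is a VectorAssembler along these lines (an assumption based on the "features" column selected afterwards, not the original definition):
{quote}
import org.apache.spark.ml.feature.VectorAssembler

// assumed setup, omitted from the snippet above: assemble b1,b2,b3 into a "features" vector
val assembler = new VectorAssembler()
  .setInputCols(Array("b1", "b2", "b3"))
  .setOutputCol("features")
{quote}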
---
and this code:
{quote}
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.Row

var df = sql("select b1,b2 from xxxx.xxxxx")
var i = 0
var threshold = Array(2.0, 3.0)
var inputCols = Array("b1", "b2")

// binarize each input column with its own threshold
var tmpDataFrame = df
for (col <- inputCols) {
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
    .setOutputCol(inputCols(i) + "_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i + 1
}
var saveDFBin = tmpDataFrame

// append column b3 row-by-row via zipWithIndex + join
val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map { case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq) }

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)

// save the result to a table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
sql("alter table xxxx.xxxxxxxx set lifecycle 1")
{quote}

The two snippets run in Zeppelin in two different notebooks at the same time.
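Both snippets build their result the same way: index two RDDs with zipWithIndex, join on the index, and rebuild Rows in a closure defined in the notebook. Below is a minimal, self-contained sketch of that shared pattern, with placeholder column names and assuming the sqlContext that Zeppelin provides; it is a distillation for readers, not the original code.
{quote}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// two stand-in DataFrames; in the real snippets these come from Hive tables
val left = sqlContext.range(0, 100).toDF("x")
val right = sqlContext.range(0, 100).toDF("y")

// pair rows by position: zipWithIndex, swap to key on the index, join, rebuild Rows
val joinedRows = left.rdd.zipWithIndex.map(_.swap)
  .join(right.rdd.zipWithIndex.map(_.swap))
  .values
  .map { case (r1: Row, r2: Row) => Row.fromSeq(r1.toSeq ++ r2.toSeq) }

val joined = sqlContext.createDataFrame(joinedRows, StructType(left.schema.fields ++ right.schema.fields))
joined.count()
{quote}
Both stack traces below fail inside a <console>-defined $anonfun during the zipWithIndex/shuffle stages, i.e. in this part of the job.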

We found this exception in the executor logs:
{quote}
l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
        at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
        at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
        at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}

These two exceptions never show up together; each run hits one or the other.





> Spark executor gets java.lang.ClassCastException when two jobs are triggered at the same time
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>


