[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-20896:
-----------------------------
    Description: 
1. Zeppelin 0.6.2
2. Spark 1.6.2
3. HDP 2.4 for HDFS and YARN

Trigger Scala code like:
{quote}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
              .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
              .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)}
{quote}
---
and this code:
{quote}
var df = sql("select b1,b2 from xxxx.xxxxx")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
    .setOutputCol(inputCols(i)+"_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

        val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
        val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
          .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
          .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)}

        import org.apache.spark.sql.types.StructType
        val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
        saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
sql("alter table xxxx.xxxxxxxx set lifecycle 1")
{quote}

Running both snippets at the same time makes the Spark executor throw java.lang.ClassCastException.

  was:
1. Zeppelin 0.6.2
2. Spark 1.6.2
3. HDP 2.4 for HDFS and YARN

Trigger Scala code like:



> Spark executor gets java.lang.ClassCastException when triggering two jobs at the same time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>
> 1. Zeppelin 0.6.2
> 2. Spark 1.6.2
> 3. HDP 2.4 for HDFS and YARN
> Trigger Scala code like:
> {quote}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>               .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>               .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)}
> {quote}
> ---
> and this code:
> {quote}
> var df = sql("select b1,b2 from xxxx.xxxxx")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
>     .setOutputCol(inputCols(i)+"_binary")
>     .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
>         val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
>         val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>           .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>           .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)}
>         import org.apache.spark.sql.types.StructType
>         val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
>         saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
> sql("alter table xxxx.xxxxxxxx set lifecycle 1")
> {quote}
> Running both snippets at the same time makes the Spark executor throw java.lang.ClassCastException.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

Reply via email to