[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
poseidon updated SPARK-20896: ----------------------------- Description: 1. Zeppelin 0.6.2 2. Spark 1.6.2 3. HDP 2.4 for HDFS and YARN Trigger Scala code like this: {quote} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) val rows = columns.toSeq.transpose val vectors = rows.map(row => new DenseVector(row.toArray)) val vRdd = sc.parallelize(vectors) import sqlContext.implicits._ val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() val rows = dfV.rdd.zipWithIndex.map(_.swap) .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)} {quote} --- and this code: {quote} var df = sql("select b1,b2 from xxxx.xxxxx") var i = 0 var threshold = Array(2.0,3.0) var inputCols = Array("b1","b2") var tmpDataFrame = df for (col <- inputCols){ val binarizer: Binarizer = new Binarizer().setInputCol(col) .setOutputCol(inputCols(i)+"_binary") .setThreshold(threshold(i)) tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) i = i+1 } var saveDFBin = tmpDataFrame val dfAppendBin = sql("select b3 from poseidon.corelatdemo") val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)} import org.apache.spark.sql.types.StructType val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields) saveDFBin = sqlContext.createDataFrame(rows, rowSchema) //save result to table import org.apache.spark.sql.SaveMode saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx") sql("alter table xxxx.xxxxxxxx set lifecycle 1") {quote} at the same time was: 1、zeppelin 0.6.2 2、spark 1.6.2 3、hdp 2.4 
for HDFS YARN trigger scala code like > Spark executor gets java.lang.ClassCastException when triggering two jobs at the same > time > --------------------------------------------------------------------------------- > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib > Affects Versions: 1.6.1 > Reporter: poseidon > > 1. Zeppelin 0.6.2 > 2. Spark 1.6.2 > 3. HDP 2.4 for HDFS and YARN > Trigger Scala code like this: > {quote} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {quote} > --- > and this code: > {quote} > var df = sql("select b1,b2 from xxxx.xxxxx") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import 
org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx") > sql("alter table xxxx.xxxxxxxx set lifecycle 1") > {quote} > at the same time -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org