[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-21955: - Description:

While tracing how stream, chunk, and block handling works in Netty, I found a nasty case.

Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers.

Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence.

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java

{noformat}
@Override
public void registerChannel(Channel channel, long streamId) {
  if (streams.containsKey(streamId)) {
    streams.get(streamId).associatedChannel = channel;
  }
}
{noformat}

This is the only place where associatedChannel is set.

{noformat}
public void connectionTerminated(Channel channel) {
  // Close all streams which have been associated with the channel.
  for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
    StreamState state = entry.getValue();
    if (state.associatedChannel == channel) {
      streams.remove(entry.getKey());
      // Release all remaining buffers.
      while (state.buffers.hasNext()) {
        state.buffers.next().release();
      }
    }
  }
}
{noformat}

This is the only place where state.buffers is released.

If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives, so the channel is never set. As a result we can see leaked buffers in OneForOneStreamManager: !screenshot-1.png!

If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, which may lead to an OOM in the NodeManager, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. We should set the channel when we register the stream, so the buffers can be released.

was: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java @Override public void registerChannel(Channel channel, long streamId) { if (streams.containsKey(streamId)) { streams.get(streamId).associatedChannel = channel; } } This is the only place where associatedChannel is set. public void connectionTerminated(Channel channel) { // Close all streams which have been associated with the channel. for (Map.Entry entry : streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { streams.remove(entry.getKey()); // Release all remaining buffers. while (state.buffers.hasNext()) { state.buffers.next().release(); } } } This is the only place where state.buffers is released. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives, so the channel is never set. As a result we can see leaked buffers in OneForOneStreamManager: !screenshot-1.png! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, which may lead to an OOM in the NodeManager, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case.

> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
> Issue Type: Bug
> Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258
> spark 1.6
>Reporter: poseidon
> Attachments: screenshot-1.png
>
>
> just in my way to know how stream , chunk , block works in netty found some
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> fill
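A minimal sketch of that proposal, assuming registerStream gains a Channel parameter (an illustration only, not an actual Spark patch): tie the StreamState to its connection as soon as the OpenBlocks message is handled, so connectionTerminated can always find and release the buffers even if no ChunkFetchRequest ever arrives. Class and field names mirror the 1.6 OneForOneStreamManager; the extra Channel parameter is the suggested addition.

{noformat}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.channel.Channel;
import org.apache.spark.network.buffer.ManagedBuffer;

public class OneForOneStreamManagerSketch {

  private static class StreamState {
    final String appId;
    final Iterator<ManagedBuffer> buffers;
    Channel associatedChannel;

    StreamState(String appId, Iterator<ManagedBuffer> buffers) {
      this.appId = appId;
      this.buffers = buffers;
    }
  }

  private final AtomicLong nextStreamId = new AtomicLong(0);
  private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // Proposed signature: the channel is already known when OpenBlocks is handled,
  // so associate it here instead of waiting for a later ChunkFetchRequest.
  public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
    long streamId = nextStreamId.getAndIncrement();
    StreamState state = new StreamState(appId, buffers);
    state.associatedChannel = channel;
    streams.put(streamId, state);
    return streamId;
  }

  // Same cleanup logic as today: with the change above it is guaranteed to find
  // the stream and release its remaining buffers when the connection drops.
  public void connectionTerminated(Channel channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }
}
{noformat}

With this shape the leak window between OpenBlocks and the first ChunkFetchRequest goes away, because every registered stream already has an owning channel.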
[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-21955: - Description: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java @Override public void registerChannel(Channel channel, long streamId) { if (streams.containsKey(streamId)) { streams.get(streamId).associatedChannel = channel; } } This is the only place where associatedChannel is set. public void connectionTerminated(Channel channel) { // Close all streams which have been associated with the channel. for (Map.Entry entry : streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { streams.remove(entry.getKey()); // Release all remaining buffers. while (state.buffers.hasNext()) { state.buffers.next().release(); } } } This is the only place where state.buffers is released. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives, so the channel is never set. As a result we can see leaked buffers in OneForOneStreamManager: !screenshot-1.png! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. was: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives. As a result we can see leaked buffers in OneForOneStreamManager: !attachment-name.jpg|thumbnail! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. > OneForOneStreamManager may leak memory when network is poor > --- > > Key: SPARK-21955 > URL: https://issues.apache.org/jira/browse/SPARK-21955 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 1.6.1 > Environment: hdp 2.4.2.0-258 > spark 1.6 >Reporter: poseidon > Attachments: screenshot-1.png > > > just in my way to know how stream , chunk , block works in netty found some > nasty case. 
> process OpenBlocks message registerStream Stream in OneForOneStreamManager > org.apache.spark.network.server.OneForOneStreamManager#registerStream > fill with streamState with app & buber > process ChunkFetchRequest registerChannel > org.apache.spark.network.server.OneForOneStreamManager#registerChannel > fill with streamState with channel > In > org.apache.spark.network.shuffle.OneForOneBlockFetcher#start > OpenBlocks -> ChunkFetchRequest come in sequnce. > spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java > @Override > public void registerChannel(Channel channel, long streamId) { > if (streams.containsKey(streamId)) { > streams.get(streamId).associatedChannel = channel; > } > } > this is only chance associatedChannel is set > public void connectionTerminated(Channel channel) { > // Close all streams which have been associated with the channel. > for (Map.Entry entry: streams.entrySet()) { > StreamState state = entry.getValue(); > if (state.associatedChannel == channel) { > streams.remove(entry.getKey()); > // Release al
[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-21955: - Description: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java @Override public void registerChannel(Channel channel, long streamId) { if (streams.containsKey(streamId)) { streams.get(streamId).associatedChannel = channel; } } This is the only place where associatedChannel is set. public void connectionTerminated(Channel channel) { // Close all streams which have been associated with the channel. for (Map.Entry entry : streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { streams.remove(entry.getKey()); // Release all remaining buffers. while (state.buffers.hasNext()) { state.buffers.next().release(); } } } This is the only place where state.buffers is released. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives, so the channel is never set. As a result we can see leaked buffers in OneForOneStreamManager: !screenshot-1.png! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, which may lead to an OOM in the NodeManager, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. was: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java @Override public void registerChannel(Channel channel, long streamId) { if (streams.containsKey(streamId)) { streams.get(streamId).associatedChannel = channel; } } This is the only place where associatedChannel is set. public void connectionTerminated(Channel channel) { // Close all streams which have been associated with the channel. for (Map.Entry entry : streams.entrySet()) { StreamState state = entry.getValue(); if (state.associatedChannel == channel) { streams.remove(entry.getKey()); // Release all remaining buffers. while (state.buffers.hasNext()) { state.buffers.next().release(); } } } This is the only place where state.buffers is released. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives, so the channel is never set. As a result we can see leaked buffers in OneForOneStreamManager: !screenshot-1.png! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. > OneForOneStreamManager may leak memory when network is poor > --- > > Key: SPARK-21955 > URL: https://issues.apache.org/jira/browse/SPARK-21955 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 1.6.1 > Environment: hdp 2.4.2.0-258 > spark 1.6 >Reporter: poseidon > Attachments: screenshot-1.png > > > just in my way to know how stream , chunk , block works in netty found some > nasty case. > process OpenBlocks message registerStream Stream in OneForOneStreamManager > org.apache.spark.network.server.OneForOneStreamManager#registerStream > fill with streamState with app & buber > process ChunkFetchRequest registerChannel > org.apache.spark.network.server.
[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-21955: - Attachment: screenshot-1.png > OneForOneStreamManager may leak memory when network is poor > --- > > Key: SPARK-21955 > URL: https://issues.apache.org/jira/browse/SPARK-21955 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 1.6.1 > Environment: hdp 2.4.2.0-258 > spark 1.6 >Reporter: poseidon > Attachments: screenshot-1.png > > > just in my way to know how stream , chunk , block works in netty found some > nasty case. > process OpenBlocks message registerStream Stream in OneForOneStreamManager > org.apache.spark.network.server.OneForOneStreamManager#registerStream > fill with streamState with app & buber > process ChunkFetchRequest registerChannel > org.apache.spark.network.server.OneForOneStreamManager#registerChannel > fill with streamState with channel > In > org.apache.spark.network.shuffle.OneForOneBlockFetcher#start > OpenBlocks -> ChunkFetchRequest come in sequnce. > If network down in OpenBlocks process, no more ChunkFetchRequest message > then. > So, we can see some leaked Buffer in OneForOneStreamManager > !attachment-name.jpg|thumbnail! > if > org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel > is not set, then after search the code , it will remain in memory forever. > Because the only way to release it was in channel close , or someone read the > last piece of block. > OneForOneStreamManager#registerStream we can set channel in this method, just > in case of this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-21955: - Description: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives. As a result we can see leaked buffers in OneForOneStreamManager: !attachment-name.jpg|thumbnail! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. was: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives. As a result we can see leaked buffers in OneForOneStreamManager. If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. > OneForOneStreamManager may leak memory when network is poor > --- > > Key: SPARK-21955 > URL: https://issues.apache.org/jira/browse/SPARK-21955 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 1.6.1 > Environment: hdp 2.4.2.0-258 > spark 1.6 >Reporter: poseidon > > just in my way to know how stream , chunk , block works in netty found some > nasty case. > process OpenBlocks message registerStream Stream in OneForOneStreamManager > org.apache.spark.network.server.OneForOneStreamManager#registerStream > fill with streamState with app & buber > process ChunkFetchRequest registerChannel > org.apache.spark.network.server.OneForOneStreamManager#registerChannel > fill with streamState with channel > In > org.apache.spark.network.shuffle.OneForOneBlockFetcher#start > OpenBlocks -> ChunkFetchRequest come in sequnce. > If network down in OpenBlocks process, no more ChunkFetchRequest message > then. > So, we can see some leaked Buffer in OneForOneStreamManager > !attachment-name.jpg|thumbnail! > if > org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel > is not set, then after search the code , it will remain in memory forever. 
> Because the only way to release it was in channel close , or someone read the > last piece of block. > OneForOneStreamManager#registerStream we can set channel in this method, just > in case of this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-21955: - Description: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives. As a result we can see leaked buffers in OneForOneStreamManager. If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. was: While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives. As a result we can see leaked buffers in OneForOneStreamManager: !attachment-name.jpg|thumbnail! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. > OneForOneStreamManager may leak memory when network is poor > --- > > Key: SPARK-21955 > URL: https://issues.apache.org/jira/browse/SPARK-21955 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 1.6.1 > Environment: hdp 2.4.2.0-258 > spark 1.6 >Reporter: poseidon > > just in my way to know how stream , chunk , block works in netty found some > nasty case. > process OpenBlocks message registerStream Stream in OneForOneStreamManager > org.apache.spark.network.server.OneForOneStreamManager#registerStream > fill with streamState with app & buber > process ChunkFetchRequest registerChannel > org.apache.spark.network.server.OneForOneStreamManager#registerChannel > fill with streamState with channel > In > org.apache.spark.network.shuffle.OneForOneBlockFetcher#start > OpenBlocks -> ChunkFetchRequest come in sequnce. > If network down in OpenBlocks process, no more ChunkFetchRequest message > then. > So, we can see some leaked Buffer in OneForOneStreamManager > if > org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel > is not set, then after search the code , it will remain in memory forever. > Because the only way to release it was in channel close , or someone read the > last piece of block. 
> OneForOneStreamManager#registerStream we can set channel in this method, just > in case of this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
poseidon created SPARK-21955: Summary: OneForOneStreamManager may leak memory when network is poor Key: SPARK-21955 URL: https://issues.apache.org/jira/browse/SPARK-21955 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 1.6.1 Environment: hdp 2.4.2.0-258 spark 1.6 Reporter: poseidon While tracing how stream, chunk, and block handling works in Netty, I found a nasty case. Processing an OpenBlocks message registers a stream in OneForOneStreamManager: org.apache.spark.network.server.OneForOneStreamManager#registerStream fills a StreamState with the app id and the buffers. Processing a ChunkFetchRequest registers the channel: org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills that StreamState with the channel. In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, OpenBlocks -> ChunkFetchRequest arrive in sequence. If the network goes down while the OpenBlocks message is being handled, no ChunkFetchRequest ever arrives. As a result we can see leaked buffers in OneForOneStreamManager: !attachment-name.jpg|thumbnail! If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from the code, the stream state will stay in memory forever, because the only ways it is released are when the channel is closed or when someone reads the last chunk of the stream. We could set the channel in OneForOneStreamManager#registerStream to cover this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon closed SPARK-20896. Resolution: Fixed Fix Version/s: 1.6.4 Target Version/s: 1.6.2 Not an issue. > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > Fix For: 1.6.4 > > > 1、zeppelin 0.6.2 in *SCOPE* mode > 2、spark 1.6.2 > 3、HDP 2.4 for HDFS YARN > trigger scala code like : > {noformat} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {noformat} > --- > and code : > {noformat} > var df = sql("select b1,b2 from .x") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") > sql("alter table . set lifecycle 1") > {noformat} > on zeppelin with two different notebook at same time. 
> Found this exception log in executor : > {quote} > l1.dtdream.com): java.lang.ClassCastException: > org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 > at > $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {quote} > OR > {quote} > java.lang.ClassCastException: scala.Tuple2 cannot be cast to > org.apache.spark.mllib.linalg.DenseVector > at > $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler
[jira] [Commented] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16053386#comment-16053386 ] poseidon commented on SPARK-20896: -- It is related to Zeppelin, so this issue can be closed . > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 in *SCOPE* mode > 2、spark 1.6.2 > 3、HDP 2.4 for HDFS YARN > trigger scala code like : > {noformat} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {noformat} > --- > and code : > {noformat} > var df = sql("select b1,b2 from .x") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") > sql("alter table . set lifecycle 1") > {noformat} > on zeppelin with two different notebook at same time. 
> Found this exception log in executor : > {quote} > l1.dtdream.com): java.lang.ClassCastException: > org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 > at > $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {quote} > OR > {quote} > java.lang.ClassCastException: scala.Tuple2 cannot be cast to > org.apache.spark.mllib.linalg.DenseVector > at > $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleM
[jira] [Comment Edited] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16036462#comment-16036462 ] poseidon edited comment on SPARK-20896 at 6/8/17 10:59 AM: --- Sorry for the error code , JIRA must recognize some part of my code as macro. The code is totally ok , I will update it later. I guess running two code interpret by sparkContext at the same time triggered this exception, because I won't get this exception when I run code separately. I say it's not a problem in spark-shell or spark-submit, because , spark-shell can only run just one piece of code at the same time , so , spark-shell won't get this exception no matter how many times you try. The code interacts with zeppelin in multi session: zeppelin-notebook-> SparkILoop -> SparkIMain -> SparkContext (shared-by-notebook in SCOPE mode) SCOPE mode make multi piece code run in spark-context at same time possible. was (Author: poseidon): Sorry for the error code , JIRA must recognize some part of my code as macro. The code is totally ok , I will update it later. I guess running two jobs at the same time triggered this exception, because I won't get this exception when I run code separately. I say it's not a problem in spark-shell or spark-submit, because , spark-shell can only run just one piece of code at the same time , so , spark-shell won't get this exception no matter how many times you try. The code interacts with zeppelin in multi session: zeppelin-notebook-> SparkILoop -> SparkIMain -> SparkContext (shared-by-notebook in SCOPE mode) SCOPE mode make multi piece code run in spark-context at same time possible. > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 in *SCOPE* mode > 2、spark 1.6.2 > 3、HDP 2.4 for HDFS YARN > trigger scala code like : > {noformat} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {noformat} > --- > and code : > {noformat} > var df = sql("select b1,b2 from .x") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import 
org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") > sql("alter table . set lifecycle 1") > {noformat} > on zeppelin with two different notebook at same time. > Found this exception log in executor : > {quote} > l1.dtdream.com): java.lang.ClassCastException: > org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 > at > $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.util.Utils$.getIteratorSize(Ut
[jira] [Commented] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042545#comment-16042545 ] poseidon commented on SPARK-20896: -- Sorry for the confusion caused by a concept I invented and called a "job". In Zeppelin SCOPE mode, the two notebooks run in two threads that share the same SparkContext, which is what makes this case special. > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 in *SCOPE* mode > 2、spark 1.6.2 > 3、HDP 2.4 for HDFS YARN > trigger scala code like : > {noformat} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {noformat} > --- > and code : > {noformat} > var df = sql("select b1,b2 from .x") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") > sql("alter table . set lifecycle 1") > {noformat} > on zeppelin with two different notebook at same time. 
> Found this exception log in executor : > {quote} > l1.dtdream.com): java.lang.ClassCastException: > org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 > at > $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {quote} > OR > {quote} > java.lang.ClassCastException: scala.Tuple2 cannot be cast to > org.apache.spark.mllib.linalg.DenseVector > at > $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) > at
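For context on the setup described in the comment above (two notebooks, two threads, one shared SparkContext), here is a small standalone sketch of my own, not taken from the report; the class and variable names are hypothetical. Concurrent job submission on a single SparkContext is supported by Spark, which is why the suspicion falls on the shared Zeppelin/REPL interpreter state rather than on concurrency itself.

{noformat}
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SharedContextSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("shared-context-sketch").setMaster("local[4]");
    // One SparkContext shared by both "notebooks", as in Zeppelin SCOPE mode.
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Each runnable stands in for one notebook paragraph submitting a job.
    Runnable notebookA = () -> {
      long evens = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)).filter(x -> x % 2 == 0).count();
      System.out.println("notebook A: " + evens);
    };
    Runnable notebookB = () -> {
      long big = sc.parallelize(Arrays.asList(10, 20, 30, 40)).filter(x -> x > 15).count();
      System.out.println("notebook B: " + big);
    };

    Thread a = new Thread(notebookA);
    Thread b = new Thread(notebookB);
    a.start();
    b.start();
    a.join();
    b.join();

    sc.stop();
  }
}
{noformat}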
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Description: 1、zeppelin 0.6.2 in *SCOPE* mode 2、spark 1.6.2 3、HDP 2.4 for HDFS YARN trigger scala code like : {noformat} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) val rows = columns.toSeq.transpose val vectors = rows.map(row => new DenseVector(row.toArray)) val vRdd = sc.parallelize(vectors) import sqlContext.implicits._ val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() val rows = dfV.rdd.zipWithIndex.map(_.swap) .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)} {noformat} --- and code : {noformat} var df = sql("select b1,b2 from .x") var i = 0 var threshold = Array(2.0,3.0) var inputCols = Array("b1","b2") var tmpDataFrame = df for (col <- inputCols){ val binarizer: Binarizer = new Binarizer().setInputCol(col) .setOutputCol(inputCols(i)+"_binary") .setThreshold(threshold(i)) tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) i = i+1 } var saveDFBin = tmpDataFrame val dfAppendBin = sql("select b3 from poseidon.corelatdemo") val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)} import org.apache.spark.sql.types.StructType val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields) saveDFBin = sqlContext.createDataFrame(rows, rowSchema) //save result to table import org.apache.spark.sql.SaveMode saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") sql("alter table . set lifecycle 1") {noformat} on zeppelin with two different notebook at same time. 
Found this exception log in executor : {quote} l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} OR {quote} java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} some log from executor: {quote} 17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598) 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 sto
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Description: 1、zeppelin 0.6.2 in *SCOPE* mode 2、spark 1.6.2 3、HDP 2.4 for HDFS YARN trigger scala code like : {noformat} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) val rows = columns.toSeq.transpose val vectors = rows.map(row => new DenseVector(row.toArray)) val vRdd = sc.parallelize(vectors) import sqlContext.implicits._ val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() val rows = dfV.rdd.zipWithIndex.map(_.swap) .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)} {noformat} --- and code : {noformat} var df = sql("select b1,b2 from .x") var i = 0 var threshold = Array(2.0,3.0) var inputCols = Array("b1","b2") var tmpDataFrame = df for (col <- inputCols){ val binarizer: Binarizer = new Binarizer().setInputCol(col) .setOutputCol(inputCols(i)+"_binary") .setThreshold(threshold(i)) tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) i = i+1 } var saveDFBin = tmpDataFrame val dfAppendBin = sql("select b3 from poseidon.corelatdemo") val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)} import org.apache.spark.sql.types.StructType val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields) saveDFBin = sqlContext.createDataFrame(rows, rowSchema) //save result to table import org.apache.spark.sql.SaveMode saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") sql("alter table . set lifecycle 1") {noformat} on zeppelin with two different notebook at same time. 
Found this exception log in executor : {quote} l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} OR {quote} java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} some log from executor: {quote} 17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598) 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 sto
[jira] [Commented] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16036462#comment-16036462 ] poseidon commented on SPARK-20896: -- Sorry for the error code , JIRA must recognize some part of my code as macro. The code is totally ok , I will update it later. I guess running two jobs at the same time triggered this exception, because I won't get this exception when I run code separately. I say it's not a problem in spark-shell or spark-submit, because , spark-shell can only run just one piece of code at the same time , so , spark-shell won't get this exception no matter how many times you try. The code interacts with zeppelin in multi session: zeppelin-notebook-> SparkILoop -> SparkIMain -> SparkContext (shared-by-notebook in SCOPE mode) SCOPE mode make multi piece code run in spark-context at same time possible. > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 in *SCOPE* mode > 2、spark 1.6.2 > 3、HDP 2.4 for HDFS YARN > trigger scala code like : > {quote} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {quote} > --- > and code : > {quote} > var df = sql("select b1,b2 from .x") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") > sql("alter table . set lifecycle 1") > {quote} > on zeppelin with two different notebook at same time. 
> Found this exception log in executor : > {quote} > l1.dtdream.com): java.lang.ClassCastException: > org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 > at > $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.j
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Description: 1、zeppelin 0.6.2 in *SCOPE* mode 2、spark 1.6.2 3、HDP 2.4 for HDFS YARN trigger scala code like : {quote} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) val rows = columns.toSeq.transpose val vectors = rows.map(row => new DenseVector(row.toArray)) val vRdd = sc.parallelize(vectors) import sqlContext.implicits._ val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() val rows = dfV.rdd.zipWithIndex.map(_.swap) .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)} {quote} --- and code : {quote} var df = sql("select b1,b2 from .x") var i = 0 var threshold = Array(2.0,3.0) var inputCols = Array("b1","b2") var tmpDataFrame = df for (col <- inputCols){ val binarizer: Binarizer = new Binarizer().setInputCol(col) .setOutputCol(inputCols(i)+"_binary") .setThreshold(threshold(i)) tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) i = i+1 } var saveDFBin = tmpDataFrame val dfAppendBin = sql("select b3 from poseidon.corelatdemo") val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)} import org.apache.spark.sql.types.StructType val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields) saveDFBin = sqlContext.createDataFrame(rows, rowSchema) //save result to table import org.apache.spark.sql.SaveMode saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") sql("alter table . set lifecycle 1") {quote} on zeppelin with two different notebook at same time. 
Found this exception log in executor : {quote} l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} OR {quote} java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} some log from executor: {quote} 17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598) 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as byte
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Description: 1、zeppelin 0.6.2 2、spark 1.6.2 3、hdp 2.4 for HDFS YARN trigger scala code like : {quote} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) val rows = columns.toSeq.transpose val vectors = rows.map(row => new DenseVector(row.toArray)) val vRdd = sc.parallelize(vectors) import sqlContext.implicits._ val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() val rows = dfV.rdd.zipWithIndex.map(_.swap) .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)} {quote} --- and code : {quote} var df = sql("select b1,b2 from .x") var i = 0 var threshold = Array(2.0,3.0) var inputCols = Array("b1","b2") var tmpDataFrame = df for (col <- inputCols){ val binarizer: Binarizer = new Binarizer().setInputCol(col) .setOutputCol(inputCols(i)+"_binary") .setThreshold(threshold(i)) tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) i = i+1 } var saveDFBin = tmpDataFrame val dfAppendBin = sql("select b3 from poseidon.corelatdemo") val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)} import org.apache.spark.sql.types.StructType val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields) saveDFBin = sqlContext.createDataFrame(rows, rowSchema) //save result to table import org.apache.spark.sql.SaveMode saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") sql("alter table . set lifecycle 1") {quote} on zeppelin with two different notebook at same time. 
Found this exeption log in executor : {quote} l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2 at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} OR {quote} java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {quote} these two execption nerver show in pairs. was: 1、zeppelin 0.6.2 2、spark 1.6.2 3、hdp 2.4 for HDFS YARN trigger scala code like : {quote} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row =>
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Description: 1、zeppelin 0.6.2 2、spark 1.6.2 3、hdp 2.4 for HDFS YARN trigger scala code like : {quote} var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") val vectorDf = assembler.transform(tmpDataFrame) val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) val rows = columns.toSeq.transpose val vectors = rows.map(row => new DenseVector(row.toArray)) val vRdd = sc.parallelize(vectors) import sqlContext.implicits._ val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() val rows = dfV.rdd.zipWithIndex.map(_.swap) .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)} {quote} --- and code : {quote} var df = sql("select b1,b2 from .x") var i = 0 var threshold = Array(2.0,3.0) var inputCols = Array("b1","b2") var tmpDataFrame = df for (col <- inputCols){ val binarizer: Binarizer = new Binarizer().setInputCol(col) .setOutputCol(inputCols(i)+"_binary") .setThreshold(threshold(i)) tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) i = i+1 } var saveDFBin = tmpDataFrame val dfAppendBin = sql("select b3 from poseidon.corelatdemo") val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq)} import org.apache.spark.sql.types.StructType val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields) saveDFBin = sqlContext.createDataFrame(rows, rowSchema) //save result to table import org.apache.spark.sql.SaveMode saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") sql("alter table . 
set lifecycle 1") {quote} at the same time was: 1、zeppelin 0.6.2 2、spark 1.6.2 3、hdp 2.4 for HDFS YARN trigger scala code like > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 > 2、spark 1.6.2 > 3、hdp 2.4 for HDFS YARN > trigger scala code like : > {quote} > var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x") > val vectorDf = assembler.transform(tmpDataFrame) > val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)} > val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman") > val columns = correlMatrix.toArray.grouped(correlMatrix.numRows) > val rows = columns.toSeq.transpose > val vectors = rows.map(row => new DenseVector(row.toArray)) > val vRdd = sc.parallelize(vectors) > import sqlContext.implicits._ > val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF() > val rows = dfV.rdd.zipWithIndex.map(_.swap) > > .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap)) > .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq > :+ x)} > {quote} > --- > and code : > {quote} > var df = sql("select b1,b2 from .x") > var i = 0 > var threshold = Array(2.0,3.0) > var inputCols = Array("b1","b2") > var tmpDataFrame = df > for (col <- inputCols){ > val binarizer: Binarizer = new Binarizer().setInputCol(col) > .setOutputCol(inputCols(i)+"_binary") > .setThreshold(threshold(i)) > tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i)) > i = i+1 > } > var saveDFBin = tmpDataFrame > val dfAppendBin = sql("select b3 from poseidon.corelatdemo") > val rows = saveDFBin.rdd.zipWithIndex.map(_.swap) > .join(dfAppendBin.rdd.zipWithIndex.map(_.swap)) > .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq > ++ row2.toSeq)} > import org.apache.spark.sql.types.StructType > val rowSchema = StructType(saveDFBin.schema.fields ++ > dfAppendBin.schema.fields) > saveDFBin = sqlContext.createDataFrame(rows, rowSchema) > //save result to table > import org.apache.spark.sql.SaveMode > saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".") > sql("alter table . set lifecycle 1") > {quote} > at the same time -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Attachment: (was: token_err.log) > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 > 2、spark 1.6.2 > 3、hdp 2.4 for HDFS YARN > trigger scala code like -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
[ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-20896: - Attachment: token_err.log > spark executor get java.lang.ClassCastException when trigger two job at same > time > - > > Key: SPARK-20896 > URL: https://issues.apache.org/jira/browse/SPARK-20896 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: poseidon > > 1、zeppelin 0.6.2 > 2、spark 1.6.2 > 3、hdp 2.4 for HDFS YARN > trigger scala code like -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time
poseidon created SPARK-20896: Summary: spark executor get java.lang.ClassCastException when trigger two job at same time Key: SPARK-20896 URL: https://issues.apache.org/jira/browse/SPARK-20896 Project: Spark Issue Type: Bug Components: MLlib Affects Versions: 1.6.1 Reporter: poseidon 1、zeppelin 0.6.2 2、spark 1.6.2 3、hdp 2.4 for HDFS YARN trigger scala code like -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore
[ https://issues.apache.org/jira/browse/SPARK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893565#comment-15893565 ] poseidon commented on SPARK-14698: -- [~azeroth2b] I think in Spark 1.6.1 the author did this on purpose. If this bug is fixed, the function can be stored in the DB, but it cannot be loaded back when the Thrift server restarts. I can upload the patch anyway. In spark-1.6.1\sql\hive\src\main\scala\org\apache\spark\sql\hive\HiveContext.scala: private def functionOrMacroDDLPattern(command: String) = Pattern.compile( ".*(create|drop)\\s+(temporary\\s+)(function|macro).+", Pattern.DOTALL).matcher(command) This is the corrected regular expression that lets a CREATE FUNCTION command be stored in the DB. > CREATE FUNCTION cloud not add function to hive metastore > > > Key: SPARK-14698 > URL: https://issues.apache.org/jira/browse/SPARK-14698 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 > Environment: spark1.6.1 >Reporter: poseidon > Labels: easyfix > > build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as > metastore server. > Start a thrift server , then in beeline , try to CREATE FUNCTION as HIVE SQL > UDF. > find out , can not add this FUNCTION to mysql metastore,but the function > usage goes well. > if you try to add it again , thrift server throw a alread Exist Exception. > [SPARK-10151][SQL] Support invocation of hive macro > add a if condition when runSqlHive, which will exec create function in > hiveexec. caused this problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
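As a quick sanity check of the pattern change above, the following self-contained snippet compares the pattern quoted in the comment (with a mandatory "temporary") against what is believed to be the original 1.6.x pattern (with "temporary" optional). Only the first pattern is taken from the comment; the second pattern, the SQL strings, and the UDF class name are assumptions for illustration.
{code:scala}
import java.util.regex.Pattern

// With "temporary" mandatory, a plain CREATE FUNCTION no longer matches, so it
// is not short-circuited to Hive exec and can be persisted in the metastore.
object FunctionDDLPatternCheck {
  // Pattern exactly as quoted in the comment above ("temporary" required).
  private val proposed = Pattern.compile(
    ".*(create|drop)\\s+(temporary\\s+)(function|macro).+", Pattern.DOTALL)

  // Assumed original 1.6.x pattern ("temporary" optional) -- shown only for contrast.
  private val original = Pattern.compile(
    ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL)

  def main(args: Array[String]): Unit = {
    // Illustrative statements; lower-cased here because the patterns are case-sensitive.
    val permanent = "create function my_upper as 'com.example.MyUpper'"
    val temporary = "create temporary function my_upper as 'com.example.MyUpper'"

    def matches(p: Pattern, sql: String) = p.matcher(sql).matches()

    println(s"original / permanent: ${matches(original, permanent)}") // true
    println(s"original / temporary: ${matches(original, temporary)}") // true
    println(s"proposed / permanent: ${matches(proposed, permanent)}") // false
    println(s"proposed / temporary: ${matches(proposed, temporary)}") // true
  }
}
{code}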
[jira] [Commented] (SPARK-15224) Can not delete jar and list jar in spark Thrift server
[ https://issues.apache.org/jira/browse/SPARK-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15279386#comment-15279386 ] poseidon commented on SPARK-15224: -- Well, the exception makes it obvious that the statement is not treated as valid syntax here, even though in plain Hive SQL it is valid and works fine. Once a jar is added to the Thrift server, every SQL statement depends on it and every executor pulls in that dependency when it starts. If we cannot delete jars, or even see how many jars have been loaded, the Thrift server keeps getting heavier the longer it runs. > Can not delete jar and list jar in spark Thrift server > -- > > Key: SPARK-15224 > URL: https://issues.apache.org/jira/browse/SPARK-15224 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 > Environment: spark 1.6.1 > hive 1.2.1 > hdfs 2.7.1 >Reporter: poseidon >Priority: Minor > > when you try to delete jar , and exec delete jar or list jar in you > beeline client. it throws exception > delete jar; > Error: org.apache.spark.sql.AnalysisException: line 1:7 missing FROM at > 'jars' near 'jars' > line 1:12 missing EOF at 'myudfs' near 'jars'; (state=,code=0) > list jar; > Error: org.apache.spark.sql.AnalysisException: cannot recognize input near > 'list' 'jars' ''; line 1 pos 0 (state=,code=0) > {code:title=funnlog.log|borderStyle=solid} > 16/05/09 17:26:52 INFO thriftserver.SparkExecuteStatementOperation: Running > query 'list jar' with 1da09765-efb4-42dc-8890-3defca40f89d > 16/05/09 17:26:52 INFO parse.ParseDriver: Parsing command: list jar > NoViableAltException(26@[]) > at > org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1071) > at > org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202) > at > org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166) > at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276) > at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40) > at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) > at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) > at > scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) > at > 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) > at > org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34) > at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295) > at > org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66) > at > org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66) > at > org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:293) > at > org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:240) > at > org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:239) > at > org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:282) > at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65) > at > org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) > at > org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQL
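To see where "list jar" actually dies, the sketch below hands the two failing statements directly to Hive's own parser, which is the last stop in the stack trace above before the AnalysisException is raised. It assumes hive-exec 1.2.1 on the classpath; the jar name myudfs is taken from the error message, everything else is illustrative. This also explains the practical concern in the comment: since neither Spark 1.6.x SQL nor the Hive SQL grammar accepts these statements, jars added to a long-running Thrift server simply accumulate.
{code:scala}
import org.apache.hadoop.hive.ql.parse.{ParseDriver, ParseException}

// "list jar" / "delete jar" are Hive CLI commands handled before the SQL parser,
// so feeding them to ParseDriver is expected to fail -- matching the
// NoViableAltException seen in the Thrift-server log above.
object ListJarParseCheck {
  def main(args: Array[String]): Unit = {
    val parser = new ParseDriver()
    for (stmt <- Seq("list jar", "delete jar myudfs")) {
      try {
        parser.parse(stmt) // returns an ASTNode on success
        println(s"parsed OK: $stmt")
      } catch {
        case e: ParseException =>
          println(s"rejected by HiveParser: '$stmt' -> ${e.getMessage}")
      }
    }
  }
}
{code}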
[jira] [Updated] (SPARK-15224) Can not delete jar and list jar in spark Thrift server
[ https://issues.apache.org/jira/browse/SPARK-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-15224: - Description: when you try to delete jar , and exec delete jar or list jar in you beeline client. it throws exception delete jar; Error: org.apache.spark.sql.AnalysisException: line 1:7 missing FROM at 'jars' near 'jars' line 1:12 missing EOF at 'myudfs' near 'jars'; (state=,code=0) list jar; Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 'list' 'jars' ''; line 1 pos 0 (state=,code=0) {code:title=funnlog.log|borderStyle=solid} 16/05/09 17:26:52 INFO thriftserver.SparkExecuteStatementOperation: Running query 'list jar' with 1da09765-efb4-42dc-8890-3defca40f89d 16/05/09 17:26:52 INFO parse.ParseDriver: Parsing command: list jar NoViableAltException(26@[]) at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1071) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166) at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276) at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34) at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295) at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66) at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:293) at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:240) at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:239) at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:282) at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65) at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114) at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(
[jira] [Created] (SPARK-15224) Can not delete jar and list jar in spark Thrift server
poseidon created SPARK-15224: Summary: Can not delete jar and list jar in spark Thrift server Key: SPARK-15224 URL: https://issues.apache.org/jira/browse/SPARK-15224 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.1 Environment: spark 1.6.1 hive 1.2.1 hdfs 2.7.1 Reporter: poseidon when you try to delete jar , and exec delete jar or list jar in you beeline client. it throws exception delete jar; Error: org.apache.spark.sql.AnalysisException: line 1:7 missing FROM at 'jars' near 'jars' line 1:12 missing EOF at 'myudfs' near 'jars'; (state=,code=0) list jar; Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 'list' 'jars' ''; line 1 pos 0 (state=,code=0) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore
[ https://issues.apache.org/jira/browse/SPARK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15275854#comment-15275854 ] poseidon commented on SPARK-14698: -- spark 1.6.1, hive 1.2.1, mysql 5.6. Just start the Thrift server and create a UDF as you normally would in Hive; that is enough to reproduce this issue. I have already fixed it on my side: adding the UDF to the metastore is only the first step, you also have to fix the UDF lookup against the metastore when the SQL is parsed. > CREATE FUNCTION cloud not add function to hive metastore > > > Key: SPARK-14698 > URL: https://issues.apache.org/jira/browse/SPARK-14698 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 > Environment: spark1.6.1 >Reporter: poseidon > Labels: easyfix > > build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as > metastore server. > Start a thrift server , then in beeline , try to CREATE FUNCTION as HIVE SQL > UDF. > find out , can not add this FUNCTION to mysql metastore,but the function > usage goes well. > if you try to add it again , thrift server throw a alread Exist Exception. > [SPARK-10151][SQL] Support invocation of hive macro > add a if condition when runSqlHive, which will exec create function in > hiveexec. caused this problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
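For anyone trying to reproduce this outside beeline, here is a minimal sketch of the same steps through HiveContext. It assumes a Hive-enabled Spark 1.6.x build with a MySQL-backed metastore configured in hive-site.xml; the function name, UDF class, and jar path are placeholders, not taken from the original report.
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal reproduction sketch for the report above (placeholder names throughout).
object CreateFunctionRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("create-function-repro"))
    val hiveContext = new HiveContext(sc)

    // Step 1: the statement succeeds and the UDF is usable in this session...
    hiveContext.sql(
      "CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper' USING JAR 'hdfs:///udfs/myudfs.jar'")

    // ...but, per the report, no row appears in the metastore (FUNCS table),
    // so the function is lost once the Thrift server / session is restarted.

    // Step 2: running the same CREATE FUNCTION again throws an
    // "already exists" error, even though the metastore never recorded it.
    hiveContext.sql(
      "CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper' USING JAR 'hdfs:///udfs/myudfs.jar'")

    sc.stop()
  }
}
{code}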
[jira] [Updated] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore
[ https://issues.apache.org/jira/browse/SPARK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] poseidon updated SPARK-14698: - Description: build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as metastore server. Start a thrift server , then in beeline , try to CREATE FUNCTION as HIVE SQL UDF. find out , can not add this FUNCTION to mysql metastore,but the function usage goes well. if you try to add it again , thrift server throw a alread Exist Exception. [SPARK-10151][SQL] Support invocation of hive macro add a if condition when runSqlHive, which will exec create function in hiveexec. caused this problem. was: build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as metastore server. start a thrift server , then in beeline , try to create a function. found out , can not add this create to metastore,but the function goes well. if you try to add it again , thrift server throw a alread Exist Exception. [SPARK-10151][SQL] Support invocation of hive macro add a if condition when runSqlHive, which will exec create function in hiveexec. caused this problem. > CREATE FUNCTION cloud not add function to hive metastore > > > Key: SPARK-14698 > URL: https://issues.apache.org/jira/browse/SPARK-14698 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 > Environment: spark1.6.1 >Reporter: poseidon > Labels: easyfix > > build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as > metastore server. > Start a thrift server , then in beeline , try to CREATE FUNCTION as HIVE SQL > UDF. > find out , can not add this FUNCTION to mysql metastore,but the function > usage goes well. > if you try to add it again , thrift server throw a alread Exist Exception. > [SPARK-10151][SQL] Support invocation of hive macro > add a if condition when runSqlHive, which will exec create function in > hiveexec. caused this problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore
poseidon created SPARK-14698: Summary: CREATE FUNCTION cloud not add function to hive metastore Key: SPARK-14698 URL: https://issues.apache.org/jira/browse/SPARK-14698 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.1 Environment: spark1.6.1 Reporter: poseidon build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as metastore server. start a thrift server , then in beeline , try to create a function. found out , can not add this create to metastore,but the function goes well. if you try to add it again , thrift server throw a alread Exist Exception. [SPARK-10151][SQL] Support invocation of hive macro add a if condition when runSqlHive, which will exec create function in hiveexec. caused this problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org