[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-10 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-21955:
-
Description: 
While looking into how streams, chunks, and blocks work in the Netty transport, I found a nasty case.

Processing an OpenBlocks message registers a stream:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the app id and the block buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start,
OpenBlocks -> ChunkFetchRequest are sent in sequence.

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
{code:java}
  @Override
  public void registerChannel(Channel channel, long streamId) {
    if (streams.containsKey(streamId)) {
      streams.get(streamId).associatedChannel = channel;
    }
  }
{code}
This is the only place where associatedChannel gets set.


{code:java}
  public void connectionTerminated(Channel channel) {
    // Close all streams which have been associated with the channel.
    for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());

        // Release all remaining buffers.
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }
{code}
This is the only place where state.buffers gets released.

If the network goes down during the OpenBlocks step, no ChunkFetchRequest message ever arrives.
So the channel can never be set.
So we can see leaked buffers in OneForOneStreamManager:
!screenshot-1.png!

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is never set, then (after searching the code) the StreamState will stay in memory forever,
which may lead to an OOM in the NodeManager.

The only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.
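
To make that concrete, here is a minimal sketch of the leaking sequence against the 1.6.x spark-network-common API quoted above (the StreamLeakSketch class name is illustrative, and a Mockito stub stands in for the Netty channel; both are assumptions, not part of the issue):

{code:java}
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;

import io.netty.channel.Channel;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;

import static org.mockito.Mockito.mock;

public class StreamLeakSketch {
  public static void main(String[] args) {
    OneForOneStreamManager manager = new OneForOneStreamManager();
    Iterator<ManagedBuffer> buffers = Arrays.<ManagedBuffer>asList(
        new NioManagedBuffer(ByteBuffer.allocate(16))).iterator();

    // OpenBlocks was handled: the stream and its buffers are now held by the manager.
    long streamId = manager.registerStream("app-1", buffers);

    // The connection drops here, so no ChunkFetchRequest arrives and
    // registerChannel(channel, streamId) is never called:
    // StreamState.associatedChannel stays null.

    // When the channel is eventually closed, connectionTerminated finds no stream
    // whose associatedChannel matches, so nothing is removed or released.
    Channel channel = mock(Channel.class);
    manager.connectionTerminated(channel);
    // The StreamState for streamId (and the buffers it holds) stays in the map forever.
  }
}
{code}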


In OneForOneStreamManager#registerStream we could also set the channel, to guard against exactly this case.

If we set the channel when we registerStream, the buffers can always be released when the connection terminates.
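
A rough sketch of that change (the extra Channel parameter on registerStream is the proposal here, not existing API; the nextStreamId, streams, and StreamState internals are named as in the 1.6 source file referenced above and should be treated as assumptions):

{code:java}
  // Proposed shape: tie the stream to its channel at registration time.
  public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
    long myStreamId = nextStreamId.getAndIncrement();
    StreamState state = new StreamState(appId, buffers);
    // Set eagerly instead of waiting for the first ChunkFetchRequest, so
    // connectionTerminated(channel) can always find and release this stream.
    state.associatedChannel = channel;
    streams.put(myStreamId, state);
    return myStreamId;
  }
{code}

With that in place, a connection that dies between OpenBlocks and the first ChunkFetchRequest still has its buffers released by connectionTerminated.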

  was:
just in my way to know how  stream , chunk , block works in netty found some 
nasty case.

process OpenBlocks message registerStream Stream in OneForOneStreamManager
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fill with streamState with app & buber 

process  ChunkFetchRequest registerChannel
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fill with streamState with channel 

In 
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 

OpenBlocks  -> ChunkFetchRequest   come in sequnce. 

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
  @Override
  public void registerChannel(Channel channel, long streamId) {
if (streams.containsKey(streamId)) {
  streams.get(streamId).associatedChannel = channel;
}
  }
this is only chance associatedChannel  is set


public void connectionTerminated(Channel channel) {
// Close all streams which have been associated with the channel.
for (Map.Entry entry: streams.entrySet()) {
  StreamState state = entry.getValue();
  if (state.associatedChannel == channel) {
streams.remove(entry.getKey());

// Release all remaining buffers.
while (state.buffers.hasNext()) {
  state.buffers.next().release();
}
  }
}
this is only chance state.buffers is released.

If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
then. 
So, channel can not be set. 
So, we can see some leaked Buffer in OneForOneStreamManager

!screenshot-1.png!

if 
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
  is not set, then after search the code , it will remain in memory forever.
Which may lead to OOM in NodeManager.

Because the only way to release it was in channel close , or someone read the 
last piece of block. 


OneForOneStreamManager#registerStream we can set channel in this method, just 
in case of this case.


> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258 
> spark 1.6 
>Reporter: poseidon
> Attachments: screenshot-1.png
>
>
> just in my way to know how  stream , chunk , block works in netty found some 
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> 

[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-10 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-21955:
-
Description: 
While looking into how streams, chunks, and blocks work in the Netty transport, I found a nasty case.

Processing an OpenBlocks message registers a stream:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the app id and the block buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start,
OpenBlocks -> ChunkFetchRequest are sent in sequence.

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
{code:java}
  @Override
  public void registerChannel(Channel channel, long streamId) {
    if (streams.containsKey(streamId)) {
      streams.get(streamId).associatedChannel = channel;
    }
  }
{code}
This is the only place where associatedChannel gets set.

{code:java}
  public void connectionTerminated(Channel channel) {
    // Close all streams which have been associated with the channel.
    for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());

        // Release all remaining buffers.
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }
{code}
This is the only place where state.buffers gets released.

If the network goes down during the OpenBlocks step, no ChunkFetchRequest message ever arrives.
So the channel can never be set.
So we can see leaked buffers in OneForOneStreamManager:

!screenshot-1.png!

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is never set, then (after searching the code) the StreamState will stay in memory forever.

The only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.


In OneForOneStreamManager#registerStream we could also set the channel, to guard against exactly this case.

  was:
just in my way to know how  stream , chunk , block works in netty found some 
nasty case.

process OpenBlocks message registerStream Stream in OneForOneStreamManager
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fill with streamState with app & buber 

process  ChunkFetchRequest registerChannel
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fill with streamState with channel 

In 
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 

OpenBlocks  -> ChunkFetchRequest   come in sequnce. 

If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
then. 

So, we can see some leaked Buffer in OneForOneStreamManager

!attachment-name.jpg|thumbnail!

if 
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
  is not set, then after search the code , it will remain in memory forever. 

Because the only way to release it was in channel close , or someone read the 
last piece of block. 


OneForOneStreamManager#registerStream we can set channel in this method, just 
in case of this case.


> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258 
> spark 1.6 
>Reporter: poseidon
> Attachments: screenshot-1.png
>
>
> just in my way to know how  stream , chunk , block works in netty found some 
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> fill with streamState with app & buber 
> process  ChunkFetchRequest registerChannel
> org.apache.spark.network.server.OneForOneStreamManager#registerChannel
> fill with streamState with channel 
> In 
> org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 
> OpenBlocks  -> ChunkFetchRequest   come in sequnce. 
> spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
>   @Override
>   public void registerChannel(Channel channel, long streamId) {
> if (streams.containsKey(streamId)) {
>   streams.get(streamId).associatedChannel = channel;
> }
>   }
> this is only chance associatedChannel  is set
> public void connectionTerminated(Channel channel) {
> // Close all streams which have been associated with the channel.
> for (Map.Entry entry: streams.entrySet()) {
>   StreamState state = entry.getValue();
>   if (state.associatedChannel == channel) {
> 

[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-10 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-21955:
-
Description: 
While looking into how streams, chunks, and blocks work in the Netty transport, I found a nasty case.

Processing an OpenBlocks message registers a stream:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the app id and the block buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start,
OpenBlocks -> ChunkFetchRequest are sent in sequence.

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
{code:java}
  @Override
  public void registerChannel(Channel channel, long streamId) {
    if (streams.containsKey(streamId)) {
      streams.get(streamId).associatedChannel = channel;
    }
  }
{code}
This is the only place where associatedChannel gets set.

{code:java}
  public void connectionTerminated(Channel channel) {
    // Close all streams which have been associated with the channel.
    for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());

        // Release all remaining buffers.
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }
{code}
This is the only place where state.buffers gets released.

If the network goes down during the OpenBlocks step, no ChunkFetchRequest message ever arrives.
So the channel can never be set.
So we can see leaked buffers in OneForOneStreamManager:

!screenshot-1.png!

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is never set, then (after searching the code) the StreamState will stay in memory forever,
which may lead to an OOM in the NodeManager.

The only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.


In OneForOneStreamManager#registerStream we could also set the channel, to guard against exactly this case.

  was:
just in my way to know how  stream , chunk , block works in netty found some 
nasty case.

process OpenBlocks message registerStream Stream in OneForOneStreamManager
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fill with streamState with app & buber 

process  ChunkFetchRequest registerChannel
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fill with streamState with channel 

In 
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 

OpenBlocks  -> ChunkFetchRequest   come in sequnce. 

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
  @Override
  public void registerChannel(Channel channel, long streamId) {
if (streams.containsKey(streamId)) {
  streams.get(streamId).associatedChannel = channel;
}
  }
this is only chance associatedChannel  is set


public void connectionTerminated(Channel channel) {
// Close all streams which have been associated with the channel.
for (Map.Entry entry: streams.entrySet()) {
  StreamState state = entry.getValue();
  if (state.associatedChannel == channel) {
streams.remove(entry.getKey());

// Release all remaining buffers.
while (state.buffers.hasNext()) {
  state.buffers.next().release();
}
  }
}
this is only chance state.buffers is released.

If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
then. 
So, channel can not be set. 
So, we can see some leaked Buffer in OneForOneStreamManager

!screenshot-1.png!

if 
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
  is not set, then after search the code , it will remain in memory forever. 

Because the only way to release it was in channel close , or someone read the 
last piece of block. 


OneForOneStreamManager#registerStream we can set channel in this method, just 
in case of this case.


> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258 
> spark 1.6 
>Reporter: poseidon
> Attachments: screenshot-1.png
>
>
> just in my way to know how  stream , chunk , block works in netty found some 
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> fill with streamState with app & buber 
> process  ChunkFetchRequest 

[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-10 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-21955:
-
Attachment: screenshot-1.png

> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258 
> spark 1.6 
>Reporter: poseidon
> Attachments: screenshot-1.png
>
>
> just in my way to know how  stream , chunk , block works in netty found some 
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> fill with streamState with app & buber 
> process  ChunkFetchRequest registerChannel
> org.apache.spark.network.server.OneForOneStreamManager#registerChannel
> fill with streamState with channel 
> In 
> org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 
> OpenBlocks  -> ChunkFetchRequest   come in sequnce. 
> If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
> then. 
> So, we can see some leaked Buffer in OneForOneStreamManager
> !attachment-name.jpg|thumbnail!
> if 
> org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
>   is not set, then after search the code , it will remain in memory forever. 
> Because the only way to release it was in channel close , or someone read the 
> last piece of block. 
> OneForOneStreamManager#registerStream we can set channel in this method, just 
> in case of this case.






[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-10 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-21955:
-
Description: 
While looking into how streams, chunks, and blocks work in the Netty transport, I found a nasty case.

Processing an OpenBlocks message registers a stream:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the app id and the block buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start,
OpenBlocks -> ChunkFetchRequest are sent in sequence.

If the network goes down during the OpenBlocks step, no ChunkFetchRequest message ever arrives.

So we can see leaked buffers in OneForOneStreamManager:

!attachment-name.jpg|thumbnail!

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is never set, then (after searching the code) the StreamState will stay in memory forever.

The only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.


In OneForOneStreamManager#registerStream we could also set the channel, to guard against exactly this case.

  was:
just in my way to know how  stream , chunk , block works in netty found some 
nasty case.

process OpenBlocks message registerStream Stream in OneForOneStreamManager
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fill with streamState with app & buber 

process  ChunkFetchRequest registerChannel
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fill with streamState with channel 

In 
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 

OpenBlocks  -> ChunkFetchRequest   come in sequnce. 

If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
then. 

So, we can see some leaked Buffer in OneForOneStreamManager



if 
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
  is not set, then after search the code , it will remain in memory forever. 

Because the only way to release it was in channel close , or someone read the 
last piece of block. 


OneForOneStreamManager#registerStream we can set channel in this method, just 
in case of this case.


> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258 
> spark 1.6 
>Reporter: poseidon
>
> just in my way to know how  stream , chunk , block works in netty found some 
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> fill with streamState with app & buber 
> process  ChunkFetchRequest registerChannel
> org.apache.spark.network.server.OneForOneStreamManager#registerChannel
> fill with streamState with channel 
> In 
> org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 
> OpenBlocks  -> ChunkFetchRequest   come in sequnce. 
> If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
> then. 
> So, we can see some leaked Buffer in OneForOneStreamManager
> !attachment-name.jpg|thumbnail!
> if 
> org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
>   is not set, then after search the code , it will remain in memory forever. 
> Because the only way to release it was in channel close , or someone read the 
> last piece of block. 
> OneForOneStreamManager#registerStream we can set channel in this method, just 
> in case of this case.






[jira] [Updated] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-10 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-21955:
-
Description: 
While looking into how streams, chunks, and blocks work in the Netty transport, I found a nasty case.

Processing an OpenBlocks message registers a stream:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the app id and the block buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start,
OpenBlocks -> ChunkFetchRequest are sent in sequence.

If the network goes down during the OpenBlocks step, no ChunkFetchRequest message ever arrives.

So we can see leaked buffers in OneForOneStreamManager.

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is never set, then (after searching the code) the StreamState will stay in memory forever.

The only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.


In OneForOneStreamManager#registerStream we could also set the channel, to guard against exactly this case.

  was:
just in my way to know how  stream , chunk , block works in netty found some 
nasty case.

process OpenBlocks message registerStream Stream in OneForOneStreamManager
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fill with streamState with app & buber 

process  ChunkFetchRequest registerChannel
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fill with streamState with channel 

In 
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 

OpenBlocks  -> ChunkFetchRequest   come in sequnce. 

If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
then. 

So, we can see some leaked Buffer in OneForOneStreamManager

!attachment-name.jpg|thumbnail!

if 
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
  is not set, then after search the code , it will remain in memory forever. 

Because the only way to release it was in channel close , or someone read the 
last piece of block. 


OneForOneStreamManager#registerStream we can set channel in this method, just 
in case of this case.


> OneForOneStreamManager may leak memory when network is poor
> ---
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258 
> spark 1.6 
>Reporter: poseidon
>
> just in my way to know how  stream , chunk , block works in netty found some 
> nasty case.
> process OpenBlocks message registerStream Stream in OneForOneStreamManager
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> fill with streamState with app & buber 
> process  ChunkFetchRequest registerChannel
> org.apache.spark.network.server.OneForOneStreamManager#registerChannel
> fill with streamState with channel 
> In 
> org.apache.spark.network.shuffle.OneForOneBlockFetcher#start 
> OpenBlocks  -> ChunkFetchRequest   come in sequnce. 
> If network down in OpenBlocks  process, no more ChunkFetchRequest  message 
> then. 
> So, we can see some leaked Buffer in OneForOneStreamManager
> if 
> org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
>   is not set, then after search the code , it will remain in memory forever. 
> Because the only way to release it was in channel close , or someone read the 
> last piece of block. 
> OneForOneStreamManager#registerStream we can set channel in this method, just 
> in case of this case.






[jira] [Created] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

2017-09-08 Thread poseidon (JIRA)
poseidon created SPARK-21955:


 Summary: OneForOneStreamManager may leak memory when network is 
poor
 Key: SPARK-21955
 URL: https://issues.apache.org/jira/browse/SPARK-21955
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.6.1
 Environment: hdp 2.4.2.0-258 
spark 1.6 
Reporter: poseidon


While looking into how streams, chunks, and blocks work in the Netty transport, I found a nasty case.

Processing an OpenBlocks message registers a stream:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the app id and the block buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start,
OpenBlocks -> ChunkFetchRequest are sent in sequence.

If the network goes down during the OpenBlocks step, no ChunkFetchRequest message ever arrives.

So we can see leaked buffers in OneForOneStreamManager:

!attachment-name.jpg|thumbnail!

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is never set, then (after searching the code) the StreamState will stay in memory forever.

The only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.


In OneForOneStreamManager#registerStream we could also set the channel, to guard against exactly this case.






[jira] [Closed] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-18 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon closed SPARK-20896.

  Resolution: Fixed
   Fix Version/s: 1.6.4
Target Version/s: 1.6.2

Not an issue.

> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
> Fix For: 1.6.4
>
>
> 1、zeppelin 0.6.2  in *SCOPE* mode 
> 2、spark 1.6.2 
> 3、HDP 2.4 for HDFS YARN 
> trigger scala code like :
> {noformat}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {noformat}
> ---
> and code :
> {noformat}
> var df = sql("select b1,b2 from .x")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
> .setOutputCol(inputCols(i)+"_binary")
> .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
> sql("alter table . set lifecycle 1")
> {noformat}
> on zeppelin with two different notebook at same time. 
> Found this exception log in  executor :
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: 
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
> at 
> $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> OR 
> {quote}
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
> org.apache.spark.mllib.linalg.DenseVector
> at 
> $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> 

[jira] [Commented] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-18 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16053386#comment-16053386
 ] 

poseidon commented on SPARK-20896:
--

It is related to Zeppelin, so this issue can be closed.

> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2  in *SCOPE* mode 
> 2、spark 1.6.2 
> 3、HDP 2.4 for HDFS YARN 
> trigger scala code like :
> {noformat}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {noformat}
> ---
> and code :
> {noformat}
> var df = sql("select b1,b2 from .x")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
> .setOutputCol(inputCols(i)+"_binary")
> .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
> sql("alter table . set lifecycle 1")
> {noformat}
> on zeppelin with two different notebook at same time. 
> Found this exception log in  executor :
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: 
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
> at 
> $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> OR 
> {quote}
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
> org.apache.spark.mllib.linalg.DenseVector
> at 
> $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> 

[jira] [Comment Edited] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-08 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16036462#comment-16036462
 ] 

poseidon edited comment on SPARK-20896 at 6/8/17 10:59 AM:
---

Sorry for the garbled code; JIRA must have interpreted part of my code as markup macros.

The code itself is fine, and I will update it later.
I suspect that two pieces of code being interpreted by the same SparkContext at the same time triggers this exception, because I never get it when I run the code separately.
It is not a problem in spark-shell or spark-submit, because spark-shell can only run one piece of code at a time, so spark-shell will never hit this exception no matter how many times you try.

The code interacts with Zeppelin in multi-session mode:
zeppelin notebook -> SparkILoop -> SparkIMain -> SparkContext
(shared by notebooks in SCOPE mode)

SCOPE mode makes it possible for multiple pieces of code to run in the same SparkContext at the same time.
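
For reference, the failing condition is essentially two threads driving one SparkContext at the same time. A standalone sketch of that shape (plain Java against the public Spark API, nothing Zeppelin-specific; it only illustrates the concurrency, not the actual interpreter wiring):

{code:java}
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SharedContextSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("shared-context").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Two "notebooks": each thread submits its own job against the same context,
    // much like two Zeppelin paragraphs do in SCOPE mode.
    Thread t1 = new Thread(() ->
        System.out.println("job1: " + sc.parallelize(Arrays.asList(1, 2, 3)).count()));
    Thread t2 = new Thread(() ->
        System.out.println("job2: " + sc.parallelize(Arrays.asList(4, 5, 6)).count()));

    t1.start();
    t2.start();
    t1.join();
    t2.join();
    sc.stop();
  }
}
{code}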






was (Author: poseidon):
Sorry for the error code , JIRA must recognize some part of my code as macro. 

The code is totally ok , I will update it later. 
I guess  running two jobs at the same time triggered this exception, because I 
won't get this exception when I run code separately. 
I say it's not a problem in spark-shell or spark-submit, because , spark-shell  
can only run just one piece of code at the same time , so , spark-shell won't 
get this exception no matter how many times you try. 

The code interacts with zeppelin in multi session:
zeppelin-notebook-> SparkILoop -> SparkIMain -> SparkContext 
(shared-by-notebook in SCOPE mode)

SCOPE mode make multi piece code run in spark-context at same time possible. 





> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2  in *SCOPE* mode 
> 2、spark 1.6.2 
> 3、HDP 2.4 for HDFS YARN 
> trigger scala code like :
> {noformat}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {noformat}
> ---
> and code :
> {noformat}
> var df = sql("select b1,b2 from .x")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
> .setOutputCol(inputCols(i)+"_binary")
> .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
> sql("alter table . set lifecycle 1")
> {noformat}
> on zeppelin with two different notebook at same time. 
> Found this exception log in  executor :
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: 
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
> at 
> $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)

[jira] [Commented] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-08 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16042545#comment-16042545
 ] 

poseidon commented on SPARK-20896:
--

Sorry for confusing you with a concept I invented and called a "job".
In Zeppelin SCOPE mode, two notebooks run in two threads that share the same SparkContext, which is what makes this case special.


> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2  in *SCOPE* mode 
> 2、spark 1.6.2 
> 3、HDP 2.4 for HDFS YARN 
> trigger scala code like :
> {noformat}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {noformat}
> ---
> and code :
> {noformat}
> var df = sql("select b1,b2 from .x")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
> .setOutputCol(inputCols(i)+"_binary")
> .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
> sql("alter table . set lifecycle 1")
> {noformat}
> on zeppelin with two different notebook at same time. 
> Found this exception log in  executor :
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: 
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
> at 
> $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> OR 
> {quote}
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
> org.apache.spark.mllib.linalg.DenseVector
> at 
> $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at 
> 

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-04 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Description: 
1、zeppelin 0.6.2  in *SCOPE* mode 
2、spark 1.6.2 
3、HDP 2.4 for HDFS YARN 

trigger scala code like :
{noformat}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
  
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{noformat}
---
and code :
{noformat}
var df = sql("select b1,b2 from .x")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
.setOutputCol(inputCols(i)+"_binary")
.setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{noformat}

Run this on Zeppelin in two different notebooks at the same time.

This exception shows up in the executor log:
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{quote}

Some log output from the executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-04 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Description: 
1、zeppelin 0.6.2  in *SCOPE* mode 
2、spark 1.6.2 
3、HDP 2.4 for HDFS YARN 

trigger scala code like :
{noformat}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
  
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{noformat}
---
and code :
{noformat}
var df = sql("select b1,b2 from .x")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
.setOutputCol(inputCols(i)+"_binary")
.setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{noformat}

Run this on Zeppelin in two different notebooks at the same time.

This exception shows up in the executor log:
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{quote}

Some log output from the executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 

[jira] [Commented] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-06-04 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16036462#comment-16036462
 ] 

poseidon commented on SPARK-20896:
--

Sorry for the garbled code; JIRA must have interpreted part of my code as markup macros.

The code itself is fine, and I will update it later.
I suspect that running two jobs at the same time triggers this exception, because I never get it when I run the code separately.
It is not a problem in spark-shell or spark-submit, because spark-shell can only run one piece of code at a time, so spark-shell will never hit this exception no matter how many times you try.

The code interacts with Zeppelin in multi-session mode:
zeppelin notebook -> SparkILoop -> SparkIMain -> SparkContext
(shared by notebooks in SCOPE mode)

SCOPE mode makes it possible for multiple pieces of code to run in the same SparkContext at the same time.





> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2  in *SCOPE* mode 
> 2、spark 1.6.2 
> 3、HDP 2.4 for HDFS YARN 
> trigger scala code like :
> {quote}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {quote}
> ---
> and code :
> {quote}
> var df = sql("select b1,b2 from .x")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
> .setOutputCol(inputCols(i)+"_binary")
> .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
> sql("alter table . set lifecycle 1")
> {quote}
> Run the two snippets above on Zeppelin in two different notebooks at the same time. 
> Found this exception log in the executor:
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: 
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
> at 
> $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Description: 
1、zeppelin 0.6.2  in *SCOPE* mode 
2、spark 1.6.2 
3、HDP 2.4 for HDFS YARN 

trigger scala code like :
{quote}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
  
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{quote}
---
and code :
{quote}
var df = sql("select b1,b2 from .x")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
.setOutputCol(inputCols(i)+"_binary")
.setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{quote}

Run the two snippets above on Zeppelin in two different notebooks at the same time. 

Found this exception log in the executor:
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{quote}

some log from executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as 

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Description: 
1、zeppelin 0.6.2  in *SCOPE* mode 
2、spark 1.6.2 
3、HDP 2.4 for HDFS YARN 

trigger scala code like :
{quote}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
  
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{quote}
---
and code :
{quote}
var df = sql("select b1,b2 from .x")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
.setOutputCol(inputCols(i)+"_binary")
.setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{quote}

Run the two snippets above on Zeppelin in two different notebooks at the same time. 

Found this exception log in the executor:
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{quote}

some log from executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 
598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast 
variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as 

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Description: 
1、zeppelin 0.6.2 
2、spark 1.6.2 
3、hdp 2.4 for HDFS YARN 

trigger scala code like :
{quote}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
  
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{quote}
---
and code :
{quote}
var df = sql("select b1,b2 from .x")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
.setOutputCol(inputCols(i)+"_binary")
.setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{quote}

Run the two snippets above on Zeppelin in two different notebooks at the same time. 

Found this exception log in the executor:
{quote}
l1.dtdream.com): java.lang.ClassCastException: 
org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
at 
$line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.spark.mllib.linalg.DenseVector
at 
$line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{quote}

These two exceptions never appear together. 




  was:
1、zeppelin 0.6.2 
2、spark 1.6.2 
3、hdp 2.4 for HDFS YARN 

trigger scala code like :
{quote}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => 

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Description: 
1、zeppelin 0.6.2 
2、spark 1.6.2 
3、hdp 2.4 for HDFS YARN 

trigger scala code like :
{quote}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
  
.join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
  .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
:+ x)}
{quote}
---
and code :
{quote}
var df = sql("select b1,b2 from .x")
var i = 0
var threshold = Array(2.0,3.0)
var inputCols = Array("b1","b2")

var tmpDataFrame = df
for (col <- inputCols){
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
.setOutputCol(inputCols(i)+"_binary")
.setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i+1
}
var saveDFBin = tmpDataFrame

val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ 
row2.toSeq)}

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ 
dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)


//save result to table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
sql("alter table . set lifecycle 1")
{quote}

at the same time 

  was:
1、zeppelin 0.6.2 
2、spark 1.6.2 
3、hdp 2.4 for HDFS YARN 

trigger scala code like 



> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2 
> 2、spark 1.6.2 
> 3、hdp 2.4 for HDFS YARN 
> trigger scala code like :
> {quote}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.x")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   
> .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq 
> :+ x)}
> {quote}
> ---
> and code :
> {quote}
> var df = sql("select b1,b2 from .x")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
> .setOutputCol(inputCols(i)+"_binary")
> .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq 
> ++ row2.toSeq)}
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ 
> dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable(".")
> sql("alter table . set lifecycle 1")
> {quote}
> at the same time 




[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Attachment: (was: token_err.log)

> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2 
> 2、spark 1.6.2 
> 3、hdp 2.4 for HDFS YARN 
> trigger scala code like 






[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-20896:
-
Attachment: token_err.log

> spark executor get java.lang.ClassCastException when trigger two job at same 
> time
> -
>
> Key: SPARK-20896
> URL: https://issues.apache.org/jira/browse/SPARK-20896
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.6.1
>Reporter: poseidon
>
> 1、zeppelin 0.6.2 
> 2、spark 1.6.2 
> 3、hdp 2.4 for HDFS YARN 
> trigger scala code like 






[jira] [Created] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

2017-05-26 Thread poseidon (JIRA)
poseidon created SPARK-20896:


 Summary: spark executor get java.lang.ClassCastException when 
trigger two job at same time
 Key: SPARK-20896
 URL: https://issues.apache.org/jira/browse/SPARK-20896
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 1.6.1
Reporter: poseidon


1、zeppelin 0.6.2 
2、spark 1.6.2 
3、hdp 2.4 for HDFS YARN 

trigger scala code like 







[jira] [Commented] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore

2017-03-02 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893565#comment-15893565
 ] 

poseidon commented on SPARK-14698:
--

[~azeroth2b] 
I think in Spark 1.6.1 the author did this on purpose: if this bug were fixed, the 
function could be stored in the DB, but it still could not be loaded again after a 
thrift-server restart.

But I can upload the patch anyway.

spark-1.6.1\sql\hive\src\main\scala\org\apache\spark\sql\hive\HiveContext.scala
private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
  ".*(create|drop)\\s+(temporary\\s+)(function|macro).+", Pattern.DOTALL).matcher(command)

This is the regular expression that lets the CREATE FUNCTION command be stored 
in the DB; a small check of what it matches follows.
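
For illustration only, a quick check of what the quoted pattern matches (this is not the Spark source; the object name and the lowercased commands are my own assumptions): the mandatory "(temporary\s+)" group means only TEMPORARY function/macro DDL matches, so a plain CREATE FUNCTION falls through and can be persisted in the metastore.

{code}
import java.util.regex.Pattern

object FunctionDDLPatternCheck {
  // Same pattern as quoted above.
  private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
    ".*(create|drop)\\s+(temporary\\s+)(function|macro).+", Pattern.DOTALL).matcher(command)

  def main(args: Array[String]): Unit = {
    // Lowercased purely for this illustration; the pattern itself is case-sensitive.
    val temporary = "create temporary function myudf as 'com.example.MyUDF'"
    val permanent = "create function myudf as 'com.example.MyUDF'"
    println(functionOrMacroDDLPattern(temporary).matches()) // true  -> treated as temporary DDL
    println(functionOrMacroDDLPattern(permanent).matches()) // false -> plain CREATE FUNCTION falls through
  }
}
{code}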

> CREATE FUNCTION cloud not add function to hive metastore
> 
>
> Key: SPARK-14698
> URL: https://issues.apache.org/jira/browse/SPARK-14698
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: spark1.6.1
>Reporter: poseidon
>  Labels: easyfix
>
> Build Spark 1.6.1 and run it with Hive 1.2.1, with MySQL configured as the 
> metastore server. 
> Start a thrift server, then in beeline try to CREATE FUNCTION as a Hive SQL 
> UDF. 
> You will find the FUNCTION is not added to the MySQL metastore, but using the 
> function works fine.
> If you try to add it again, the thrift server throws an "already exists" exception.
> [SPARK-10151][SQL] Support invocation of hive macro 
> added an if condition in runSqlHive that executes CREATE FUNCTION in 
> hiveexec, which caused this problem.






[jira] [Commented] (SPARK-15224) Can not delete jar and list jar in spark Thrift server

2016-05-10 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279386#comment-15279386
 ] 

poseidon commented on SPARK-15224:
--

Well, it's fairly obvious: the exception says this is not valid syntax. But in 
original Hive SQL it is valid and works well. 
After we ADD JAR to the thrift server, every SQL statement depends on that jar, and 
every executor picks up the dependency when it starts. 
If we can neither delete a jar nor see how many jars have been loaded, the thrift 
server becomes a very fat server after running for a while; a small sketch is below. 
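
A minimal sketch of that behaviour, assuming Spark 1.6.x, a HiveContext, and a hypothetical jar path; the issue itself was hit through beeline against the thrift server, and the exact error text quoted in the issue comes from there.

{code}
import org.apache.spark.sql.hive.HiveContext

object JarLifecycleSketch {
  def demo(hive: HiveContext): Unit = {
    // ADD JAR is accepted; per the report, every executor then carries this dependency.
    hive.sql("ADD JAR hdfs:///tmp/myudfs.jar")

    // Neither listing nor deleting jars is recognised, so the AnalysisException
    // quoted in the issue is thrown and loaded jars only ever accumulate.
    try hive.sql("list jars") catch { case e: Exception => println(e.getMessage) }
    try hive.sql("delete jar myudfs") catch { case e: Exception => println(e.getMessage) }
  }
}
{code}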


> Can not delete jar and list jar in spark Thrift server
> --
>
> Key: SPARK-15224
> URL: https://issues.apache.org/jira/browse/SPARK-15224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: spark 1.6.1
> hive 1.2.1 
> hdfs 2.7.1 
>Reporter: poseidon
>Priority: Minor
>
> When you try to delete a jar and run delete jar or list jar in your 
> beeline client, it throws an exception:
> delete jar; 
> Error: org.apache.spark.sql.AnalysisException: line 1:7 missing FROM at 
> 'jars' near 'jars'
> line 1:12 missing EOF at 'myudfs' near 'jars'; (state=,code=0)
> list jar;
> Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 
> 'list' 'jars' ''; line 1 pos 0 (state=,code=0)
> {code:title=funnlog.log|borderStyle=solid}
> 16/05/09 17:26:52 INFO thriftserver.SparkExecuteStatementOperation: Running 
> query 'list jar' with 1da09765-efb4-42dc-8890-3defca40f89d
> 16/05/09 17:26:52 INFO parse.ParseDriver: Parsing command: list jar
> NoViableAltException(26@[])
>   at 
> org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1071)
>   at 
> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
>   at 
> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
>   at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
>   at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
>   at 
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
>   at 
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
>   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>   at 
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>   at 
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
>   at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
>   at 
> org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
>   at 
> org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
>   at 
> org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:293)
>   at 
> org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:240)
>   at 
> org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:239)
>   at 
> org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:282)
>   at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>   at 
> 

[jira] [Updated] (SPARK-15224) Can not delete jar and list jar in spark Thrift server

2016-05-09 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-15224:
-
Description: 
When you try to delete a jar and run delete jar or list jar in your 
beeline client, it throws an exception:
delete jar; 
Error: org.apache.spark.sql.AnalysisException: line 1:7 missing FROM at 'jars' 
near 'jars'
line 1:12 missing EOF at 'myudfs' near 'jars'; (state=,code=0)
list jar;
Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 
'list' 'jars' ''; line 1 pos 0 (state=,code=0)

{code:title=funnlog.log|borderStyle=solid}
16/05/09 17:26:52 INFO thriftserver.SparkExecuteStatementOperation: Running 
query 'list jar' with 1da09765-efb4-42dc-8890-3defca40f89d
16/05/09 17:26:52 INFO parse.ParseDriver: Parsing command: list jar
NoViableAltException(26@[])
at 
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1071)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:293)
at 
org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:240)
at 
org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:239)
at 
org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:282)
at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at 
org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at 
org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 

[jira] [Created] (SPARK-15224) Can not delete jar and list jar in spark Thrift server

2016-05-09 Thread poseidon (JIRA)
poseidon created SPARK-15224:


 Summary: Can not delete jar and list jar in spark Thrift server
 Key: SPARK-15224
 URL: https://issues.apache.org/jira/browse/SPARK-15224
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
 Environment: spark 1.6.1
hive 1.2.1 
hdfs 2.7.1 
Reporter: poseidon


When you try to delete a jar and run delete jar or list jar in your 
beeline client, it throws an exception:
delete jar; 
Error: org.apache.spark.sql.AnalysisException: line 1:7 missing FROM at 'jars' 
near 'jars'
line 1:12 missing EOF at 'myudfs' near 'jars'; (state=,code=0)
list jar;
Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 
'list' 'jars' ''; line 1 pos 0 (state=,code=0)









[jira] [Commented] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore

2016-05-08 Thread poseidon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275854#comment-15275854
 ] 

poseidon commented on SPARK-14698:
--

spark 1.6.1
hive 1.2.1 
mysql 5.6 

Just start the thrift server and create a UDF as you normally would in Hive. 

You can replicate this issue; a sketch of the steps follows below.

I've fixed this issue already. 
Adding the UDF to the metastore is just the first step; you also have to fix looking 
up the UDF in the metastore when the SQL is parsed.
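
A sketch of those reproduction steps, assuming a hypothetical UDF class and jar path; the original report goes through beeline against the thrift server, and this only mirrors the same statements through a HiveContext.

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CreateFunctionRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("create-function-repro"))
    val hive = new HiveContext(sc)

    // First attempt: the UDF is usable, but (per this issue) it never reaches
    // the MySQL metastore backing the Hive catalog.
    hive.sql("CREATE FUNCTION my_upper AS 'com.example.MyUpper' USING JAR 'hdfs:///tmp/myudfs.jar'")
    hive.sql("SELECT my_upper('abc')").show()

    // Second attempt in the same server session: an "already exists" error,
    // even though nothing was persisted, so after a restart the function is gone.
    hive.sql("CREATE FUNCTION my_upper AS 'com.example.MyUpper' USING JAR 'hdfs:///tmp/myudfs.jar'")

    sc.stop()
  }
}
{code}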

> CREATE FUNCTION cloud not add function to hive metastore
> 
>
> Key: SPARK-14698
> URL: https://issues.apache.org/jira/browse/SPARK-14698
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: spark1.6.1
>Reporter: poseidon
>  Labels: easyfix
>
> Build Spark 1.6.1 and run it with Hive 1.2.1, with MySQL configured as the 
> metastore server. 
> Start a thrift server, then in beeline try to CREATE FUNCTION as a Hive SQL 
> UDF. 
> You will find the FUNCTION is not added to the MySQL metastore, but using the 
> function works fine.
> If you try to add it again, the thrift server throws an "already exists" exception.
> [SPARK-10151][SQL] Support invocation of hive macro 
> added an if condition in runSqlHive that executes CREATE FUNCTION in 
> hiveexec, which caused this problem.






[jira] [Updated] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore

2016-04-18 Thread poseidon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

poseidon updated SPARK-14698:
-
Description: 
Build Spark 1.6.1 and run it with Hive 1.2.1, with MySQL configured as the 
metastore server. 
Start a thrift server, then in beeline try to CREATE FUNCTION as a Hive SQL 
UDF. 
You will find the FUNCTION is not added to the MySQL metastore, but using the function 
works fine.
If you try to add it again, the thrift server throws an "already exists" exception.
[SPARK-10151][SQL] Support invocation of hive macro 
added an if condition in runSqlHive that executes CREATE FUNCTION in 
hiveexec, which caused this problem.


  was:
build spark 1.6.1 , and run it with 1.2.1 hive version,config mysql as 
metastore server. 
start a thrift server , then in beeline , try to create a function. 
found out , can not add this create to metastore,but the function goes well.
if you try to add it again , thrift server throw a alread Exist Exception.
[SPARK-10151][SQL] Support invocation of hive macro 
add a if condition when runSqlHive, which will exec create function in 
hiveexec. caused this problem.


> CREATE FUNCTION cloud not add function to hive metastore
> 
>
> Key: SPARK-14698
> URL: https://issues.apache.org/jira/browse/SPARK-14698
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: spark1.6.1
>Reporter: poseidon
>  Labels: easyfix
>
> Build Spark 1.6.1 and run it with Hive 1.2.1, with MySQL configured as the 
> metastore server. 
> Start a thrift server, then in beeline try to CREATE FUNCTION as a Hive SQL 
> UDF. 
> You will find the FUNCTION is not added to the MySQL metastore, but using the 
> function works fine.
> If you try to add it again, the thrift server throws an "already exists" exception.
> [SPARK-10151][SQL] Support invocation of hive macro 
> added an if condition in runSqlHive that executes CREATE FUNCTION in 
> hiveexec, which caused this problem.






[jira] [Created] (SPARK-14698) CREATE FUNCTION cloud not add function to hive metastore

2016-04-18 Thread poseidon (JIRA)
poseidon created SPARK-14698:


 Summary: CREATE FUNCTION cloud not add function to hive metastore
 Key: SPARK-14698
 URL: https://issues.apache.org/jira/browse/SPARK-14698
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
 Environment: spark1.6.1
Reporter: poseidon


Build Spark 1.6.1 and run it with Hive 1.2.1, with MySQL configured as the 
metastore server. 
Start a thrift server, then in beeline try to create a function. 
You will find the function is not added to the metastore, but using it works fine.
If you try to add it again, the thrift server throws an "already exists" exception.
[SPARK-10151][SQL] Support invocation of hive macro 
added an if condition in runSqlHive that executes CREATE FUNCTION in 
hiveexec, which caused this problem.


