[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-21955:
-----------------------------
    Description: 
While trying to understand how streams, chunks, and blocks work in Spark's Netty-based network layer, I found a nasty case.

Processing an OpenBlocks message registers a stream in OneForOneStreamManager:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills the StreamState with the app ID and the buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.
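To make the two registration steps concrete, here is a minimal, simplified model of them. It is not Spark's code: the hypothetical FakeChannel stands in for io.netty.channel.Channel, String stands in for ManagedBuffer, and the stream ID counter simply starts at 0. It only shows that after registerStream alone, associatedChannel is still null.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical model of the two registration steps (not Spark's code).
final class RegistrationSketch {
  // Stand-in for io.netty.channel.Channel: only object identity matters here.
  static final class FakeChannel {}

  // Mirrors the fields named in the report: app ID, buffers, associated channel.
  static final class StreamState {
    final String appId;
    final Iterator<String> buffers;   // stand-in for Iterator<ManagedBuffer>
    FakeChannel associatedChannel;    // stays null until registerChannel runs

    StreamState(String appId, Iterator<String> buffers) {
      this.appId = appId;
      this.buffers = buffers;
    }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
  private final AtomicLong nextStreamId = new AtomicLong(0);

  // Step 1 (OpenBlocks): store the app ID and buffers, but no channel.
  long registerStream(String appId, Iterator<String> buffers) {
    long streamId = nextStreamId.getAndIncrement();
    streams.put(streamId, new StreamState(appId, buffers));
    return streamId;
  }

  // Step 2 (ChunkFetchRequest): attach the channel only if the stream is known.
  void registerChannel(FakeChannel channel, long streamId) {
    StreamState state = streams.get(streamId);
    if (state != null) {
      state.associatedChannel = channel;
    }
  }

  public static void main(String[] args) {
    RegistrationSketch manager = new RegistrationSketch();
    long id = manager.registerStream("app-1", Arrays.asList("block-0", "block-1").iterator());
    System.out.println("after OpenBlocks, channel = " + manager.streams.get(id).associatedChannel);
    manager.registerChannel(new FakeChannel(), id);
    System.out.println("after ChunkFetchRequest, channel set = "
        + (manager.streams.get(id).associatedChannel != null));
  }
}
```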

In
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start
OpenBlocks and ChunkFetchRequest are sent in sequence: OpenBlocks first, then ChunkFetchRequest.
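The client-side ordering can be modeled as follows. This is a hypothetical sketch that assumes, as described above, that ChunkFetchRequest messages are only sent after the OpenBlocks reply (which carries the stream ID) reaches the client; messages are plain strings, not Spark's protocol classes, and messagesSeenByServer is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering in OneForOneBlockFetcher#start.
final class FetchOrderSketch {
  /**
   * Returns the messages the server receives for a fetch of numChunks blocks.
   * openBlocksReplyArrives models whether the OpenBlocks reply reaches the
   * client before the connection fails.
   */
  static List<String> messagesSeenByServer(int numChunks, boolean openBlocksReplyArrives) {
    List<String> received = new ArrayList<>();
    received.add("OpenBlocks");          // always first; the server registers the stream here
    if (!openBlocksReplyArrives) {
      return received;                   // client never learns the stream ID: no chunk requests
    }
    for (int chunk = 0; chunk < numChunks; chunk++) {
      received.add("ChunkFetchRequest[" + chunk + "]");  // each one leads to registerChannel
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(messagesSeenByServer(2, true));
    System.out.println(messagesSeenByServer(2, false));
  }
}
```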

From spark-1.6.1/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java:
  @Override
  public void registerChannel(Channel channel, long streamId) {
    if (streams.containsKey(streamId)) {
      streams.get(streamId).associatedChannel = channel;
    }
  }
This is the only place where associatedChannel is set.


  public void connectionTerminated(Channel channel) {
    // Close all streams which have been associated with the channel.
    for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());

        // Release all remaining buffers.
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }
This is the only place where the remaining state.buffers are released.

If the network goes down while OpenBlocks is being processed, no ChunkFetchRequest message follows.
So the channel is never set,
and we can see leaked buffers in OneForOneStreamManager:

!screenshot-1.png!
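A self-contained sketch of the leak scenario. It keeps the matching rule of the quoted connectionTerminated, but FakeChannel and TrackedBuffer are hypothetical stand-ins for Netty's Channel and Spark's ManagedBuffer, and the rest of the manager is reduced to what the scenario needs.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the leak: a stream whose channel was never registered.
final class LeakSketch {
  static final class FakeChannel {}

  // Records whether release() was called, like a reference-counted buffer would.
  static final class TrackedBuffer {
    boolean released;
    void release() { released = true; }
  }

  static final class StreamState {
    final Iterator<TrackedBuffer> buffers;
    FakeChannel associatedChannel;     // null unless registerChannel ran
    StreamState(Iterator<TrackedBuffer> buffers) { this.buffers = buffers; }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
  private final AtomicLong nextStreamId = new AtomicLong(0);

  long registerStream(Iterator<TrackedBuffer> buffers) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState(buffers));
    return id;
  }

  // Same rule as the quoted code: only streams associated with this channel are cleaned up.
  void connectionTerminated(FakeChannel channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }

  public static void main(String[] args) {
    LeakSketch manager = new LeakSketch();
    List<TrackedBuffer> buffers = Arrays.asList(new TrackedBuffer(), new TrackedBuffer());
    FakeChannel channel = new FakeChannel();

    manager.registerStream(buffers.iterator());  // OpenBlocks processed
    // Network drops here: no ChunkFetchRequest, so registerChannel never runs.
    manager.connectionTerminated(channel);

    long unreleased = buffers.stream().filter(b -> !b.released).count();
    System.out.println("streams still registered: " + manager.streams.size());
    System.out.println("unreleased buffers: " + unreleased);
  }
}
```

Running main reports one stream still registered and two unreleased buffers: associatedChannel is null, so the reference comparison never matches the terminated channel.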

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is not set, then, as far as I can tell from searching the code, the StreamState will remain in memory forever.

This is because the only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.
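The second cleanup path can be sketched as below. This is a simplified model, assuming getChunk hands out buffers in order and drops the stream from the map once the last buffer has been handed out; LastChunkSketch, the String buffers, and the error messages are illustrative, not copied from Spark. A stream whose chunks are never requested never reaches that removal.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the "last chunk read" cleanup path.
final class LastChunkSketch {
  static final class StreamState {
    final Iterator<String> buffers;   // stand-in for Iterator<ManagedBuffer>
    int curChunk = 0;                 // next chunk index the client is expected to ask for
    StreamState(Iterator<String> buffers) { this.buffers = buffers; }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  String getChunk(long streamId, int chunkIndex) {
    StreamState state = streams.get(streamId);
    if (state == null) {
      throw new IllegalStateException("Unknown stream " + streamId);
    }
    if (chunkIndex != state.curChunk) {
      throw new IllegalStateException(
          "Out-of-order chunk " + chunkIndex + ", expected " + state.curChunk);
    }
    if (!state.buffers.hasNext()) {
      throw new IllegalStateException("Chunk index beyond end: " + chunkIndex);
    }
    state.curChunk++;
    String chunk = state.buffers.next();
    // Once the last buffer has been handed out, the stream is dropped from the map.
    if (!state.buffers.hasNext()) {
      streams.remove(streamId);
    }
    return chunk;
  }

  public static void main(String[] args) {
    LastChunkSketch manager = new LastChunkSketch();
    manager.streams.put(7L, new StreamState(Arrays.asList("block-0", "block-1").iterator()));
    manager.getChunk(7L, 0);
    System.out.println("after chunk 0, registered: " + manager.streams.containsKey(7L));
    manager.getChunk(7L, 1);
    System.out.println("after chunk 1, registered: " + manager.streams.containsKey(7L));
  }
}
```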


To cover this case, OneForOneStreamManager#registerStream could set the channel as well.
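One way to implement this suggestion, sketched on the same kind of simplified model. The three-argument registerStream is a hypothetical signature for illustration, FakeChannel and TrackedBuffer are stand-ins, and the sketch assumes the OpenBlocks message and the later chunk requests share one channel, so recording it at registration time is safe.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the proposed fix: the channel is recorded at registration time.
final class FixSketch {
  static final class FakeChannel {}

  static final class TrackedBuffer {
    boolean released;
    void release() { released = true; }
  }

  static final class StreamState {
    final String appId;
    final Iterator<TrackedBuffer> buffers;
    FakeChannel associatedChannel;    // set by registerStream, so never null here

    StreamState(String appId, Iterator<TrackedBuffer> buffers, FakeChannel channel) {
      this.appId = appId;
      this.buffers = buffers;
      this.associatedChannel = channel;
    }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
  private final AtomicLong nextStreamId = new AtomicLong(0);

  // Hypothetical variant: the channel that carried OpenBlocks is stored immediately.
  long registerStream(String appId, Iterator<TrackedBuffer> buffers, FakeChannel channel) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState(appId, buffers, channel));
    return id;
  }

  // Unchanged cleanup rule; it now also matches streams that never saw a ChunkFetchRequest.
  void connectionTerminated(FakeChannel channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }

  public static void main(String[] args) {
    FixSketch manager = new FixSketch();
    List<TrackedBuffer> buffers = Arrays.asList(new TrackedBuffer(), new TrackedBuffer());
    FakeChannel channel = new FakeChannel();

    manager.registerStream("app-1", buffers.iterator(), channel);  // OpenBlocks processed
    // Network drops before any ChunkFetchRequest; the channel is still known.
    manager.connectionTerminated(channel);

    long unreleased = buffers.stream().filter(b -> !b.released).count();
    System.out.println("streams still registered: " + manager.streams.size());
    System.out.println("unreleased buffers: " + unreleased);
  }
}
```

Here connectionTerminated finds the stream even though no ChunkFetchRequest ever arrived, so both buffers are released and the map is empty.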

  was:
While trying to understand how streams, chunks, and blocks work in Spark's Netty-based network layer, I found a nasty case.

Processing an OpenBlocks message registers a stream in OneForOneStreamManager:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills the StreamState with the app ID and the buffers.

Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the StreamState with the channel.

In
org.apache.spark.network.shuffle.OneForOneBlockFetcher#start
OpenBlocks and ChunkFetchRequest are sent in sequence: OpenBlocks first, then ChunkFetchRequest.

If the network goes down while OpenBlocks is being processed, no ChunkFetchRequest message follows.

So we can see leaked buffers in OneForOneStreamManager:

!attachment-name.jpg|thumbnail!

If
org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel
is not set, then, as far as I can tell from searching the code, the StreamState will remain in memory forever.

This is because the only ways to release it are when the channel is closed or when someone reads the last chunk of the stream.


To cover this case, OneForOneStreamManager#registerStream could set the channel as well.


> OneForOneStreamManager may leak memory when network is poor
> -----------------------------------------------------------
>
>                 Key: SPARK-21955
>                 URL: https://issues.apache.org/jira/browse/SPARK-21955
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 1.6.1
>         Environment: hdp 2.4.2.0-258 
> spark 1.6 
>            Reporter: poseidon
>         Attachments: screenshot-1.png
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
