[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
poseidon updated SPARK-21955:
-----------------------------
Description:
While studying how streams, chunks and blocks work in Netty, I found a nasty case.

1. Processing an OpenBlocks message registers a stream in OneForOneStreamManager:
   org.apache.spark.network.server.OneForOneStreamManager#registerStream fills the StreamState with the app ID and the buffers.
2. Processing a ChunkFetchRequest calls registerChannel:
   org.apache.spark.network.server.OneForOneStreamManager#registerChannel fills the StreamState with the channel.

In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, the two messages are sent in sequence: OpenBlocks -> ChunkFetchRequest.

spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java:

    @Override
    public void registerChannel(Channel channel, long streamId) {
      if (streams.containsKey(streamId)) {
        streams.get(streamId).associatedChannel = channel;
      }
    }

This is the only place where associatedChannel is set.

    @Override
    public void connectionTerminated(Channel channel) {
      // Close all streams which have been associated with the channel.
      for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
        StreamState state = entry.getValue();
        if (state.associatedChannel == channel) {
          streams.remove(entry.getKey());
          // Release all remaining buffers.
          while (state.buffers.hasNext()) {
            state.buffers.next().release();
          }
        }
      }
    }

This is the only place where the remaining state.buffers are released.

If the network goes down while the OpenBlocks message is being processed, no ChunkFetchRequest message follows, so the channel is never set. As a result, we can see leaked buffers in OneForOneStreamManager:
!screenshot-1.png!

If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is not set then, as far as I can tell from searching the code, the StreamState will remain in memory forever, which may lead to an OOM in the NodeManager. The only ways to release it are when the channel is closed (which only matches streams whose associatedChannel was set) or when a client reads the last piece of the block.

We could set the channel in OneForOneStreamManager#registerStream to cover this case.
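To make the failure mode concrete, here is a minimal, self-contained Java model of the bookkeeping described above. It is a simplified sketch, not Spark code: Netty's Channel is replaced by a plain Object, ManagedBuffer by a hypothetical FakeBuffer class, and the class name StreamLeakDemo is made up for illustration. Its three methods mirror the registerStream / registerChannel / connectionTerminated logic quoted from spark-1.6.1, and main simulates a connection that drops after OpenBlocks but before any ChunkFetchRequest.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of OneForOneStreamManager's bookkeeping (not Spark code).
public class StreamLeakDemo {

  /** Hypothetical stand-in for a reference-counted ManagedBuffer. */
  static final class FakeBuffer {
    boolean released = false;
    void release() { released = true; }
  }

  static final class StreamState {
    final String appId;
    final Iterator<FakeBuffer> buffers;
    Object associatedChannel = null; // set only by registerChannel

    StreamState(String appId, Iterator<FakeBuffer> buffers) {
      this.appId = appId;
      this.buffers = buffers;
    }
  }

  final AtomicLong nextStreamId = new AtomicLong(0);
  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // Step 1 (OpenBlocks): the stream is registered without any channel.
  long registerStream(String appId, Iterator<FakeBuffer> buffers) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState(appId, buffers));
    return id;
  }

  // Step 2 (ChunkFetchRequest): the only place the channel is recorded.
  void registerChannel(Object channel, long streamId) {
    if (streams.containsKey(streamId)) {
      streams.get(streamId).associatedChannel = channel;
    }
  }

  // On disconnect, only streams whose channel matches are removed and released.
  void connectionTerminated(Object channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }

  public static void main(String[] args) {
    StreamLeakDemo manager = new StreamLeakDemo();
    Object channel = new Object();
    List<FakeBuffer> bufs = Arrays.asList(new FakeBuffer(), new FakeBuffer());

    // OpenBlocks is processed, then the network drops before any ChunkFetchRequest.
    manager.registerStream("app-1", bufs.iterator());
    manager.connectionTerminated(channel);

    // associatedChannel is still null, so nothing was cleaned up.
    System.out.println("streams left: " + manager.streams.size());
    System.out.println("first buffer released: " + bufs.get(0).released);
  }
}
```

Running main prints "streams left: 1" and "first buffer released: false": because registerChannel was never called, connectionTerminated does not match the stream, so its StreamState and buffers stay in the map.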
We should set the channel when the stream is registered, so that its buffers can be released.
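A minimal sketch of the proposed fix, under simplifying assumptions: Object stands in for Netty's Channel, a hypothetical FakeBuffer class for ManagedBuffer, and both the class name StreamManagerWithChannel and the three-argument registerStream signature are made up for illustration. The channel that delivered OpenBlocks is recorded at registration time, so connectionTerminated can find and release the stream even if no ChunkFetchRequest ever arrives.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the proposal: associate the channel in registerStream.
public class StreamManagerWithChannel {

  /** Hypothetical stand-in for a reference-counted ManagedBuffer. */
  static final class FakeBuffer {
    boolean released = false;
    void release() { released = true; }
  }

  static final class StreamState {
    final String appId;
    final Iterator<FakeBuffer> buffers;
    Object associatedChannel; // known from the moment the stream exists

    StreamState(String appId, Iterator<FakeBuffer> buffers, Object channel) {
      this.appId = appId;
      this.buffers = buffers;
      this.associatedChannel = channel;
    }
  }

  final AtomicLong nextStreamId = new AtomicLong(0);
  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // The channel that carried OpenBlocks is stored immediately.
  long registerStream(String appId, Iterator<FakeBuffer> buffers, Object channel) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState(appId, buffers, channel));
    return id;
  }

  // Same cleanup logic as before; it now also matches streams that never saw a ChunkFetchRequest.
  void connectionTerminated(Object channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        // Release all remaining buffers.
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }

  public static void main(String[] args) {
    StreamManagerWithChannel manager = new StreamManagerWithChannel();
    Object channel = new Object();
    FakeBuffer a = new FakeBuffer();
    FakeBuffer b = new FakeBuffer();

    // OpenBlocks arrives on `channel`, then the connection drops before any ChunkFetchRequest.
    manager.registerStream("app-1", Arrays.asList(a, b).iterator(), channel);
    manager.connectionTerminated(channel);

    System.out.println("streams left: " + manager.streams.size());
    System.out.println("buffers released: " + (a.released && b.released));
  }
}
```

Running main prints "streams left: 0" and "buffers released: true". In this sketch registerChannel is simply omitted; the point is only that associatedChannel is no longer null between OpenBlocks and the first ChunkFetchRequest.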
> OneForOneStreamManager may leak memory when network is poor
> -----------------------------------------------------------
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
> Issue Type: Bug
> Components: Block Manager
> Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258
>              spark 1.6
> Reporter: poseidon
> Attachments: screenshot-1.png

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)