[ https://issues.apache.org/jira/browse/SPARK-18756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Udit Mehrotra updated SPARK-18756:
----------------------------------
    Description: 
We have a Spark Streaming application that processes data from Kinesis.

In our application we observe a memory leak on the executors: Netty buffers 
are not released when the Spark BlockManager replicates the input blocks 
received from the Kinesis stream. The leak occurs when we set the storage 
level to MEMORY_AND_DISK_2 for the Kinesis input blocks. If we change the 
storage level to MEMORY_AND_DISK, which avoids creating a replica, we no 
longer observe the leak. We detected the leak and obtained the stack trace by 
running the executors with an additional JVM option: 
-Dio.netty.leakDetectionLevel=advanced.
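
For anyone trying to reproduce this, one way to pass that JVM option to the 
executors is Spark's spark.executor.extraJavaOptions setting. The fragment 
below is only a sketch: it assumes the option is set in spark-defaults.conf, 
which is an assumption and not necessarily how it was set in the report.

```properties
# Assumed location: conf/spark-defaults.conf. The same key/value pair can
# also be passed on the command line with spark-submit --conf.
# Enables Netty's "advanced" leak detection in every executor JVM.
spark.executor.extraJavaOptions  -Dio.netty.leakDetectionLevel=advanced
```

Advanced leak detection records where each buffer was accessed, which adds 
overhead, so it is better suited to debugging runs than to production.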

Here is the stack trace of the leak:

16/12/06 22:30:12 ERROR ResourceLeakDetector: LEAK: ByteBuf.release() was not 
called before it's garbage-collected. See 
http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 0
Created at:
        io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:103)
        io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:335)
        io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:247)
        org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:69)
        org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:1182)
        org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:997)
        org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
        org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
        org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
        org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:702)
        org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:80)
        org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
        org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
        org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
        org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:282)
        org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:352)
        org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
        org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
        org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)

We also observe a continuous increase in off-heap memory usage on the 
executors. Any help would be appreciated.

> Memory leak in Spark streaming
> ------------------------------
>
>                 Key: SPARK-18756
>                 URL: https://issues.apache.org/jira/browse/SPARK-18756
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, DStreams
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>            Reporter: Udit Mehrotra
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
