[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
-------------------------------
    Description: 
When using DISK or MEMORY persistence for a KafkaDirectInputStream, Spark serializes the FetchResponse into blocks. The FetchResponse contains a ByteBufferMessageSet in which each Kafka Message is just one slice of the underlying ByteBuffer.

When the KafkaRDDIterator is serialized, it appears that the entire underlying ByteBuffer of the ByteBufferMessageSet is serialized for each and every message. This easily pushes the block size past 2 GB and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> Size exceeds Integer.MAX_VALUE".

The consumer fetch size is the default value (1 MB). I tried reducing the fetch size, but that causes other errors such as errRanOutBeforeEnd.
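
For concreteness, "fetch size" here means the consumer fetch size the direct stream hands to Kafka; lowering it would look roughly like the variant below (illustrative value only, not our exact config, applied to the kafkaParams used in the repro further down):
```
// Illustrative only: lowering the consumer fetch size via kafkaParams.
// The default for fetch.message.max.bytes is 1 MB; a value smaller than a
// single message can make the fetch come back short, which is consistent
// with the errRanOutBeforeEnd error mentioned above.
val kafkaParams = Map[String, String](
  "metadata.broker.list"    -> KafkaCluster.MAIN.getKafkaConnString,
  "fetch.message.max.bytes" -> (512 * 1024).toString  // e.g. 512 KB
)
```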


Here is the minimal code to reproduce the issue. This example only demonstrates the bug; it is not the actual code we run.
```
        // Imports assumed by this snippet (Kafka decoders/metadata plus
        // Spark Streaming and the Kafka direct-stream utils).
        import kafka.message.MessageAndMetadata
        import kafka.serializer.StringDecoder
        import org.apache.spark.streaming.{Seconds, StreamingContext}
        import org.apache.spark.streaming.kafka.KafkaUtils
        import scala.collection.JavaConverters._

        // Create the streaming context
        val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

        // Create topics, kafkaParams, messageHandler and offsetRanges.
        // KafkaCluster and KafkaOffsetsUtil are internal helpers that resolve
        // the broker list and the latest offset per topic-partition.
        val topicsSet: Set[String] = "flog".split(",").toSet
        val kafkaParams = Map[String, String](
            "metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
        val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
            KafkaCluster.MAIN,
            topicsSet.toList.asJava,
            kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

        // Create a direct DStream
        val inputStream = KafkaUtils.createDirectStream[String, String,
            StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
            ssc,
            kafkaParams,
            topicPartitionOffsetRange,
            messageHandler)

        // Apply a window function (windowing causes the parent Kafka RDDs to be persisted)
        inputStream.window(Seconds(slidingWindowInterval),
            Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())

        ssc.start()
        ssc.awaitTermination()
```
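
One mitigation we are considering (an untested sketch reusing the names from the repro above, not a validated fix) is to have the message handler copy the key and value out as plain Strings, so the records that get windowed and persisted no longer reference MessageAndMetadata or the shared fetch ByteBuffer:
```
// Hypothetical variant of the repro above: extract plain Strings in the
// message handler so the persisted RDD holds copies rather than
// ByteBuffer-backed MessageAndMetadata records.
val copyingHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

val copiedStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder, (String, String)](
    ssc,
    kafkaParams,
    topicPartitionOffsetRange,
    copyingHandler)

copiedStream.window(Seconds(slidingWindowInterval),
    Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
```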

Here are the exceptions I got for both memory and disk persistence.
Memory persistence:
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

Disk persistence:
16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)


> Serialization issue for KafkaRDD
> --------------------------------
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>


