[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210734#comment-15210734
 ] 

Liyin Tang edited comment on SPARK-14105 at 3/24/16 6:35 PM:
-------------------------------------------------------------

Added a minimal code example that reproduces this issue to the JIRA description.

There is another way to work around this issue without changing the KafkaRDD 
code: register a custom Kryo serializer for the Kafka Message class. That way, 
we don't have to deep copy each message. 
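
A minimal sketch of what such a serializer could look like, assuming Kafka 0.8's kafka.message.Message (which wraps a ByteBuffer) and the Kryo 2.x API bundled with Spark 1.x. The class names KafkaMessageSerializer and KafkaMessageRegistrator are hypothetical, and this has not been verified against the job described in this issue:

```scala
import java.nio.ByteBuffer

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import kafka.message.Message
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical serializer: writes only the bytes visible through the
// message's buffer, not the whole array that backs the fetch response.
class KafkaMessageSerializer extends Serializer[Message] {
  override def write(kryo: Kryo, output: Output, message: Message): Unit = {
    // duplicate() so the original buffer's position and limit stay untouched.
    val view = message.buffer.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    output.writeInt(bytes.length)
    output.writeBytes(bytes)
  }

  // Signature follows Kryo 2.x (assumption); newer Kryo releases may
  // declare the last parameter differently.
  override def read(kryo: Kryo, input: Input, cls: Class[Message]): Message = {
    val length = input.readInt()
    new Message(ByteBuffer.wrap(input.readBytes(length)))
  }
}

// Hypothetical registrator that plugs the serializer into Spark's Kryo setup.
class KafkaMessageRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Message], new KafkaMessageSerializer)
  }
}
```

To try it, set spark.serializer to org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator to the fully qualified name of the registrator class. The copy then happens only when a block is actually serialized, not for every message read by KafkaRDD.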


was (Author: liyin):
Added a minimal code example that reproduces this issue.

There is another way to work around this issue without changing the KafkaRDD 
code: register a custom Kryo serializer for the Kafka Message class. That way, 
we don't have to deep copy each message. 

> Serialization issue for KafkaRDD
> --------------------------------
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When a KafkaDirectInputStream is persisted with a disk or memory storage 
> level, the FetchResponse is serialized into blocks. The FetchResponse 
> contains a ByteBufferMessageSet, in which each Kafka Message is just a slice 
> of the underlying ByteBuffer. 
> When the KafkaRDDIterator is serialized, it seems that the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size past 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default value (1M). I tried reducing the 
> fetch size, but that causes other errors such as errRanOutBeforeEnd.
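
The effect described above can be illustrated with plain java.nio, without Kafka or Spark. This is only a sketch under assumptions: the object name SliceDemo, the 1 MiB buffer standing in for one fetch response, and the 100-byte slice standing in for one message are all made up for illustration. It shows that a small slice still exposes the full backing array, which is what a field-by-field serializer would write out if it follows that array:

```scala
import java.nio.ByteBuffer

object SliceDemo {
  // Stand-in for one fetch response: a 1 MiB heap buffer.
  val fetchBuffer: ByteBuffer = ByteBuffer.allocate(1024 * 1024)

  // Stand-in for one message: a 100-byte view starting at offset 4096.
  def messageSlice(): ByteBuffer = {
    val dup = fetchBuffer.duplicate()
    dup.position(4096)
    dup.limit(4096 + 100)
    dup.slice()
  }

  // Copy only the bytes visible through the slice into a fresh array.
  def compactCopy(slice: ByteBuffer): Array[Byte] = {
    val view = slice.duplicate() // leave the caller's position untouched
    val out = new Array[Byte](view.remaining())
    view.get(out)
    out
  }

  def main(args: Array[String]): Unit = {
    val slice = messageSlice()
    println(s"visible bytes: ${slice.remaining()}")
    println(s"backing array: ${slice.array().length}")
    println(s"compact copy:  ${compactCopy(slice).length}")
  }
}
```

With many messages per fetch, writing the backing array once per message multiplies the block size by roughly the number of messages, which is consistent with blocks growing past 2G.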
> Here is the minimal code to reproduce this issue:
> ```
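>         // Imports assume the Spark 1.x integration for Kafka 0.8.
>         import scala.collection.JavaConverters._
>         import kafka.message.MessageAndMetadata
>         import kafka.serializer.StringDecoder
>         import org.apache.spark.streaming.{Seconds, StreamingContext}
>         import org.apache.spark.streaming.kafka.KafkaUtils
>
>         // sparkConf, intervalSeconds and slidingWindowInterval are defined elsewhere.
>         // Create the source stream object
>         val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
>         // Create topics, kafkaParams, messageHandler and offset ranges.
>         // KafkaCluster and KafkaOffsetsUtil are not Spark APIs; they appear
>         // to be helper classes from the reporter's own code base.
>         val topicsSet: Set[String] = "flog".split(",").toSet
>         val kafkaParams = Map[String, String](
>             "metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
>         // Pass each MessageAndMetadata through unchanged
>         val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
>         val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
>             KafkaCluster.MAIN,
>             topicsSet.toList.asJava,
>             kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
>         // Create a DStream
>         val inputStream = KafkaUtils.createDirectStream[String, String,
>             StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
>             ssc,
>             kafkaParams,
>             topicPartitionOffsetRange,
>             messageHandler)
>         // Apply a window function; the windowed stream is persisted
>         inputStream.window(Seconds(slidingWindowInterval),
>             Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
>         ssc.start()
>         ssc.awaitTermination()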
>  ```
> Here are the exceptions I got with both memory and disk persistence.
> Memory persistence:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>     at java.util.Arrays.copyOf(Arrays.java:3236)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
>     at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
>     at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>     at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
>     at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
>     at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
>     at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk persistence:
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>   
