[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-14105:
------------------------------------

    Assignee: Apache Spark

> Serialization issue for KafkaRDD
> --------------------------------
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Streaming
>    Affects Versions: 1.5.2
>            Reporter: Liyin Tang
>            Assignee: Apache Spark
>
> When using DISK or MEMORY storage levels to persist a KafkaDirectInputStream, Spark serializes the FetchResponse into blocks. The FetchResponse contains a ByteBufferMessageSet in which each Kafka Message is just one slice of the underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it appears that the entire underlying ByteBuffer in the ByteBufferMessageSet is serialized for each and every message. This easily pushes the block size past 2 GB and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default value (1 MB). I tried reducing the fetch size, but that causes other errors such as errRanOutBeforeEnd.
> Here are the exceptions I got for both Memory and Disk persistence.
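The slice-sharing behavior described above can be illustrated in plain Java, independent of Spark or Kafka (a minimal sketch; the buffer sizes are made up for illustration). A slice of a heap ByteBuffer is only a view: it still references the full backing array, so any serializer that reaches the backing array writes the whole fetch buffer per message, while a defensive copy keeps only the message bytes.

```java
import java.nio.ByteBuffer;

public class SliceDemo {
    public static void main(String[] args) {
        // A large "fetch response" buffer; each message is a small slice of it.
        ByteBuffer fetch = ByteBuffer.allocate(1 << 20); // 1 MB
        fetch.position(100);
        fetch.limit(110);
        ByteBuffer message = fetch.slice(); // 10-byte view into the same array

        // The slice still shares the full 1 MB backing array:
        System.out.println(message.remaining());    // 10
        System.out.println(message.array().length); // 1048576

        // Defensive copy: only the message bytes survive serialization.
        byte[] copy = new byte[message.remaining()];
        message.duplicate().get(copy); // duplicate() so we don't disturb position
        System.out.println(copy.length); // 10
    }
}
```

This suggests why reducing the fetch size only shrinks, rather than removes, the amplification: every cached message still drags along one full fetch buffer unless its bytes are copied out first.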
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         at java.util.Arrays.copyOf(Arrays.java:3236)
>         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>         at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
>         at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
>         at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
>         at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
>         at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>         at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
>         at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>         at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
>         at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
>         at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
>         at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
>         at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent:
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
>         at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)