Liyin Tang created SPARK-14105:
----------------------------------

             Summary: Serialization issue for KafkaRDD
                 Key: SPARK-14105
                 URL: https://issues.apache.org/jira/browse/SPARK-14105
             Project: Spark
          Issue Type: Bug
            Reporter: Liyin Tang

When a KafkaDirectInputStream is persisted to disk or memory, Spark serializes the FetchResponse into blocks. The FetchResponse contains a ByteBufferMessageSet in which each Kafka Message is just one slice of the underlying ByteBuffer.

When the KafkaRDDIterator is serialized, it seems that the entire underlying ByteBuffer of the ByteBufferMessageSet is serialized for each and every message. As a result the block size easily exceeds 2G, which leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".

The consumer fetch size is the default value (1M). I tried reducing the fetch size, but that causes other errors such as errRanOutBeforeEnd.

Here are the exceptions I got for both memory and disk persistence.

Memory persistence:

16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

Disk persistence:

16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
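
To illustrate the suspected mechanism in isolation, here is a minimal sketch using only java.nio. It is not Spark or Kafka code: the class name, helper methods, and the 100-byte message size are hypothetical, and the 1 MiB buffer stands in for the default fetch size mentioned in the report. It shows that a slice of a heap ByteBuffer still references the whole backing array, so a serializer that writes the backing array (as the ByteArraySerializer frames in the memory trace suggest) would emit the full fetch buffer once per message, while copying only the visible bytes keeps the size proportional to the message.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; none of these names come from Spark or Kafka.
final class SliceSerializationSketch {
    // Assumed sizes: 1 MiB fetch buffer, 100-byte messages.
    static final int FETCH_BYTES = 1 << 20;
    static final int MESSAGE_BYTES = 100;

    // Returns a view over one message. The view shares the backing array of `fetch`.
    static ByteBuffer messageSlice(ByteBuffer fetch, int index) {
        ByteBuffer dup = fetch.duplicate();
        dup.position(index * MESSAGE_BYTES);
        dup.limit((index + 1) * MESSAGE_BYTES);
        return dup.slice();
    }

    // Size written by a serializer that dumps the backing array field as-is.
    static int naiveSerializedSize(ByteBuffer slice) {
        return slice.array().length;
    }

    // Copies only the bytes visible through the slice, leaving its position untouched.
    static byte[] compactCopy(ByteBuffer slice) {
        byte[] out = new byte[slice.remaining()];
        slice.duplicate().get(out);
        return out;
    }

    // Number of messages after which naive serialization passes Integer.MAX_VALUE bytes.
    static long messagesToExceedLimit() {
        return (long) Integer.MAX_VALUE / FETCH_BYTES + 1;
    }

    public static void main(String[] args) {
        ByteBuffer fetch = ByteBuffer.allocate(FETCH_BYTES);
        ByteBuffer msg = messageSlice(fetch, 3);
        System.out.println("naive bytes per message:   " + naiveSerializedSize(msg));
        System.out.println("compact bytes per message: " + compactCopy(msg).length);
        System.out.println("messages to pass 2G limit: " + messagesToExceedLimit());
    }
}
```

Under these assumptions, roughly two thousand messages per partition would be enough to push a block past Integer.MAX_VALUE bytes, which would be consistent with both exceptions above. Whether copying each message payload before persisting is the right fix for KafkaRDD is a separate question; the sketch only shows why the size can blow up.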