[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211147#comment-15211147 ] Liyin Tang commented on SPARK-14105:
As noted in https://github.com/apache/spark/pull/11921#issuecomment-201073631, I have verified that the issue is caused by serializing MessageAndMetadata. Previously:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
{code}
With this handler, serializing the KafkaRDD serializes every field of MessageAndMetadata, which includes the slice of the underlying ByteBuffer. This easily pushes a block past 2 GB. The simple workaround is:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
{code}
With this handler, the message has already been deep-copied into a new byte[] before serialization, so the problem is avoided. Based on this finding, I don't think we need to move forward with deep-copying messages in the FetchResponse. Maybe we can just log a warning telling users to avoid serializing MessageAndMetadata directly. Does that make sense? If so, I can close this Jira as Won't Fix; the workaround is pretty straightforward.
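For reference, a sketch of how the repro code quoted below would look with that workaround (reusing the ssc, kafkaParams, offset range, and interval values from the repro; safeStream is just a new local name). Note the DStream's element type becomes (String, String), so downstream code works on plain key/value pairs:
{code}
// Sketch only: same repro, but the messageHandler copies key/value out of the
// Kafka buffer, so windowing/persisting never serializes MessageAndMetadata.
val safeStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder, (String, String)](
  ssc,
  kafkaParams,
  topicPartitionOffsetRange,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))

safeStream
  .window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
  .foreachRDD(rdd => rdd.count())
{code}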
> Serialization issue for KafkaRDD
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2, 1.6.1
> Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or MEMORY persistence for KafkaDirectInputStream, the FetchResponse is serialized into blocks. The FetchResponse contains the ByteBufferMessageSet, where each Kafka Message is just one slice of the underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems like the entire underlying ByteBuffer in the ByteBufferMessageSet is serialized for each and every message. This makes the block size easily exceed 2 GB and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default value (1 MB). I tried reducing the fetch size, but that causes other errors like errRanOutBeforeEnd.
> Here is the minimal code to reproduce this issue. This example just demonstrates the bug; it is not the actual code we run.
> ```
> // Create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topics, kafkaParams, messageHandler and offsetRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
>   KafkaCluster.MAIN,
>   topicsSet.toList.asJava,
>   kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create a DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
>   ssc,
>   kafkaParams,
>   topicPartitionOffsetRange,
>   messageHandler)
> // Apply a window function
> inputStream.window(Seconds(slidingWindowInterval), Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> ```
> Here are the exceptions I got for both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent:
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFin
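To make the failure mode concrete, here is a small illustration (not from the ticket) of why serializing a sliced ByteBuffer field-by-field drags in the whole fetch buffer: the slice shares its parent's backing array, which is what a field-level serializer ends up writing (consistent with the FieldSerializer/ByteArraySerializer frames in the trace above).
{code}
import java.nio.ByteBuffer

// One ~1 MB fetch buffer holding many messages.
val fetchBuffer = ByteBuffer.allocate(1024 * 1024)

// Pretend bytes [100, 200) are a single 100-byte message.
fetchBuffer.position(100)
fetchBuffer.limit(200)
val oneMessage = fetchBuffer.slice()

println(oneMessage.remaining())     // 100     -- the message itself
println(oneMessage.array().length)  // 1048576 -- the shared backing array that a
                                    //            field-level serializer writes out
{code}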
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210876#comment-15210876 ] Cody Koeninger commented on SPARK-14105:
We definitely should make it not possible to persist a KafkaRDD for 0.10 consumers. In this case, I'm inclined towards going ahead and doing the deep copy, but overriding persist to log a strongly worded message that this is a bad idea.
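A hedged sketch of what that override could look like inside KafkaRDD (an illustration of the suggestion, not the actual patch; it assumes only RDD.persist(newLevel: StorageLevel) and the logWarning helper that KafkaRDD already has via Spark's Logging trait):
{code}
// Illustration only: this fragment would live inside KafkaRDD, warning loudly
// before delegating to the normal persist behavior.
override def persist(newLevel: StorageLevel): this.type = {
  logWarning("Persisting KafkaRDD serializes Kafka's ByteBuffer-backed messages, " +
    "which can easily exceed the 2 GB block limit. Map to plain key/value pairs " +
    "before persisting instead.")
  super.persist(newLevel)
}
{code}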
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210864#comment-15210864 ] Liyin Tang commented on SPARK-14105:
Copied from the PR discussion:
That example is just for demonstrating the bug; the actual code I run does more than count :) I need to convert the Kafka messages to a DataFrame and run Spark SQL on it.
```
val functions: (RDD[MessageAndMetadata[String, String]] => Unit) = (rdd) => {
  val startTS = System.currentTimeMillis()
  // Filter out null messages, otherwise JSON parsing throws an exception
  val messageRDD = rdd.map(_.message()).filter(_ != null)
  val inputDF = sqlContext.read.schema(sourceStreamObject.schema).json(messageRDD)
  inputDF.registerTempTable(sourceStreamObject.name)
  inputDF.cache()
  processObjects.foreach(_.run(sqlContext))
  sinkObjects.foreach(_.sink(sqlContext))
}
```
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210858#comment-15210858 ] Sean Owen commented on SPARK-14105:
I'm sure that's true, but should we then make it not possible to persist a KafkaRDD? Here it fails in a poor way by trying to serialize a bunch of bytes.
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210801#comment-15210801 ] Cody Koeninger commented on SPARK-14105:
As I said in the PR, it's a lot more straightforward to just map before doing cache or window, which is probably what you want to be doing anyway so that you don't serialize a bunch of extra data.
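A minimal sketch of that approach against the repro code from the description (it reuses the inputStream and interval variables from the repro; nothing new beyond the map call):
{code}
// Sketch only: project each MessageAndMetadata down to a plain (key, value)
// pair *before* window()/persist(), so the ByteBuffer-backed Kafka objects
// are never serialized into storage blocks.
inputStream
  .map(mmd => (mmd.key(), mmd.message()))
  .window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
  .foreachRDD(rdd => rdd.count())
{code}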
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210734#comment-15210734 ] Liyin Tang commented on SPARK-14105:
Added the minimal code example to reproduce this issue. There is another way to work around this without changing the KafkaRDD code: register a customized Kryo serializer for the Kafka Message class. That way we don't have to deep-copy each message in KafkaRDD itself.
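A hedged sketch of what such a registration could look like (an illustration, not code from the ticket; it assumes Kafka 0.8's kafka.message.Message API and Spark's spark.kryo.registrator hook):
{code}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import kafka.message.Message
import org.apache.spark.serializer.KryoRegistrator

// Writes only this message's own payload/key bytes, never the shared fetch buffer.
class KafkaMessageSerializer extends Serializer[Message] {
  override def write(kryo: Kryo, output: Output, m: Message): Unit = {
    val payload = new Array[Byte](m.payload.remaining())
    m.payload.duplicate().get(payload)
    val key: Array[Byte] =
      if (m.hasKey) {
        val k = new Array[Byte](m.key.remaining())
        m.key.duplicate().get(k)
        k
      } else null
    kryo.writeObjectOrNull(output, payload, classOf[Array[Byte]])
    kryo.writeObjectOrNull(output, key, classOf[Array[Byte]])
  }

  // Note: this rebuilds a plain Message from the copied bytes; compression
  // codec/attribute bits are not preserved, which is fine for decoded payloads.
  override def read(kryo: Kryo, input: Input, clazz: Class[Message]): Message = {
    val payload = kryo.readObjectOrNull(input, classOf[Array[Byte]])
    val key = kryo.readObjectOrNull(input, classOf[Array[Byte]])
    if (key == null) new Message(payload) else new Message(payload, key)
  }
}

class KafkaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Message], new KafkaMessageSerializer)
  }
}

// Enable it on the SparkConf used to build the StreamingContext:
// sparkConf.set("spark.kryo.registrator", classOf[KafkaKryoRegistrator].getName)
{code}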
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209594#comment-15209594 ] Liyin Tang commented on SPARK-14105:
I tested the PR with the same streaming job, and verified the serialization issue is fixed with the PR.
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209395#comment-15209395 ] Kevin Long commented on SPARK-14105:
Tested on 1.6.1 and the issue is there as well.
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209226#comment-15209226 ] Apache Spark commented on SPARK-14105:
User 'liyintang' has created a pull request for this issue: https://github.com/apache/spark/pull/11921