[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211147#comment-15211147
 ] 

Liyin Tang commented on SPARK-14105:


As noted in https://github.com/apache/spark/pull/11921#issuecomment-201073631,
I have verified that the issue is caused by serializing MessageAndMetadata.

Previously:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
{code}
In this way, when serializing the KafkaRDD, it needs to serialize every object inside 
MessageAndMetadata, which includes the slice of the underlying ByteBuffer. This can 
push the block size beyond 2 GB.
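To make that concrete, here is a small, self-contained illustration (plain JVM behaviour, 
not Spark or Kafka code) of why a tiny slice still drags along its full backing buffer 
when it is serialized field by field:
{code}
import java.nio.ByteBuffer

// A 1 MB "fetch buffer"; each message is just a small slice of it.
val fetchBuffer = ByteBuffer.allocate(1 << 20)
fetchBuffer.position(100)
fetchBuffer.limit(110)
val slice = fetchBuffer.slice()

println(slice.remaining())     // 10      -- the message payload is tiny
println(slice.array().length)  // 1048576 -- but the backing array is the whole buffer,
                               // and that is what a field-by-field serializer writes
                               // out again for every single message
{code}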

The simple workaround is:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
{code}
In this way, the message has already been deep-copied into a new byte[] during 
serialization, so the problem is avoided.
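Applied to the repro code from the issue description, this is only a sketch (it reuses 
the ssc, kafkaParams and topicPartitionOffsetRange values defined there); the other 
change is the result type parameter of createDirectStream:
{code}
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// The handler returns only the decoded key/value, so nothing in the RDD
// references the Kafka fetch ByteBuffers.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

val inputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  kafkaParams,
  topicPartitionOffsetRange,
  messageHandler)

// Windowing (and any persistence behind it) now serializes plain (String, String) pairs.
inputStream.window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
  .foreachRDD(rdd => rdd.count())
{code}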

Based on this finding, I don't think we need to move forward with deep-copying the 
message in the FetchResponse. Maybe we can log a warning telling users to avoid 
serializing MessageAndMetadata directly.

Does that make sense? If so, I can close this JIRA as Won't Fix; the workaround is 
pretty straightforward.





> Serialization issue for KafkaRDD
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or MEMORY persistence for a KafkaDirectInputStream, it will serialize the 
> FetchResponse into blocks. The FetchResponse contains the ByteBufferMessageSet, where each 
> Kafka Message is just one slice of the underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying ByteBuffer in the 
> ByteBufferMessageSet is serialized for each and every message. This easily pushes the block 
> size past 2 GB and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" 
> or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default value (1M). I tried to reduce the fetch size, but that 
> causes other errors such as errRanOutBeforeEnd.
> Here is the minimal code to reproduce this issue. This example just demonstrates the bug; it is 
> not the actual code we run.
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topics, kafkaParams, messageHandler and offset ranges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
>   KafkaCluster.MAIN,
>   topicsSet.toList.asJava,
>   kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create a DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
>   ssc,
>   kafkaParams,
>   topicPartitionOffsetRange,
>   messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> ```
> Here are the exceptions I got for both Memory and Disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent:
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFin

[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210876#comment-15210876
 ] 

Cody Koeninger commented on SPARK-14105:


We definitely should make it not possible to persist a KafkaRDD for 0.10 
consumers.

In this case, I'm inclined towards going ahead and doing the deep copy, but 
overriding persist to log a strongly worded message that this is a bad idea.
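For illustration only (this is not the actual KafkaRDD code; the class name and log 
message below are made up), an override along those lines could look like:
{code}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Partition, TaskContext}
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag

// Wrapper RDD that behaves like its parent but complains loudly when persisted.
class WarnOnPersistRDD[T: ClassTag](parent: RDD[T]) extends RDD[T](parent) {
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  override protected def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)

  override def persist(newLevel: StorageLevel): this.type = {
    log.warn("Persisting this RDD serializes the raw Kafka buffers and can easily produce " +
      "blocks larger than 2GB; map to just the fields you need before caching or windowing.")
    super.persist(newLevel)
  }
}
{code}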


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210864#comment-15210864
 ] 

Liyin Tang commented on SPARK-14105:


Copied from PR discussion:
===
That example is just for demonstrating the bug; the actual code I run does more than 
count :)
I need to convert the Kafka messages to a DataFrame and run Spark SQL on it.
```
val functions: (RDD[MessageAndMetadata[String, String]] => Unit) = (rdd) => {
  val startTS = System.currentTimeMillis()
  // filter out null messages, otherwise JSON parsing will throw an exception
  val messageRDD = rdd.map(_.message()).filter(_ != null)
  val inputDF = sqlContext.read.schema(sourceStreamObject.schema).json(messageRDD)
  inputDF.registerTempTable(sourceStreamObject.name)
  inputDF.cache()
  processObjects.foreach(_.run(sqlContext))
  sinkObjects.foreach(_.sink(sqlContext))
}
```


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210858#comment-15210858
 ] 

Sean Owen commented on SPARK-14105:
---

I'm sure that's true, but should we then make it impossible to persist a KafkaRDD? 
Here it fails in a poor way by trying to serialize a bunch of bytes.


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210801#comment-15210801
 ] 

Cody Koeninger commented on SPARK-14105:


As I said in the PR, it's a lot more straightforward to just map before doing cache or 
window, which is probably what you want to be doing anyway so that you don't serialize 
a bunch of extra data.
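Using the inputStream from the repro in the description, a sketch of that looks like:
{code}
// Drop down to the decoded key/value pairs before windowing, so only those
// values are serialized when the windowed data gets persisted.
inputStream
  .map(mmd => (mmd.key(), mmd.message()))
  .window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
  .foreachRDD(rdd => rdd.count())
{code}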


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210734#comment-15210734
 ] 

Liyin Tang commented on SPARK-14105:


Added the minimal code example to reproduce this issue.

There is another way to work around this issue without changing the KafkaRDD code: 
register a customized Kryo serializer for the Kafka Message class. That way, we don't 
have to deep-copy each message.
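A rough sketch of that approach for the 0.8.x kafka.message.Message (the class names 
below are made up, and it ignores compressed messages and null payloads):
{code}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import kafka.message.Message
import org.apache.spark.serializer.KryoRegistrator

// Serializes only the copied key/payload bytes instead of the Message's
// backing ByteBuffer slice.
class KafkaMessageSerializer extends Serializer[Message] {

  private def bytesOf(buf: java.nio.ByteBuffer): Array[Byte] =
    if (buf == null) {
      null
    } else {
      val copy = new Array[Byte](buf.remaining())
      buf.duplicate().get(copy) // read via a duplicate so the original position is untouched
      copy
    }

  override def write(kryo: Kryo, output: Output, message: Message): Unit = {
    val key = bytesOf(message.key)
    val payload = bytesOf(message.payload)
    output.writeInt(if (key == null) -1 else key.length)
    if (key != null) output.writeBytes(key)
    output.writeInt(payload.length)
    output.writeBytes(payload)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[Message]): Message = {
    val keyLen = input.readInt()
    val key = if (keyLen < 0) null else input.readBytes(keyLen)
    val payload = input.readBytes(input.readInt())
    new Message(payload, key) // rebuilt from plain byte arrays, no shared buffer
  }
}

class KafkaMessageRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[Message], new KafkaMessageSerializer)
}
{code}
The registrator would then be wired in through the spark.kryo.registrator setting.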


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209594#comment-15209594
 ] 

Liyin Tang commented on SPARK-14105:


I tested the PR with the same streaming job and verified that it fixes the 
serialization issue.


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Kevin Long (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209395#comment-15209395
 ] 

Kevin Long commented on SPARK-14105:


Tested on 1.6.1; the issue is there as well.


[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209226#comment-15209226
 ] 

Apache Spark commented on SPARK-14105:
--

User 'liyintang' has created a pull request for this issue:
https://github.com/apache/spark/pull/11921
