[jira] [Updated] (SPARK-16244) Failed job/stage couldn't stop JobGenerator immediately.

2016-06-27 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-16244:
---
Description: 
This streaming job has a very simple DAG. Each batch has only 1 job, and each 
job has only 1 stage.

Based on the following logs, we observed a potential race condition. Stage 1 
failed due to task failures, which triggers failJobAndIndependentStages.

Meanwhile, the next stage (job 2) is submitted and is able to run a few tasks 
successfully before the JobGenerator is stopped via the shutdown hook.

Because the next job was able to run a few tasks successfully, it corrupts the 
checkpoints / offset management.
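
For context, here is a minimal sketch (not the actual DBExportStreaming.java 
code; exportToDB and commitOffsets are hypothetical helpers) of the foreachRDD 
export-plus-offset-tracking pattern this description refers to. It assumes 
batches complete strictly in order, so a few successful tasks from batch N+1 
slipping in after batch N has failed leaves the recorded offsets / checkpoints 
out of sync with what was actually exported:

{code}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Sketch only: per-batch export followed by per-batch offset bookkeeping.
def export(stream: DStream[(String, String)]): Unit = {
  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition(partition => exportToDB(partition)) // batch N aborts here
    commitOffsets(offsetRanges) // batch N+1 may still get this far before shutdown
  }
}

// Hypothetical helpers, for illustration only.
def exportToDB(rows: Iterator[(String, String)]): Unit = ???
def commitOffsets(ranges: Array[OffsetRange]): Unit = ???
{code}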

Here is the log from my job:


{color:red}
Stage 227 started:
{color}
[INFO] 2016-06-25 18:59:00,160 org.apache.spark.scheduler.DAGScheduler logInfo 
- Final stage: ResultStage 227(foreachRDD at DBExportStreaming.java:55)
[INFO] 2016-06-25 18:59:00,160 org.apache.spark.scheduler.DAGScheduler logInfo 
- Submitting ResultStage 227 (MapPartitionsRDD[455] at foreachRDD at 
DBExportStreaming.java:55), which has no missing parents
[INFO] 2016-06-25 18:59:00,171 org.apache.spark.scheduler.DAGScheduler logInfo 
- Submitting 1495 missing tasks from ResultStage 227 (MapPartitionsRDD[455] at 
foreachRDD at DBExportStreaming.java:55)

{color:red}
Stage 227 failed:
{color}
[ERROR] 2016-06-25 19:01:34,083 org.apache.spark.scheduler.TaskSetManager 
logError - Task 26 in stage 227.0 failed 4 times; aborting job
[INFO] 2016-06-25 19:01:34,086 org.apache.spark.scheduler.cluster.YarnScheduler 
logInfo - Cancelling stage 227
[INFO] 2016-06-25 19:01:34,088 org.apache.spark.scheduler.cluster.YarnScheduler 
logInfo - Stage 227 was cancelled
[INFO] 2016-06-25 19:01:34,089 org.apache.spark.scheduler.DAGScheduler logInfo 
- ResultStage 227 (foreachRDD at DBExportStreaming.java:55) failed in 153.914 s
[INFO] 2016-06-25 19:01:34,090 org.apache.spark.scheduler.DAGScheduler logInfo 
- Job 227 failed: foreachRDD at DBExportStreaming.java:55, took 153.930462 s
[INFO] 2016-06-25 19:01:34,091 
org.apache.spark.streaming.scheduler.JobScheduler logInfo - Finished job 
streaming job 146688114 ms.0 from job set of time 146688114 ms
[INFO] 2016-06-25 19:01:34,091 
org.apache.spark.streaming.scheduler.JobScheduler logInfo - Total delay: 
154.091 s for time 146688114 ms (execution: 153.935 s)

{color:red}
Stage 228 started:
{color}

[INFO] 2016-06-25 19:01:34,094 org.apache.spark.SparkContext logInfo - Starting 
job: foreachRDD at DBExportStreaming.java:55
[INFO] 2016-06-25 19:01:34,095 org.apache.spark.scheduler.DAGScheduler logInfo 
- Got job 228 (foreachRDD at DBExportStreaming.java:55) with 1495 output 
partitions
[INFO] 2016-06-25 19:01:34,095 org.apache.spark.scheduler.DAGScheduler logInfo 
- Final stage: ResultStage 228(foreachRDD at DBExportStreaming.java:55)
Exception in thread "main" [INFO] 2016-06-25 19:01:34,095 
org.apache.spark.scheduler.DAGScheduler logInfo - Parents of final stage: List()

{color:red}
Shutdown hook was called after stage 228 started:
{color}

[INFO] 2016-06-25 19:01:34,099 org.apache.spark.streaming.StreamingContext 
logInfo - Invoking stop(stopGracefully=false) from shutdown hook
[INFO] 2016-06-25 19:01:34,101 
org.apache.spark.streaming.scheduler.JobGenerator logInfo - Stopping 
JobGenerator immediately
[INFO] 2016-06-25 19:01:34,102 org.apache.spark.streaming.util.RecurringTimer 
logInfo - Stopped timer for JobGenerator after time 146688126
[INFO] 2016-06-25 19:01:34,103 
org.apache.spark.streaming.scheduler.JobGenerator logInfo - Stopped JobGenerator
[INFO] 2016-06-25 19:01:34,106 org.apache.spark.storage.MemoryStore logInfo - 
ensureFreeSpace(133720) called with curMem=344903, maxMem=1159641169
[INFO] 2016-06-25 19:01:34,106 org.apache.spark.storage.MemoryStore logInfo - 
Block broadcast_229 stored as values in memory (estimated size 130.6 KB, free 
1105.5 MB)
[INFO] 2016-06-25 19:01:34,107 org.apache.spark.storage.MemoryStore logInfo - 
ensureFreeSpace(51478) called with curMem=478623, maxMem=1159641169
[INFO] 2016-06-25 19:01:34,107 org.apache.spark.storage.MemoryStore logInfo - 
Block broadcast_229_piece0 stored as bytes in memory (estimated size 50.3 KB, 
free 1105.4 MB)
[INFO] 2016-06-25 19:01:34,108 org.apache.spark.storage.BlockManagerInfo 
logInfo - Added broadcast_229_piece0 in memory on 10.123.209.8:42154 (size: 
50.3 KB, free: 1105.8 MB)
[INFO] 2016-06-25 19:01:34,109 org.apache.spark.SparkContext logInfo - Created 
broadcast 229 from broadcast at DAGScheduler.scala:861
[INFO] 2016-06-25 19:01:34,110 org.apache.spark.scheduler.DAGScheduler logInfo 
- Submitting 1495 missing tasks from ResultStage 228 (MapPartitionsRDD[458] at 
foreachRDD at 

[jira] [Created] (SPARK-16244) Failed job/stage couldn't stop JobGenerator immediately.

2016-06-27 Thread Liyin Tang (JIRA)
Liyin Tang created SPARK-16244:
--

 Summary: Failed job/stage couldn't stop JobGenerator immediately.
 Key: SPARK-16244
 URL: https://issues.apache.org/jira/browse/SPARK-16244
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.2
Reporter: Liyin Tang


This streaming job has a very simple DAG. Each batch has only 1 job, and each 
job has only 1 stage.

Based on the following logs, we observed a potential race condition. Stage 1 
failed due to task failures, which triggers failJobAndIndependentStages.

Meanwhile, the next stage (job 2) is submitted and is able to run a few tasks 
successfully before the JobGenerator is stopped via the shutdown hook.

Because the next job was able to run a few tasks successfully, it corrupts the 
checkpoints / offset management.

I will attach the log to the JIRA as well.








[jira] [Commented] (SPARK-14230) Config the start time (jitter) for streaming jobs

2016-03-30 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218672#comment-15218672
 ] 

Liyin Tang commented on SPARK-14230:


[~davies], thanks for the response. If I understand correctly, the PR you 
pointed to sets a start time for the window function. What about non-window 
functions, do we also have a way to specify the start time?



> Config the start time (jitter) for streaming jobs
> -
>
> Key: SPARK-14230
> URL: https://issues.apache.org/jira/browse/SPARK-14230
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Liyin Tang
>
> Currently, RecurringTimer will normalize the start time. For instance, if 
> the batch duration is 1 min, all the jobs will start exactly at the 1 min 
> boundary. This actually adds some burden to the streaming source. Assuming 
> the source is Kafka, and there is a list of streaming jobs with a 1 min batch 
> duration, then during the first few seconds of each minute, high network 
> traffic will be observed in Kafka. This makes Kafka capacity planning tricky. 
> It would be great to have an option in the streaming context to set the job 
> start time. In this way, users can add a jitter to the start time of each 
> job, and make Kafka fetch requests much smoother across the duration window.
> {code}
> class RecurringTimer {
>   def getStartTime(): Long = {
> (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + 
> jitter
>   }
> }
> {code}






[jira] [Created] (SPARK-14230) Config the start time (jitter) for streaming jobs

2016-03-28 Thread Liyin Tang (JIRA)
Liyin Tang created SPARK-14230:
--

 Summary: Config the start time (jitter) for streaming jobs
 Key: SPARK-14230
 URL: https://issues.apache.org/jira/browse/SPARK-14230
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Liyin Tang


Currently, RecurringTimer will normalize the start time. For instance, if the 
batch duration is 1 min, all the jobs will start exactly at the 1 min boundary. 

This actually adds some burden to the streaming source. Assuming the source is 
Kafka, and there is a list of streaming jobs with a 1 min batch duration, then 
during the first few seconds of each minute, high network traffic will be 
observed in Kafka. This makes Kafka capacity planning tricky. 

It would be great to have an option in the streaming context to set the job 
start time. In this way, users can add a jitter to the start time of each job, 
and make Kafka fetch requests much smoother across the duration window.

{code}
class RecurringTimer {
  def getStartTime(): Long = {
(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + 
jitter
  }
}
{code}
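
A minimal sketch of how such a jitter could be chosen. The jitter itself is the 
proposal in this ticket, not an existing StreamingContext or RecurringTimer 
option, and the appName-based hash is just one possible policy:

{code}
// Deterministic jitter in [0, periodMs), derived from the application name, so
// that jobs sharing a batch duration spread out within the period instead of
// all firing on the same boundary.
def startTimeWithJitter(currentTimeMs: Long, periodMs: Long, appName: String): Long = {
  val jitter = math.abs(appName.hashCode.toLong) % periodMs
  (math.floor(currentTimeMs.toDouble / periodMs) + 1).toLong * periodMs + jitter
}

// e.g. with a 60,000 ms batch duration, "job-a" and "job-b" get different
// offsets within the minute rather than both starting exactly on the boundary.
{code}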








[jira] [Closed] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD

2016-03-25 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang closed SPARK-14105.
--
Resolution: Won't Fix

Avoid serializing MessageAndMetadata.

Easy workaround:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), 
mmd.message())
{code}
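
For context, a sketch of how this workaround plugs into the repro code quoted 
below: only the result type of createDirectStream changes, from 
MessageAndMetadata[String, String] to (String, String), so each record 
serialized for persistence is a plain key/message pair rather than a view onto 
the shared fetch ByteBuffer.

{code}
// Same call as in the repro below, with the (key, message) handler above.
val inputStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, topicPartitionOffsetRange, messageHandler)
{code}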

> Serialization issue for MessageAndMetadata in KafkaRDD
> --
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue. This example just tries to 
> demonstrate the bug, not the actual code we run.
> {code}
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> {code}
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> {code}
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> 

[jira] [Issue Comment Deleted] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD

2016-03-25 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Comment: was deleted

(was: Avoid serializing MessageAndMetadata.

Easy workaround:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), 
mmd.message())
{code})

> Serialization issue for MessageAndMetadata in KafkaRDD
> --
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue. This example just tries to 
> demonstrate the bug, not the actual code we run.
> {code}
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> {code}
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> {code}
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> 

[jira] [Commented] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD

2016-03-25 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211999#comment-15211999
 ] 

Liyin Tang commented on SPARK-14105:


Thanks [~c...@koeninger.org] and [~sowen] for the review and comments! It has 
been a great first experience contributing to the community.

For this JIRA, it is not ideal to do the deep copy from the fetch response 
directly, and users can easily avoid serializing MessageAndMetadata directly. 

I think it makes sense to close it as won't-fix.

> Serialization issue for MessageAndMetadata in KafkaRDD
> --
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue. This example just tries to 
> demonstrate the bug, not the actual code we run.
> {code}
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> {code}
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> {code}
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at 

[jira] [Updated] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Summary: Serialization issue for MessageAndMetadata in KafkaRDD  (was: 
Serialization issue for KafkaRDD)

> Serialization issue for MessageAndMetadata in KafkaRDD
> --
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue. This example just tries to 
> demonstrate the bug, not the actual code we run.
> {code}
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> {code}
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> {code}
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Description: 
When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
serialize the FetchResponse into blocks. The FetchResponse contains the 
ByteBufferMessageSet, where each Kafka Message is just one slice of the 
underlying ByteBuffer. 

When serializing the KafkaRDDIterator, it seems like the entire underlying 
ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
message. This can easily cause the block size to exceed 2 GB, leading to 
"java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
"FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"

The consumer fetch size is the default value (1 MB). I tried to reduce the 
fetch size, but it causes other errors like errRanOutBeforeEnd.


Here is the min code to reproduce this issue. This example just tries to 
demonstrate the bug, not the actual code we run.
{code}
// create source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topics, kafkaParams, messageHandler and offsetRanges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> 
KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
KafkaCluster.MAIN,
topicsSet.toList.asJava,

kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
ssc,
kafkaParams,
topicPartitionOffsetRange,
messageHandler)

// Apply window function
inputStream.window(Seconds(slidingWindowInterval), 
Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())

ssc.start()
ssc.awaitTermination()
{code}

Here are exceptions I got for both Memory and Disk persistent.
Memory Persistent:
{code}
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at 
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Description: 
When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
serialize the FetchResponse into blocks. The FetchResponse contains the 
ByteBufferMessageSet, where each Kafka Message is just one slice of the 
underlying ByteBuffer. 

When serializing the KafkaRDDIterator, it seems like the entire underlying 
ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
message. This can easily cause the block size to exceed 2 GB, leading to 
"java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
"FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"

The consumer fetch size is the default value (1 MB). I tried to reduce the 
fetch size, but it causes other errors like errRanOutBeforeEnd.


Here is the min code to reproduce this issue. This example just tries to 
demonstrate the bug, not the actual code we run.
{code}
// create source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topics, kafkaParams, messageHandler and offsetRanges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> 
KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
KafkaCluster.MAIN,
topicsSet.toList.asJava,

kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
ssc,
kafkaParams,
topicPartitionOffsetRange,
messageHandler)

// Apply window function
inputStream.window(Seconds(slidingWindowInterval), 
Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())

ssc.start()
ssc.awaitTermination()
{code}

Here are exceptions I got for both Memory and Disk persistent.
Memory Persistent:
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at 
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at 

[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211147#comment-15211147
 ] 

Liyin Tang commented on SPARK-14105:


https://github.com/apache/spark/pull/11921#issuecomment-201073631,

I have verified the issue is caused by serializing MessageAndMetadata.

Previously:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
{code}
In this way, when serializing the KafkaRDD, it needs to serialize every object 
in MessageAndMetadata, which includes the slice of the underlying ByteBuffer. 
This causes the block to exceed 2 GB.

The simple workaround is
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), 
mmd.message())
{code}
In this way, the key and message are already deep-copied into new byte[] arrays 
during serialization, so the problem is avoided.

Based on this finding, I don't think we need to move forward with deep-copying 
the message in the FetchResponse. Maybe we can give users a log message or 
warning to avoid serializing MessageAndMetadata directly.

Does that make sense? If so, I can close this JIRA as won't-fix. The workaround 
is pretty straightforward. 





> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue. This example just tries to 
> demonstrate the bug, not the actual code we run.
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
>  ```
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Description: 
When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
serialize the FetchResponse into blocks. The FetchResponse contains the 
ByteBufferMessageSet, where each Kafka Message is just one slice of the 
underlying ByteBuffer. 

When serializing the KafkaRDDIterator, it seems like the entire underlying 
ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
message. This can easily cause the block size to exceed 2 GB, leading to 
"java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
"FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"

The consumer fetch size is the default value (1 MB). I tried to reduce the 
fetch size, but it causes other errors like errRanOutBeforeEnd.


Here is the min code to reproduce this issue. This example just tries to 
demonstrate the bug, not the actual code we run.
```
// create source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topics, kafkaParams, messageHandler and offsetRanges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> 
KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
KafkaCluster.MAIN,
topicsSet.toList.asJava,

kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
ssc,
kafkaParams,
topicPartitionOffsetRange,
messageHandler)

// Apply window function
inputStream.window(Seconds(slidingWindowInterval), 
Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())

ssc.start()
ssc.awaitTermination()
 ```

Here are exceptions I got for both Memory and Disk persistent.
Memory Persistent:
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at 
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at 

[jira] [Issue Comment Deleted] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Comment: was deleted

(was: Copied from PR discussion:
===
That example is just for demonstrating the bug. The actual code I run is more 
than a count :)
I need to convert the Kafka messages to a DataFrame and run Spark SQL on it.
```
val functions: (RDD[MessageAndMetadata[String, String]] => Unit) = (rdd) => {
  val startTS = System.currentTimeMillis()
  // filter out null messages, otherwise json parsing will throw an exception
  val messageRDD = rdd.map(_.message()).filter(_ != null)
  val inputDF = sqlContext.read.schema(sourceStreamObject.schema).json(messageRDD)
  inputDF.registerTempTable(sourceStreamObject.name)
  inputDF.cache()
  processObjects.foreach(_.run(sqlContext))
  sinkObjects.foreach(_.sink(sqlContext))
}
```)

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue:
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
>  ```
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> 

[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210864#comment-15210864
 ] 

Liyin Tang commented on SPARK-14105:


Copied from PR discussion:
===
That example is just for demonstrating the bug. The actual code I run is more 
than a count :)
I need to convert the Kafka messages to a DataFrame and run Spark SQL on it.
```
val functions: (RDD[MessageAndMetadata[String, String]] => Unit) = (rdd) => {
  val startTS = System.currentTimeMillis()
  // filter out null messages, otherwise json parsing will throw an exception
  val messageRDD = rdd.map(_.message()).filter(_ != null)
  val inputDF = sqlContext.read.schema(sourceStreamObject.schema).json(messageRDD)
  inputDF.registerTempTable(sourceStreamObject.name)
  inputDF.cache()
  processObjects.foreach(_.run(sqlContext))
  sinkObjects.foreach(_.sink(sqlContext))
}
```

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When using DISK or MEMORY to persist the KafkaDirectInputStream, it will 
> serialize the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer. 
> When serializing the KafkaRDDIterator, it seems like the entire underlying 
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every 
> message. This can easily cause the block size to exceed 2 GB, leading to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch size is the default value (1 MB). I tried to reduce the 
> fetch size, but it causes other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue:
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
> KafkaCluster.MAIN,
> topicsSet.toList.asJava,
> 
> kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
> ssc,
> kafkaParams,
> topicPartitionOffsetRange,
> messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval), 
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
>  ```
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> 

[jira] [Comment Edited] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210734#comment-15210734
 ] 

Liyin Tang edited comment on SPARK-14105 at 3/24/16 6:35 PM:
-

Added the minimal code example to reproduce this issue to the JIRA description.

There is another way to work around this issue without changing the KafkaRDD 
code: register a customized Kryo serialization implementation for the Kafka 
Message class. In this way, we don't have to deep-copy each message. 
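
A minimal sketch of what such a registrator could look like (simplified: it 
copies only the payload bytes and ignores the message key and attributes; the 
class names here are illustrative, not existing Spark or Kafka code):

{code}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import kafka.message.Message
import org.apache.spark.serializer.KryoRegistrator

// Writes only this message's payload bytes instead of the whole backing ByteBuffer.
class KafkaMessageSerializer extends Serializer[Message] {
  override def write(kryo: Kryo, output: Output, msg: Message): Unit = {
    val payload = msg.payload                       // slice of the shared fetch buffer
    val bytes = new Array[Byte](payload.remaining())
    payload.duplicate().get(bytes)                  // deep copy just these bytes
    output.writeInt(bytes.length)
    output.writeBytes(bytes)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[Message]): Message = {
    new Message(input.readBytes(input.readInt()))
  }
}

class KafkaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Message], new KafkaMessageSerializer)
  }
}

// Enabled via: sparkConf.set("spark.kryo.registrator", "KafkaKryoRegistrator")
{code}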


was (Author: liyin):
Add the min code example to reproduce this issue.

There is another way to work around this issue without changing KafkaRDD code, 
which register a customized Kryo serialization implementation for Kafka 
Message. In this way, we don't have to deep copy each message. 

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here is the minimal code to reproduce this issue:
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
>     KafkaCluster.MAIN,
>     topicsSet.toList.asJava,
>     kafka.api.OffsetRequest.LatestTime)
>   .toMap.mapValues(Long2long).take(10)
> // Create a DStream
> val inputStream = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
>   ssc,
>   kafkaParams,
>   topicPartitionOffsetRange,
>   messageHandler)
> // Apply window function
> inputStream
>   .window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
>   .foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> ```
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> 

[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210734#comment-15210734
 ] 

Liyin Tang commented on SPARK-14105:


Added a minimal code example to reproduce this issue.

There is another way to work around this issue without changing the KafkaRDD 
code: register a customized Kryo serializer for the Kafka Message class. That 
way, we don't have to deep-copy each message.
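
For context, the "deep copy each message" alternative that this comment avoids 
would look roughly like the sketch below: the message handler eagerly decodes 
the key and value into fresh objects, so nothing that gets persisted still 
points at the shared fetch ByteBuffer. The tuple shape is only an example, not 
actual code from the job.
```
import kafka.message.MessageAndMetadata

// Illustrative only: decode key and value eagerly so the records handed to
// Spark no longer reference the Message slices (and hence the shared fetch
// buffer) once the stream is persisted.
val copyingMessageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
```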

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here is the minimal code to reproduce this issue:
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
>     KafkaCluster.MAIN,
>     topicsSet.toList.asJava,
>     kafka.api.OffsetRequest.LatestTime)
>   .toMap.mapValues(Long2long).take(10)
> // Create a DStream
> val inputStream = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
>   ssc,
>   kafkaParams,
>   topicPartitionOffsetRange,
>   messageHandler)
> // Apply window function
> inputStream
>   .window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
>   .foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> ```
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-24 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Description: 
When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
the FetchResponse into blocks. The FetchResponse contains the 
ByteBufferMessageSet, where each Kafka Message is just one slice of the 
underlying ByteBuffer.

When serializing the KafkaRDDIterator, it seems the entire underlying 
ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
message. This easily pushes the block size beyond 2G and leads to 
"java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
"FileChannelImpl.map -> exceeds Integer.MAX_VALUE".

The consumer fetch size is the default (1MB). I tried to reduce the fetch 
size, but that causes other errors such as errRanOutBeforeEnd.
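
A minimal, self-contained illustration of the sharing that causes the blow-up 
(this is plain java.nio behaviour, not Spark or Kafka code):
```
// Each "message" is only a slice of the fetch buffer, but the slice still
// shares its parent's backing array. A field-walking serializer that reaches
// the byte[] behind a heap ByteBuffer can therefore end up writing the whole
// fetch buffer for every single message.
val fetchBuffer = java.nio.ByteBuffer.allocate(1024 * 1024) // one 1MB fetch, say
fetchBuffer.position(100)
fetchBuffer.limit(200)
val oneMessage = fetchBuffer.slice()
println(oneMessage.remaining())     // 100 -- the message's own bytes
println(oneMessage.array().length)  // 1048576 -- the shared backing array
```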


Here is the minimal code to reproduce this issue:
```
// create source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topic, kafkaParams, messageHandler and offsetRanges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
    KafkaCluster.MAIN,
    topicsSet.toList.asJava,
    kafka.api.OffsetRequest.LatestTime)
  .toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
  ssc,
  kafkaParams,
  topicPartitionOffsetRange,
  messageHandler)

// Apply window function
inputStream
  .window(Seconds(slidingWindowInterval), Seconds(intervalSeconds))
  .foreachRDD(rdd => rdd.count())

ssc.start()
ssc.awaitTermination()
```

Here are exceptions I got with both memory and disk persistence.
Memory Persistent:
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at 
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
   

[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209594#comment-15209594
 ] 

Liyin Tang commented on SPARK-14105:


I tested the PR with the same streaming job and verified that it fixes the 
serialization issue.

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at 
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at 
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at 
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at 
> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent: 
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) 
> on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException 
> (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Affects Version/s: 1.6.1

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.1
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at 
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at 
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at 
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at 
> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent: 
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) 
> on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException 
> (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
> at 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Attachment: Screenshot 2016-03-23 09.04.51.png

The attached screenshot shows the KafkaRDD serialization.

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
>
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at 
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at 
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at 
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at 
> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent: 
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) 
> on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException 
> (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Component/s: (was: Spark Core)

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Liyin Tang
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at 
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at 
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at 
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at 
> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent: 
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) 
> on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException 
> (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at 

[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-14105:
---
Affects Version/s: 1.5.2
  Component/s: Streaming
   Spark Core

> Serialization issue for KafkaRDD
> 
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Streaming
>Affects Versions: 1.5.2
>Reporter: Liyin Tang
>
> When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
> the FetchResponse into blocks. The FetchResponse contains the 
> ByteBufferMessageSet, where each Kafka Message is just one slice of the 
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems the entire underlying 
> ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
> message. This easily pushes the block size beyond 2G and leads to 
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE".
> The consumer fetch size is the default (1MB). I tried to reduce the fetch 
> size, but that causes other errors such as errRanOutBeforeEnd.
> Here are exceptions I got with both memory and disk persistence.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
> at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
> at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at 
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at 
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at 
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at 
> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> Disk Persistent: 
> 16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) 
> on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException 
> (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
> at 

[jira] [Created] (SPARK-14105) Serialization issue for KafkaRDD

2016-03-23 Thread Liyin Tang (JIRA)
Liyin Tang created SPARK-14105:
--

 Summary: Serialization issue for KafkaRDD
 Key: SPARK-14105
 URL: https://issues.apache.org/jira/browse/SPARK-14105
 Project: Spark
  Issue Type: Bug
Reporter: Liyin Tang


When persisting a KafkaDirectInputStream to DISK or MEMORY, Spark serializes 
the FetchResponse into blocks. The FetchResponse contains the 
ByteBufferMessageSet, where each Kafka Message is just one slice of the 
underlying ByteBuffer.

When serializing the KafkaRDDIterator, it seems the entire underlying 
ByteBuffer of the ByteBufferMessageSet is serialized for each and every 
message. This easily pushes the block size beyond 2G and leads to 
"java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or 
"FileChannelImpl.map -> exceeds Integer.MAX_VALUE".

The consumer fetch size is the default (1MB). I tried to reduce the fetch 
size, but that causes other errors such as errRanOutBeforeEnd.

Here are exceptions I got with both memory and disk persistence.
Memory Persistent:
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at 
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

Disk Persistent: 
16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on 
executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException 
(java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-13580) Driver makes no progress when Executor's akka thread exits due to OOM.

2016-02-29 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-13580:
---
Summary: Driver makes no progress when Executor's akka thread exits due to 
OOM.  (was: Driver makes no progress after failed to remove broadcast on 
Executor)

> Driver makes no progress when Executor's akka thread exits due to OOM.
> --
>
> Key: SPARK-13580
> URL: https://issues.apache.org/jira/browse/SPARK-13580
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Liyin Tang
> Attachments: driver_jstack.txt, driver_log.txt, executor_jstack, 
> stderrfiltered.txt.gz
>
>
> From the driver's log: it failed to remove broadcast data due to an RPC 
> timeout exception from executor #11, and it also failed to get a thread dump 
> from executor #11 due to an akka.actor.ActorNotFound exception.
> After that, the driver waited for executor #11 to finish one remaining task 
> for that job; all the other tasks for that job had finished.
> However, according to executor #11's log, it never received that task (it 
> received and finished 9 other tasks).
> Since then, the streaming job has made no progress.
> I have attached the driver's log and jstack, and the executor's jstack.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13580) Driver makes no progress after failed to remove broadcast on Executor

2016-02-29 Thread Liyin Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15172810#comment-15172810
 ] 

Liyin Tang commented on SPARK-13580:


Thanks [~zsxwing] for the investigation! That's very helpful!

> Driver makes no progress after failed to remove broadcast on Executor
> -
>
> Key: SPARK-13580
> URL: https://issues.apache.org/jira/browse/SPARK-13580
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Liyin Tang
> Attachments: driver_jstack.txt, driver_log.txt, executor_jstack, 
> stderrfiltered.txt.gz
>
>
> From the driver's log: it failed to remove broadcast data due to an RPC 
> timeout exception from executor #11, and it also failed to get a thread dump 
> from executor #11 due to an akka.actor.ActorNotFound exception.
> After that, the driver waited for executor #11 to finish one remaining task 
> for that job; all the other tasks for that job had finished.
> However, according to executor #11's log, it never received that task (it 
> received and finished 9 other tasks).
> Since then, the streaming job has made no progress.
> I have attached the driver's log and jstack, and the executor's jstack.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-13580) Driver makes no progress after failed to remove broadcast on Executor

2016-02-29 Thread Liyin Tang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liyin Tang updated SPARK-13580:
---
Attachment: executor_jstack
driver_log.txt
driver_jstack.txt

> Driver makes no progress after failed to remove broadcast on Executor
> -
>
> Key: SPARK-13580
> URL: https://issues.apache.org/jira/browse/SPARK-13580
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Liyin Tang
> Attachments: driver_jstack.txt, driver_log.txt, executor_jstack
>
>
> From the driver's log: it failed to remove broadcast data due to an RPC 
> timeout exception from executor #11, and it also failed to get a thread dump 
> from executor #11 due to an akka.actor.ActorNotFound exception.
> After that, the driver waited for executor #11 to finish one remaining task 
> for that job; all the other tasks for that job had finished.
> However, according to executor #11's log, it never received that task (it 
> received and finished 9 other tasks).
> Since then, the streaming job has made no progress.
> I have attached the driver's log and jstack, and the executor's jstack.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-13580) Driver makes no progress after failed to remove broadcast on Executor

2016-02-29 Thread Liyin Tang (JIRA)
Liyin Tang created SPARK-13580:
--

 Summary: Driver makes no progress after failed to remove broadcast 
on Executor
 Key: SPARK-13580
 URL: https://issues.apache.org/jira/browse/SPARK-13580
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.2
Reporter: Liyin Tang


From the driver's log: it failed to remove broadcast data due to an RPC timeout 
exception from executor #11, and it also failed to get a thread dump from 
executor #11 due to an akka.actor.ActorNotFound exception.

After that, the driver waited for executor #11 to finish one remaining task for 
that job; all the other tasks for that job had finished.

However, according to executor #11's log, it never received that task (it 
received and finished 9 other tasks).

Since then, the streaming job has made no progress.

I have attached the driver's log and jstack, and the executor's jstack.








--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org