[jira] [Updated] (SPARK-16244) Failed job/stage couldn't stop JobGenerator immediately.
[ https://issues.apache.org/jira/browse/SPARK-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-16244:
---
Description:
This streaming job has a very simple DAG: each batch has only one job, and each job has only one stage.

Based on the following logs, we observed a potential race condition. Stage 1 failed because some of its tasks failed, which triggers failJobAndIndependentStages. In the meantime, the next stage (job), 2, was submitted and was able to run a few tasks successfully before the shutdown hook stopped the JobGenerator. Because the next job ran a few tasks successfully, it corrupted the checkpoint / offset management.

Here is the log from my job:

{color:red}
Stage 227 started:
{color}
[INFO] 2016-06-25 18:59:00,171 org.apache.spark.scheduler.DAGScheduler logInfo - Submitting 1495 missing tasks from ResultStage 227 (MapPartitionsRDD[455] at foreachRDD at DBExportStreaming.java:55)
[INFO] 2016-06-25 18:59:00,160 org.apache.spark.scheduler.DAGScheduler logInfo - Final stage: ResultStage 227(foreachRDD at DBExportStreaming.java:55)
[INFO] 2016-06-25 18:59:00,160 org.apache.spark.scheduler.DAGScheduler logInfo - Submitting ResultStage 227 (MapPartitionsRDD[455] at foreachRDD at DBExportStreaming.java:55), which has no missing parents
[INFO] 2016-06-25 18:59:00,171 org.apache.spark.scheduler.DAGScheduler logInfo - Submitting 1495 missing tasks from ResultStage 227 (MapPartitionsRDD[455] at foreachRDD at DBExportStreaming.java:55)

{color:red}
Stage 227 failed:
{color}
[ERROR] 2016-06-25 19:01:34,083 org.apache.spark.scheduler.TaskSetManager logError - Task 26 in stage 227.0 failed 4 times; aborting job
[INFO] 2016-06-25 19:01:34,086 org.apache.spark.scheduler.cluster.YarnScheduler logInfo - Cancelling stage 227
[INFO] 2016-06-25 19:01:34,088 org.apache.spark.scheduler.cluster.YarnScheduler logInfo - Stage 227 was cancelled
[INFO] 2016-06-25 19:01:34,089 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 227 (foreachRDD at DBExportStreaming.java:55) failed in 153.914 s
[INFO] 2016-06-25 19:01:34,090 org.apache.spark.scheduler.DAGScheduler logInfo - Job 227 failed: foreachRDD at DBExportStreaming.java:55, took 153.930462 s
[INFO] 2016-06-25 19:01:34,091 org.apache.spark.streaming.scheduler.JobScheduler logInfo - Finished job streaming job 146688114 ms.0 from job set of time 146688114 ms
[INFO] 2016-06-25 19:01:34,091 org.apache.spark.streaming.scheduler.JobScheduler logInfo - Total delay: 154.091 s for time 146688114 ms (execution: 153.935 s)

{color:red}
Stage 228 started:
{color}
[INFO] 2016-06-25 19:01:34,094 org.apache.spark.SparkContext logInfo - Starting job: foreachRDD at DBExportStreaming.java:55
[INFO] 2016-06-25 19:01:34,095 org.apache.spark.scheduler.DAGScheduler logInfo - Got job 228 (foreachRDD at DBExportStreaming.java:55) with 1495 output partitions
[INFO] 2016-06-25 19:01:34,095 org.apache.spark.scheduler.DAGScheduler logInfo - Final stage: ResultStage 228(foreachRDD at DBExportStreaming.java:55)
Exception in thread "main" [INFO] 2016-06-25 19:01:34,095 org.apache.spark.scheduler.DAGScheduler logInfo - Parents of final stage: List()

{color:red}
Shutdown hook was called after stage 228 started:
{color}
[INFO] 2016-06-25 19:01:34,099 org.apache.spark.streaming.StreamingContext logInfo - Invoking stop(stopGracefully=false) from shutdown hook
[INFO] 2016-06-25 19:01:34,101 org.apache.spark.streaming.scheduler.JobGenerator logInfo - Stopping JobGenerator immediately
[INFO] 2016-06-25 19:01:34,102 org.apache.spark.streaming.util.RecurringTimer logInfo - Stopped timer for JobGenerator after time 146688126
[INFO] 2016-06-25 19:01:34,103 org.apache.spark.streaming.scheduler.JobGenerator logInfo - Stopped JobGenerator
[INFO] 2016-06-25 19:01:34,106 org.apache.spark.storage.MemoryStore logInfo - ensureFreeSpace(133720) called with curMem=344903, maxMem=1159641169
[INFO] 2016-06-25 19:01:34,106 org.apache.spark.storage.MemoryStore logInfo - Block broadcast_229 stored as values in memory (estimated size 130.6 KB, free 1105.5 MB)
[INFO] 2016-06-25 19:01:34,107 org.apache.spark.storage.MemoryStore logInfo - ensureFreeSpace(51478) called with curMem=478623, maxMem=1159641169
[INFO] 2016-06-25 19:01:34,107 org.apache.spark.storage.MemoryStore logInfo - Block broadcast_229_piece0 stored as bytes in memory (estimated size 50.3 KB, free 1105.4 MB)
[INFO] 2016-06-25 19:01:34,108 org.apache.spark.storage.BlockManagerInfo logInfo - Added broadcast_229_piece0 in memory on 10.123.209.8:42154 (size: 50.3 KB, free: 1105.8 MB)
[INFO] 2016-06-25 19:01:34,109 org.apache.spark.SparkContext logInfo - Created broadcast 229 from broadcast at DAGScheduler.scala:861
[INFO] 2016-06-25 19:01:34,110 org.apache.spark.scheduler.DAGScheduler logInfo - Submitting 1495 missing tasks from ResultStage 228 (MapPartitionsRDD[458] at foreachRDD at
[jira] [Created] (SPARK-16244) Failed job/stage couldn't stop JobGenerator immediately.
Liyin Tang created SPARK-16244:
--
Summary: Failed job/stage couldn't stop JobGenerator immediately.
Key: SPARK-16244
URL: https://issues.apache.org/jira/browse/SPARK-16244
Project: Spark
Issue Type: Bug
Components: Streaming
Affects Versions: 1.5.2
Reporter: Liyin Tang

This streaming job has a very simple DAG: each batch has only one job, and each job has only one stage.

Based on the following logs, we observed a potential race condition. Stage 1 failed because some of its tasks failed, which triggers failJobAndIndependentStages. In the meantime, the next stage (job), 2, was submitted and was able to run a few tasks successfully before the shutdown hook stopped the JobGenerator. Because the next job ran a few tasks successfully, it corrupted the checkpoint / offset management.

I will attach the log to the JIRA as well.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
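One way to picture a guard against the race described in this issue is a driver-side gate that refuses to start a new batch once an earlier batch has failed. The sketch below is only an illustration under stated assumptions: `BatchGate`, `runIfHealthy`, and `BatchGateDemo` are hypothetical names, not Spark APIs, and nothing here has been checked against how Spark's JobScheduler actually orders job submission and shutdown.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, not part of Spark: remembers whether any batch has failed.
final class BatchGate {
  private val failed = new AtomicBoolean(false)

  // Runs `batch` only if no earlier batch has failed.
  // Returns true if the batch ran to completion, false if it was skipped or failed.
  def runIfHealthy(batch: () => Unit): Boolean = {
    if (failed.get()) {
      false // an earlier batch failed, so do not touch checkpoints or offsets
    } else {
      try {
        batch()
        true
      } catch {
        case _: Exception =>
          failed.set(true) // close the gate for every later batch
          false
      }
    }
  }
}

object BatchGateDemo {
  // Simulates a failing batch followed by the next batch.
  def run(): (Boolean, Boolean, Int) = {
    val gate = new BatchGate
    var processed = 0
    val first = gate.runIfHealthy(() => throw new RuntimeException("task failed 4 times"))
    val second = gate.runIfHealthy(() => processed += 1)
    (first, second, processed)
  }
}
```

In a real job the body of `foreachRDD` would presumably be what gets wrapped, and the failure would normally be rethrown after the gate is closed so that the driver still terminates; the sketch swallows it only to keep the demo short.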
[jira] [Commented] (SPARK-14230) Config the start time (jitter) for streaming jobs
[ https://issues.apache.org/jira/browse/SPARK-14230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218672#comment-15218672 ]

Liyin Tang commented on SPARK-14230:

[~davies], thanks for the response. If I understand correctly, the PR you pointed to sets a start time for window functions. What about non-window functions: is there also a way to specify the start time?
[jira] [Created] (SPARK-14230) Config the start time (jitter) for streaming jobs
Liyin Tang created SPARK-14230:
--
Summary: Config the start time (jitter) for streaming jobs
Key: SPARK-14230
URL: https://issues.apache.org/jira/browse/SPARK-14230
Project: Spark
Issue Type: Improvement
Components: Streaming
Reporter: Liyin Tang

Currently, RecurringTimer normalizes the start time. For instance, if the batch duration is 1 min, all jobs start exactly on the 1 min boundary.

This adds some burden to the streaming source. Assuming the source is Kafka and there is a list of streaming jobs with a 1 min batch duration, high network traffic will be observed in Kafka during the first few seconds of each minute. This makes Kafka capacity planning tricky.

It would be great to have an option in the streaming context to set the job start time. That way, users can add a jitter to the start time of each job and make the Kafka fetch_request traffic much smoother across the duration window.

{code}
class RecurringTimer {
  def getStartTime(): Long = {
    (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + jitter
  }
}
{code}
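The arithmetic in the proposed `getStartTime` can be tried out in isolation. The following is a minimal sketch, assuming millisecond timestamps and a jitter smaller than the period; `JitteredStart` and its parameter names are made up for illustration and are not part of Spark.

```scala
object JitteredStart {
  // Returns the first multiple of `periodMs` strictly after `nowMs`, shifted by `jitterMs`.
  // Integer division equals floor here because `nowMs` is required to be non-negative.
  def startTime(nowMs: Long, periodMs: Long, jitterMs: Long): Long = {
    require(nowMs >= 0, "now must be non-negative")
    require(periodMs > 0, "period must be positive")
    require(jitterMs >= 0 && jitterMs < periodMs, "jitter must be in [0, period)")
    (nowMs / periodMs + 1) * periodMs + jitterMs
  }
}
```

With a 60000 ms period and the clock at 125000 ms, a job with no jitter would start at 180000 ms, while a job with a 7000 ms jitter would start at 187000 ms, so two jobs with different jitters no longer hit the source at the same instant.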
[jira] [Closed] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang closed SPARK-14105.
--
Resolution: Won't Fix

Avoid serializing MessageAndMetadata. Easy workaround:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
{code}

> Serialization issue for MessageAndMetadata in KafkaRDD
> --
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2, 1.6.1
> Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
[jira] [Issue Comment Deleted] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Comment: was deleted

(was: Avoid serializing MessageAndMetadata. Easy workaround:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
{code})
[jira] [Commented] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211999#comment-15211999 ]

Liyin Tang commented on SPARK-14105:

Thanks [~c...@koeninger.org] and [~sowen] for the review and comments! It has been a great first experience contributing to the community.

For this JIRA, it is not ideal to deep-copy the fetch response directly, and users can easily avoid serializing MessageAndMetadata directly. I think it makes sense to close it as Won't Fix.
[jira] [Updated] (SPARK-14105) Serialization issue for MessageAndMetadata in KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Summary: Serialization issue for MessageAndMetadata in KafkaRDD (was: Serialization issue for KafkaRDD)
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Description:
When a KafkaDirectInputStream is persisted to DISK or Memory, the FetchResponse is serialized into blocks. The FetchResponse contains the ByteBufferMessageSet, where each Kafka Message is just one slice of the underlying ByteBuffer.

When serializing the KafkaRDDIterator, it seems that the entire underlying ByteBuffer in ByteBufferMessageSet is serialized for each and every message. This can easily cause the block size to exceed 2 GB and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"

The consumer fetch size is the default value (1M). I tried to reduce the fetch size, but that causes other errors such as errRanOutBeforeEnd.

Here is the minimal code to reproduce this issue. This example only demonstrates the bug; it is not the actual code we run.

{code}
// Create the source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topic, kafkaParams, messageHandler and offsetRanges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
  KafkaCluster.MAIN,
  topicsSet.toList.asJava,
  kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
  ssc,
  kafkaParams,
  topicPartitionOffsetRange,
  messageHandler)

// Apply window function
inputStream.window(Seconds(slidingWindowInterval), Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())

ssc.start()
ssc.awaitTermination()
{code}

Here are the exceptions I got for both Memory and Disk persistence.

Memory Persistent:
{code}
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at
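The slice behaviour that the SPARK-14105 description points at can be reproduced with plain `java.nio.ByteBuffer`, without Kafka or Spark. The sketch below is only an illustration of buffer sharing: `SliceDemo` and `sizes` are hypothetical names, and it does not claim to show exactly what Kryo writes for a `MessageAndMetadata`. It assumes a heap buffer, where a slice exposes the whole backing array through `array()`.

```scala
import java.nio.ByteBuffer

object SliceDemo {
  // Returns (length of the backing array reachable from a slice,
  //          length of a standalone copy of the slice's own bytes).
  def sizes(totalBytes: Int, messageBytes: Int): (Int, Int) = {
    // Stands in for the large buffer behind a fetch response.
    val backing = ByteBuffer.allocate(totalBytes)
    backing.limit(messageBytes)

    // A slice is a view: it shares the backing array instead of copying it.
    val slice = backing.slice()

    // Copying out only the bytes that belong to the "message".
    val copy = new Array[Byte](slice.remaining())
    slice.get(copy)

    (slice.array().length, copy.length)
  }
}
```

For a 1 MB buffer and a 100-byte message, the slice still reaches all 1048576 bytes of the backing array, while the copy holds only 100. Extracting plain values such as the key and message in the `messageHandler`, as in the workaround quoted in this thread, appears to play the same role as the copy here.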
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Description:
When a KafkaDirectInputStream is persisted to disk or memory, the FetchResponse is serialized into blocks. The FetchResponse contains the ByteBufferMessageSet, where each Kafka Message is just one slice of the underlying ByteBuffer.

When the KafkaRDDIterator is serialized, it seems that the entire underlying ByteBuffer in the ByteBufferMessageSet is serialized for each and every message. This easily pushes the block size past 2G and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"

The consumer fetch size is the default value (1M). I tried to reduce the fetch size, but that causes other errors such as errRanOutBeforeEnd.

Here is the minimal code to reproduce this issue. This example only demonstrates the bug; it is not the actual code we run.

{code}
// Create the source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topic, kafkaParams, messageHandler and offset ranges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
  KafkaCluster.MAIN,
  topicsSet.toList.asJava,
  kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
  ssc,
  kafkaParams,
  topicPartitionOffsetRange,
  messageHandler)

// Apply window function
inputStream.window(Seconds(slidingWindowInterval), Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
ssc.start()
ssc.awaitTermination()
{code}

Here are the exceptions I got for both
Memory and Disk persistence.

Memory Persistent:
16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
    at
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211147#comment-15211147 ]

Liyin Tang commented on SPARK-14105:

Per https://github.com/apache/spark/pull/11921#issuecomment-201073631, I have verified that the issue is caused by serializing MessageAndMetadata.

Previously:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
{code}
With this handler, serializing the KafkaRDD has to serialize every object in MessageAndMetadata, including the slice of the ByteBuffer. This makes the block larger than 2G.

The simple workaround is:
{code}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
{code}
With this handler, the message has already been deep-copied into a new byte[] when it is serialized, so the problem is avoided.

Based on this finding, I don't think we need to move forward with deep-copying the message in the FetchResponse. Maybe we can log a warning telling users to avoid serializing MessageAndMetadata directly.

Does that make sense? If so, I can close this Jira as Won't Fix. The workaround is pretty straightforward.

> Serialization issue for KafkaRDD
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or Memory to persistent KafkaDirectInputStream, it will
> serialize the FetchResponse into blocks. The FetchResponse contains the
> ByteBufferMessageSet where each Kafka Message is just one slice of the
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems like the entire underlying
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every
> message. This will cause block size easily exceeds 2G, and lead to
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch is the default value (1M). I tried to reduce fetch size,
> but it will cause other errors like errRanOutBeforeEnd.
> Here is the min code to reproduce this issue. This example just tries to
> demonstrate the bug, not the actual code we run.
> ```
> // create source stream object
> val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))
> // Create topic, kafkaParams, messageHandler and offsetnRanges
> val topicsSet: Set[String] = "flog".split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> KafkaCluster.MAIN.getKafkaConnString)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
>   KafkaCluster.MAIN,
>   topicsSet.toList.asJava,
>   kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)
> // Create an DStream
> val inputStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
>   ssc,
>   kafkaParams,
>   topicPartitionOffsetRange,
>   messageHandler)
> // Apply window function
> inputStream.window(Seconds(slidingWindowInterval),
> Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
> ssc.start()
> ssc.awaitTermination()
> ```
> Here are exceptions I got for both Memory and Disk persistent.
> Memory Persistent:
> 16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in
> thread Thread[Executor task launch worker-9,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>     at java.util.Arrays.copyOf(Arrays.java:3236)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
>     at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
>     at
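The buffer-sharing behaviour described in the comment above can be seen with plain `java.nio`, independent of Spark or Kafka. The following is a minimal sketch, not the KafkaRDD code path: the object name `SliceDemo`, the 1 MiB buffer, and the 100-byte message size are assumptions chosen for illustration. It shows that a slice exposes only its own bytes through `remaining()`, while `array()` still returns the whole backing array, which is what a field-by-field serializer would be expected to write.

```scala
import java.nio.ByteBuffer

object SliceDemo {
  // Hypothetical stand-in for one fetch response: a single 1 MiB heap buffer
  // that holds many messages back to back.
  val fetchBuffer: ByteBuffer = ByteBuffer.allocate(1024 * 1024)

  // One "message": a view of `length` bytes starting at `offset`.
  // The view shares the backing array with fetchBuffer; nothing is copied.
  def messageSlice(offset: Int, length: Int): ByteBuffer = {
    val dup = fetchBuffer.duplicate()
    dup.position(offset)
    dup.limit(offset + length)
    dup.slice()
  }

  // Deep copy: a fresh array that contains only the bytes visible through the slice.
  def deepCopy(slice: ByteBuffer): Array[Byte] = {
    val out = new Array[Byte](slice.remaining())
    slice.duplicate().get(out) // duplicate() keeps the caller's position untouched
    out
  }

  def main(args: Array[String]): Unit = {
    val msg = messageSlice(offset = 4096, length = 100)
    println(s"bytes visible through the slice: ${msg.remaining()}")     // 100
    println(s"length of the shared backing array: ${msg.array().length}") // 1048576
    println(s"length of the deep copy: ${deepCopy(msg).length}")        // 100
  }
}
```

Under the assumption that every message drags the full 1 MiB array along, roughly 2048 messages (2^31 / 2^20) are enough to reach the 2G array limit, whereas the deep copies stay at their real size.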
[jira] [Issue Comment Deleted] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Comment: was deleted

(was: Copied from PR discussion:
===
That example only demonstrates the bug. The actual code I run does more than count :)
I need to convert the Kafka messages to a DataFrame and run Spark SQL on it.

```
val functions: (RDD[MessageAndMetadata[String, String]] => Unit) = (rdd) => {
  val startTS = System.currentTimeMillis()
  // Filter out null messages; otherwise JSON parsing throws an exception
  val messageRDD = rdd.map(_.message()).filter(_ != null)
  val inputDF = sqlContext.read.schema(sourceStreamObject.schema).json(messageRDD)
  inputDF.registerTempTable(sourceStreamObject.name)
  inputDF.cache()
  processObjects.foreach(_.run(sqlContext))
  sinkObjects.foreach(_.sink(sqlContext))
}
```)

> Serialization issue for KafkaRDD
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or Memory to persistent KafkaDirectInputStream, it will
> serialize the FetchResponse into blocks. The FetchResponse contains the
> ByteBufferMessageSet where each Kafka Message is just one slice of the
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems like the entire underlying
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every
> message. This will cause block size easily exceeds 2G, and lead to
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch is the default value (1M). I tried to reduce fetch size,
> but it will cause other errors like errRanOutBeforeEnd.
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210864#comment-15210864 ]

Liyin Tang commented on SPARK-14105:

Copied from PR discussion:
===
That example only demonstrates the bug. The actual code I run does more than count :)
I need to convert the Kafka messages to a DataFrame and run Spark SQL on it.

```
val functions: (RDD[MessageAndMetadata[String, String]] => Unit) = (rdd) => {
  val startTS = System.currentTimeMillis()
  // Filter out null messages; otherwise JSON parsing throws an exception
  val messageRDD = rdd.map(_.message()).filter(_ != null)
  val inputDF = sqlContext.read.schema(sourceStreamObject.schema).json(messageRDD)
  inputDF.registerTempTable(sourceStreamObject.name)
  inputDF.cache()
  processObjects.foreach(_.run(sqlContext))
  sinkObjects.foreach(_.sink(sqlContext))
}
```

> Serialization issue for KafkaRDD
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or Memory to persistent KafkaDirectInputStream, it will
> serialize the FetchResponse into blocks. The FetchResponse contains the
> ByteBufferMessageSet where each Kafka Message is just one slice of the
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems like the entire underlying
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every
> message. This will cause block size easily exceeds 2G, and lead to
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch is the default value (1M). I tried to reduce fetch size,
> but it will cause other errors like errRanOutBeforeEnd.
[jira] [Comment Edited] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210734#comment-15210734 ]

Liyin Tang edited comment on SPARK-14105 at 3/24/16 6:35 PM:
-

Added the minimal code example that reproduces this issue to the Jira description.

There is another way to work around this issue without changing the KafkaRDD code: register a custom Kryo serializer for the Kafka Message. That way, we don't have to deep-copy each message.

was (Author: liyin):
Added the minimal code example that reproduces this issue.

There is another way to work around this issue without changing the KafkaRDD code: register a custom Kryo serializer for the Kafka Message. That way, we don't have to deep-copy each message.

> Serialization issue for KafkaRDD
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or Memory to persistent KafkaDirectInputStream, it will
> serialize the FetchResponse into blocks. The FetchResponse contains the
> ByteBufferMessageSet where each Kafka Message is just one slice of the
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems like the entire underlying
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every
> message. This will cause block size easily exceeds 2G, and lead to
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch is the default value (1M). I tried to reduce fetch size,
> but it will cause other errors like errRanOutBeforeEnd.
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210734#comment-15210734 ]

Liyin Tang commented on SPARK-14105:

Added the minimal code example that reproduces this issue.

There is another way to work around this issue without changing the KafkaRDD code: register a custom Kryo serializer for the Kafka Message. That way, we don't have to deep-copy each message.

> Serialization issue for KafkaRDD
>
>                 Key: SPARK-14105
>                 URL: https://issues.apache.org/jira/browse/SPARK-14105
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Liyin Tang
>         Attachments: Screenshot 2016-03-23 09.04.51.png
>
> When using DISK or Memory to persistent KafkaDirectInputStream, it will
> serialize the FetchResponse into blocks. The FetchResponse contains the
> ByteBufferMessageSet where each Kafka Message is just one slice of the
> underlying ByteBuffer.
> When serializing the KafkaRDDIterator, it seems like the entire underlying
> ByteBuffer in ByteBufferMessageSet will be serialized for each and every
> message. This will cause block size easily exceeds 2G, and lead to
> "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or
> "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"
> The consumer fetch is the default value (1M). I tried to reduce fetch size,
> but it will cause other errors like errRanOutBeforeEnd.
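The custom-serializer idea in the comment above comes down to writing only the bytes a message slice can see, with a length prefix, instead of letting a generic serializer walk the buffer's fields. The following is a minimal sketch of that encoding using only `java.io` and `java.nio`; it is not Kryo code and not the serializer the author had in mind. `SliceCodec`, `write`, `read`, and `roundTrip` are hypothetical names, and wiring the same logic into a Kryo serializer and registering it with Spark is left out.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

object SliceCodec {
  // Write a 4-byte length prefix followed by only the bytes visible through `buf`.
  def write(out: DataOutputStream, buf: ByteBuffer): Unit = {
    val view = buf.duplicate() // leave the caller's position and limit untouched
    out.writeInt(view.remaining())
    if (view.hasArray) {
      // Heap buffer: stream the visible range straight from the shared backing
      // array, so no per-message deep copy is needed before writing.
      out.write(view.array(), view.arrayOffset() + view.position(), view.remaining())
    } else {
      // Direct or read-only buffer: fall back to copying the visible bytes.
      val tmp = new Array[Byte](view.remaining())
      view.get(tmp)
      out.write(tmp)
    }
  }

  // Read back a buffer that owns exactly the bytes that were written.
  def read(in: DataInputStream): ByteBuffer = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    ByteBuffer.wrap(bytes)
  }

  // Encode and decode once; returns the encoded size and the decoded buffer.
  def roundTrip(buf: ByteBuffer): (Int, ByteBuffer) = {
    val bos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(bos)
    write(dos, buf)
    dos.flush()
    val encoded = bos.toByteArray
    (encoded.length, read(new DataInputStream(new ByteArrayInputStream(encoded))))
  }

  // Hypothetical 100-byte message slice inside a 1 MiB buffer.
  def sampleSlice(): ByteBuffer = {
    val big = ByteBuffer.allocate(1024 * 1024)
    big.position(4096)
    big.limit(4196)
    big.slice()
  }

  def main(args: Array[String]): Unit = {
    val (encodedSize, decoded) = roundTrip(sampleSlice())
    println(s"encoded size: $encodedSize bytes, decoded payload: ${decoded.remaining()} bytes")
  }
}
```

The encoded size here is the 100-byte payload plus the 4-byte length prefix, regardless of how large the shared buffer is.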
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Description:
When a KafkaDirectInputStream is persisted to disk or memory, the FetchResponse is serialized into blocks. The FetchResponse contains the ByteBufferMessageSet, where each Kafka Message is just one slice of the underlying ByteBuffer.

When the KafkaRDDIterator is serialized, it seems that the entire underlying ByteBuffer in the ByteBufferMessageSet is serialized for each and every message. This easily pushes the block size past 2G and leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:"

The consumer fetch size is the default value (1M). I tried to reduce the fetch size, but that causes other errors such as errRanOutBeforeEnd.

Here is the minimal code to reproduce this issue:

```
// Create the source stream object
val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds))

// Create topic, kafkaParams, messageHandler and offset ranges
val topicsSet: Set[String] = "flog".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> KafkaCluster.MAIN.getKafkaConnString)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
val topicPartitionOffsetRange = KafkaOffsetsUtil.getKafkaOffsets(
  KafkaCluster.MAIN,
  topicsSet.toList.asJava,
  kafka.api.OffsetRequest.LatestTime).toMap.mapValues(Long2long).take(10)

// Create a DStream
val inputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
  ssc,
  kafkaParams,
  topicPartitionOffsetRange,
  messageHandler)

// Apply window function
inputStream.window(Seconds(slidingWindowInterval), Seconds(intervalSeconds)).foreachRDD(rdd => rdd.count())
ssc.start()
ssc.awaitTermination()
```

Here are the exceptions I got for both Memory and Disk persistence.
Memory Persistent:

16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
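The report suspects that every message drags the whole fetch buffer into the serialized block. A minimal sketch of that effect follows, under stated assumptions: it uses a plain java.nio.ByteBuffer in place of Kafka's ByteBufferMessageSet and Java serialization in place of Kryo, and the names SliceSerializationDemo, naivePayload and copiedPayload are hypothetical. It only illustrates how a slice shares its backing array; it is not the code path inside KafkaRDD.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object SliceSerializationDemo {
  // Serialize an object with plain Java serialization and return the byte count.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // Naive: hand over the array that backs the slice, which is the whole fetch buffer.
  def naivePayload(slice: ByteBuffer): Array[Byte] = slice.array()

  // Safer: copy only the bytes the slice actually covers.
  def copiedPayload(slice: ByteBuffer): Array[Byte] = {
    val copy = new Array[Byte](slice.remaining())
    slice.duplicate().get(copy) // duplicate() leaves the caller's position untouched
    copy
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a 1 MiB fetch response (assumed size, matching the default fetch size)
    val fetchBuffer = ByteBuffer.allocate(1024 * 1024)
    fetchBuffer.position(100)
    fetchBuffer.limit(164)
    val message = fetchBuffer.slice() // a 64-byte "message" view into the buffer

    println(s"naive:  ${serializedSize(naivePayload(message))} bytes")
    println(s"copied: ${serializedSize(copiedPayload(message))} bytes")

    // If each message carried a full 1 MiB buffer, this many messages would
    // already reach the Integer.MAX_VALUE limit of a single byte array.
    println(s"messages until overflow: ${Int.MaxValue / fetchBuffer.capacity()}")
  }
}
```

Under the 1 MiB assumption, roughly two thousand messages in one block would be enough to hit the 2 GB limit, which is consistent with the errors reported above for windowed streams.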
[jira] [Commented] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209594#comment-15209594 ]

Liyin Tang commented on SPARK-14105:

I tested the PR with the same streaming job, and verified the serialization issue is fixed with the PR.

> Serialization issue for KafkaRDD
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2, 1.6.1
> Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Affects Version/s: 1.6.1

> Serialization issue for KafkaRDD
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2, 1.6.1
> Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Attachment: Screenshot 2016-03-23 09.04.51.png

The screenshot of KafkaRDD's serialization

> Serialization issue for KafkaRDD
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2
> Reporter: Liyin Tang
> Attachments: Screenshot 2016-03-23 09.04.51.png
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Component/s: (was: Spark Core)

> Serialization issue for KafkaRDD
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2
> Reporter: Liyin Tang
[jira] [Updated] (SPARK-14105) Serialization issue for KafkaRDD
[ https://issues.apache.org/jira/browse/SPARK-14105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-14105:
---
Affects Version/s: 1.5.2
Component/s: Streaming
             Spark Core

> Serialization issue for KafkaRDD
>
> Key: SPARK-14105
> URL: https://issues.apache.org/jira/browse/SPARK-14105
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Streaming
> Affects Versions: 1.5.2
> Reporter: Liyin Tang
[jira] [Created] (SPARK-14105) Serialization issue for KafkaRDD
Liyin Tang created SPARK-14105:
--
Summary: Serialization issue for KafkaRDD
Key: SPARK-14105
URL: https://issues.apache.org/jira/browse/SPARK-14105
Project: Spark
Issue Type: Bug
Reporter: Liyin Tang

When a KafkaDirectInputStream is persisted to disk or memory, the FetchResponse is serialized into blocks. The FetchResponse contains a ByteBufferMessageSet in which each Kafka message is just one slice of the underlying ByteBuffer.

When the KafkaRDDIterator is serialized, it seems that the entire underlying ByteBuffer in the ByteBufferMessageSet is serialized for each and every message. As a result the block size easily exceeds 2 GB, which leads to "java.lang.OutOfMemoryError: Requested array size exceeds VM limit" or "FileChannelImpl.map -> exceeds Integer.MAX_VALUE:".

The consumer fetch size is the default value (1M). I tried to reduce the fetch size, but that causes other errors such as errRanOutBeforeEnd.

Here are the exceptions I got for both memory and disk persistence.

Memory Persistent:

16/03/23 15:34:44 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-9,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

Disk Persistent:

16/03/23 17:04:00 INFO TaskSetManager: Lost task 42.1 in stage 2.0 (TID 974) on executor i-2878ceb3.inst.aws.airbnb.com: java.lang.RuntimeException (java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (SPARK-13580) Driver makes no progress when Executor's akka thread exits due to OOM.
[ https://issues.apache.org/jira/browse/SPARK-13580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-13580:
---
Summary: Driver makes no progress when Executor's akka thread exits due to OOM. (was: Driver makes no progress after failed to remove broadcast on Executor)

> Driver makes no progress when Executor's akka thread exits due to OOM.
>
> Key: SPARK-13580
> URL: https://issues.apache.org/jira/browse/SPARK-13580
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2
> Reporter: Liyin Tang
> Attachments: driver_jstack.txt, driver_log.txt, executor_jstack, stderrfiltered.txt.gz
>
> From the driver's log: it failed to remove broadcast data because of an RPC timeout exception from executor #11. It also failed to get a thread dump from executor #11 because of an akka.actor.ActorNotFound exception.
> After that, the driver waited for executor #11 to finish one task of that job. All the other tasks of that job had finished.
> However, according to executor #11's log, it never received that task (it received 9 other tasks and finished them).
> Since then, the streaming job has made no progress.
> I have attached the driver's log and jstack, and the executor's jstack.
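The symptom described above is a wait that never ends: the driver keeps waiting for a task the executor never received. As a generic illustration only (this is neither how Spark's scheduler works nor the fix for this issue), the sketch below shows a wait bounded by a timeout using the Scala standard library. BoundedWaitDemo and awaitOrReport are hypothetical names, and the 200 ms timeout is an arbitrary value chosen for the demo.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object BoundedWaitDemo {
  // Wait for a result, but give up after `timeout` instead of blocking forever.
  def awaitOrReport[T](promise: Promise[T], timeout: FiniteDuration): Either[String, T] =
    try Right(Await.result(promise.future, timeout))
    catch {
      case _: TimeoutException => Left(s"no result within $timeout")
    }

  def main(args: Array[String]): Unit = {
    val finishedTask = Promise[Int]()
    finishedTask.success(42)      // a task that completed normally
    val lostTask = Promise[Int]() // never completed, like the task executor #11 never received

    println(awaitOrReport(finishedTask, 200.millis))
    println(awaitOrReport(lostTask, 200.millis))
  }
}
```

With a bound like this, the caller gets a chance to log the stall or reschedule the work rather than hanging silently.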
[jira] [Commented] (SPARK-13580) Driver makes no progress after failed to remove broadcast on Executor
[ https://issues.apache.org/jira/browse/SPARK-13580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15172810#comment-15172810 ]

Liyin Tang commented on SPARK-13580:

Thanks [~zsxwing] for the investigation! That's very helpful!
[jira] [Updated] (SPARK-13580) Driver makes no progress after failed to remove broadcast on Executor
[ https://issues.apache.org/jira/browse/SPARK-13580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liyin Tang updated SPARK-13580:
---
    Attachment: executor_jstack
                driver_log.txt
                driver_jstack.txt
[jira] [Created] (SPARK-13580) Driver makes no progress after failed to remove broadcast on Executor
Liyin Tang created SPARK-13580:
--

             Summary: Driver makes no progress after failed to remove broadcast on Executor
                 Key: SPARK-13580
                 URL: https://issues.apache.org/jira/browse/SPARK-13580
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.5.2
            Reporter: Liyin Tang

From the driver's log: it failed to remove broadcast data due to an RPC timeout exception from executor #11. It also failed to get a thread dump from executor #11 due to an akka.actor.ActorNotFound exception.
After that, the driver waited for executor #11 to finish one task for that job. All the other tasks for that job had finished.
However, according to executor #11's log, it never received that task (it received 9 other tasks and finished them).
Since then, the streaming job has made no progress.

I have attached the driver's log and jstack, and the executor's jstack.