Re: Spark driver using Spark Streaming shows increasing memory/CPU usage
[Apologies if the end of the last email was only included as an attachment - MacMail seems to do that with the rest of the message if an attachment appears inline. I‘m sending again for clarity.] Hi Tathagata, Thanks for your quick reply! I’ll add some more detail below about what I’m doing - I’ve tried a lot of variations on the code to debug this, with monitoring enabled, but I didn’t want to overwhelm the issue description to start with ;-) On 30 Jun 2015, at 19:30, Tathagata Das mailto:t...@databricks.com>> wrote: Could you give more information on the operations that you are using? The code outline? And what do you mean by "Spark Driver receiver events"? If the driver is receiving events, how is it being sent to the executors. The events are just objects that represent actions a user takes. They contain a user id, a type and some other info, and get dumped into a MongoDB and then picked out by the Receiver. This Receiver runs a thread which periodically polls the db, processes new events into DBObjects and calls Receiver.store() to hand each one off to an Executor. BTW, for memory usages, I strongly recommend using jmap --histo:live to see what are the type of objects that is causing most memory usage? I’ve been running both jconsole and VisualVM to monitor the processes, and when memory usage is high it is overwhelmingly due to byte arrays. I’ve read that sometimes performing operations like sorting an RDD can lead to unreachable byte arrays (https://spark-project.atlassian.net/browse/SPARK-1001). I’ve not come across any reports that quite match our use case though. The groupByKey step seems to be a significant creator of byte arrays in my case. I’ll attach an outline of the code I’m using - I’ve tried to reduce this to the essentials; it won’t compile but should display ok in an IDE. A note that our MongoDBReceiver class uses StorageLevel.MEMORY_AND_DISK_SER() which will "spill partitions that don't fit in memory to disk… as serialized Java objects (one byte array per partition)”. I wondered if this might be a contributor to the problem, but our partitions are very small. Perhaps the partitions are not getting cleared up for some reason. Thanks again for taking this up. Spark Streaming has been very useful for us! Neil SparkDriverOutline.java Description: SparkDriverOutline.java TDOn Tue, Jun 30, 2015 at 9:48 AM, easyonthemayo <neil.m...@velocityww.com> wrote:I have a Spark program which exhibits increasing resource usage. Spark Streaming (https://spark.apache.org/streaming/) is used to provide the data source. The Spark Driver class receives "events" by querying a MongoDB in a custom JavaReceiverInputDStream. These events are then transformed via mapToPair(), which creates tuples mapping an id to each event. The stream is partitioned and we run a groupByKey(). Finally the events are processed by foreachRDD(). Running it for several hours on a standalone cluster, a clear trend emerges of both CPU and heap memory usage increasing. This occurs even if the data source offers no events, so there is no actual processing to perform. Similarly, omitting the bulk of processing code within foreachRDD() does not eliminate the problem. I've tried eliminating steps in the process to identify the culprit, and it looks like it's the partitioning step that prompts the CPU usage to increase over time. Has anyone else experienced this sort of behaviour? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-using-Spark-Streaming-shows-increasing-memory-CPU-usage-tp23545.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark driver using Spark Streaming shows increasing memory/CPU usage
Hi Tathagata, Thanks for your quick reply! I’ll add some more detail below about what I’m doing - I’ve tried a lot of variations on the code to debug this, with monitoring enabled, but I didn’t want to overwhelm the issue description to start with ;-) On 30 Jun 2015, at 19:30, Tathagata Das mailto:t...@databricks.com>> wrote: Could you give more information on the operations that you are using? The code outline? And what do you mean by "Spark Driver receiver events"? If the driver is receiving events, how is it being sent to the executors. The events are just objects that represent actions a user takes. They contain a user id, a type and some other info, and get dumped into a MongoDB and then picked out by the Receiver. This Receiver runs a thread which periodically polls the db, processes new events into DBObjects and calls Receiver.store() to hand each one off to an Executor. BTW, for memory usages, I strongly recommend using jmap --histo:live to see what are the type of objects that is causing most memory usage? I’ve been running both jconsole and VisualVM to monitor the processes, and when memory usage is high it is overwhelmingly due to byte arrays. I’ve read that sometimes performing operations like sorting an RDD can lead to unreachable byte arrays (https://spark-project.atlassian.net/browse/SPARK-1001). I’ve not come across any reports that quite match our use case though. The groupByKey step seems to be a significant creator of byte arrays in my case. I’ll attach an outline of the code I’m using - I’ve tried to reduce this to the essentials; it won’t compile but should display ok in an IDE. SparkDriverOutline.java Description: SparkDriverOutline.java A note that our MongoDBReceiver class uses StorageLevel.MEMORY_AND_DISK_SER() which will "spill partitions that don't fit in memory to disk… as serialized Java objects (one byte array per partition)”. I wondered if this might be a contributor to the problem, but our partitions are very small. Perhaps the partitions are not getting cleared up for some reason.Thanks again for taking this up. Spark Streaming has been very useful for us!NeilTDOn Tue, Jun 30, 2015 at 9:48 AM, easyonthemayo <neil.m...@velocityww.com> wrote:I have a Spark program which exhibits increasing resource usage. Spark Streaming (https://spark.apache.org/streaming/) is used to provide the data source. The Spark Driver class receives "events" by querying a MongoDB in a custom JavaReceiverInputDStream. These events are then transformed via mapToPair(), which creates tuples mapping an id to each event. The stream is partitioned and we run a groupByKey(). Finally the events are processed by foreachRDD(). Running it for several hours on a standalone cluster, a clear trend emerges of both CPU and heap memory usage increasing. This occurs even if the data source offers no events, so there is no actual processing to perform. Similarly, omitting the bulk of processing code within foreachRDD() does not eliminate the problem. I've tried eliminating steps in the process to identify the culprit, and it looks like it's the partitioning step that prompts the CPU usage to increase over time. Has anyone else experienced this sort of behaviour? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-using-Spark-Streaming-shows-increasing-memory-CPU-usage-tp23545.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark driver using Spark Streaming shows increasing memory/CPU usage
Could you give more information on the operations that you are using? The code outline? And what do you mean by "Spark Driver receiver events"? If the driver is receiving events, how is it being sent to the executors. BTW, for memory usages, I strongly recommend using jmap --histo:live to see what are the type of objects that is causing most memory usage? TD On Tue, Jun 30, 2015 at 9:48 AM, easyonthemayo wrote: > I have a Spark program which exhibits increasing resource usage. Spark > Streaming (https://spark.apache.org/streaming/) is used to provide the > data > source. The Spark Driver class receives "events" by querying a MongoDB in a > custom JavaReceiverInputDStream. These events are then transformed via > mapToPair(), which creates tuples mapping an id to each event. The stream > is > partitioned and we run a groupByKey(). Finally the events are processed by > foreachRDD(). > > Running it for several hours on a standalone cluster, a clear trend emerges > of both CPU and heap memory usage increasing. This occurs even if the data > source offers no events, so there is no actual processing to perform. > Similarly, omitting the bulk of processing code within foreachRDD() does > not > eliminate the problem. > > I've tried eliminating steps in the process to identify the culprit, and it > looks like it's the partitioning step that prompts the CPU usage to > increase > over time. > > Has anyone else experienced this sort of behaviour? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-using-Spark-Streaming-shows-increasing-memory-CPU-usage-tp23545.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Spark driver using Spark Streaming shows increasing memory/CPU usage
I have a Spark program which exhibits increasing resource usage. Spark Streaming (https://spark.apache.org/streaming/) is used to provide the data source. The Spark Driver class receives "events" by querying a MongoDB in a custom JavaReceiverInputDStream. These events are then transformed via mapToPair(), which creates tuples mapping an id to each event. The stream is partitioned and we run a groupByKey(). Finally the events are processed by foreachRDD(). Running it for several hours on a standalone cluster, a clear trend emerges of both CPU and heap memory usage increasing. This occurs even if the data source offers no events, so there is no actual processing to perform. Similarly, omitting the bulk of processing code within foreachRDD() does not eliminate the problem. I've tried eliminating steps in the process to identify the culprit, and it looks like it's the partitioning step that prompts the CPU usage to increase over time. Has anyone else experienced this sort of behaviour? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-using-Spark-Streaming-shows-increasing-memory-CPU-usage-tp23545.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org