Re: Stages with non-arithmetic numbering & Timing metrics in event logs
That is not exactly correct -- that being said, I'm not 100% on these details either, so I'd appreciate you double-checking and/or another dev confirming my description.

Spark actually has more threads going than the numCores you specify. numCores is really used for how many threads are actively executing tasks. There are more threads for doing the fetching (the details of which I'm not that familiar with) -- those never cut into the numCores threads that are actively executing tasks. There isn't a 1-to-1 correspondence between one shuffle block and one task -- a shuffle-read task most likely needs to fetch many shuffle blocks, some local and some remote (in most cases). So, in your lingo above, c is numCores, but c_1 is just an independent pool of threads.

The obvious follow-up question is: if you actually have more than numCores threads going at the same time, how come CPU usage is low? You could still have your CPUs stuck waiting on I/O, from disk or network, so they aren't getting fully utilized. The CPU can also be idle waiting for memory, if there are a lot of cache misses (I'm not sure how that will show up in CPU monitoring). If that were the case, it could even be from too many threads, as a lot of time is spent context switching ... but I'm just guessing now.

hope this helps,
Imran

On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes 91m...@gmail.com wrote:

Hi Imran,

Thank you again for your email. I just want to ask one further question to clarify the implementation of the shuffle block fetches. When you say that "rather than sitting idle, [the executor] will immediately start reading the local block", I would guess that, in implementation, the executor is going to launch concurrent threads to read both local and remote blocks, which it seems to do in the initialize() method of core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case, or would the Executor run all local fetch threads first?

The reason I ask is that if the slave machine on which the Executor is running has some number of cores, c, then I would have thought that some of the threads launched would occupy some number, c_1, of the cores and conduct the local reads (where c_1 <= c). The other threads would occupy the other (c - c_1) cores' cycles until *all* necessary blocks have been read, depending on c and the number of blocks to fetch, so that none of the cores are idle if there are many blocks to fetch. (I monitor the CPU utilization of our nodes throughout a job, and generally find them under-utilized, statistically speaking; that is, their usage over the whole job is lower than expected, with short bursts of high usage, so I ask this question in this specific way, since I can see trends in the probability density functions of CPU utilization as the #partitions of our RDDs are increased.)

ShuffleBlockFetcherIterator.scala:

private[this] def initialize(): Unit = {
  ...
  // Send out initial requests for blocks, up to our maxBytesInFlight
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }

  val numFetches = remoteRequests.size - fetchRequests.size
  logInfo("Started " + numFetches + " remote fetches in " + Utils.getUsedTimeMs(startTime))

  // Get Local Blocks
  fetchLocalBlocks()
  logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}

private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      results.put(new SuccessFetchResult(blockId, 0, buf))
    } catch {
      ...
    }
  }
}

Obviously, I will have to sit down with core/.../network/nio/* and core/.../shuffle/* and do my own homework on this, but from what I can tell, the BlockDataManager relies on either NioBlockTransferService.scala or NettyBlockTransferService.scala (which are set in SparkEnv.scala), both of which do the grunt work of actually buffering and transferring the blocks' bytes. Finally, the tasks in the new stage for which the shuffle outputs have been fetched will not commence until all of the block fetching threads (both local and remote) have terminated.

Does the above paint an accurate picture? I would really appreciate clarification on the concurrency, since I would like to determine why our jobs have under-utilization and poor weak scaling efficiency. I will cc this thread over to the dev list. I did not cc them in case my previous question was trivial---I didn't want to spam the list unnecessarily, since I do not see these kinds of questions posed there frequently.

Thanks a bunch,
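To make the threading model described above concrete (task threads limited by numCores, a separate small pool handling remote fetches, local reads done on the calling task thread, and everything funnelling into one queue that the task thread drains), here is a minimal, self-contained Scala sketch. It is only an illustration -- the names FetchResult, results, and fetchPool are invented, and this is not Spark's actual implementation:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object FetchPatternSketch {
  // Stand-in for a fetched (still serialized) shuffle block.
  case class FetchResult(blockId: String, bytes: Array[Byte])

  def main(args: Array[String]): Unit = {
    val remoteBlocks = Seq("remote-1", "remote-2", "remote-3")
    val localBlocks  = Seq("local-1", "local-2")

    // Single queue that both remote-fetch callbacks and local reads feed into.
    val results = new LinkedBlockingQueue[FetchResult]()

    // Small pool standing in for the network layer's threads; these do not
    // occupy one of the "numCores" task-execution slots.
    val fetchPool = Executors.newFixedThreadPool(2)

    // (1) Issue asynchronous remote requests; each completion enqueues a result.
    remoteBlocks.foreach { id =>
      fetchPool.submit(new Runnable {
        override def run(): Unit = {
          Thread.sleep(50) // pretend network latency
          results.put(FetchResult(id, Array.fill[Byte](4)(0)))
        }
      })
    }

    // (2) Read local blocks on the calling (task) thread; these are fast,
    // so they usually land on the queue before the remote ones.
    localBlocks.foreach { id =>
      results.put(FetchResult(id, Array.fill[Byte](4)(1)))
    }

    // (3) The task thread consumes results one at a time, blocking when empty.
    (1 to (remoteBlocks.size + localBlocks.size)).foreach { _ =>
      val r = results.take()
      println(s"processed ${r.blockId}")
    }

    fetchPool.shutdown()
    fetchPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}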
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Here’s how the shuffle works. This explains what happens for a single task; this will happen in parallel for each task running on the machine, and as Imran said, Spark runs up to “numCores” tasks concurrently on each machine. There's also an answer to the original question about why CPU use is low at the very bottom.

The key data structure used in fetching shuffle data is the “results” queue in ShuffleBlockFetcherIterator, which buffers data that we have in serialized (and maybe compressed) form, but haven’t yet deserialized / processed. The results queue is filled by many threads fetching data over the network (the number of concurrent threads fetching data is equal to the number of remote executors we’re currently fetching data from) [0], and is consumed by a single thread that deserializes the data and computes some function over it (e.g., if you’re doing rdd.count(), the thread deserializes the data and counts the number of items).

As we fetch data over the network, we track bytesInFlight, which is data that has been requested (and possibly received) from a remote executor, but that hasn’t yet been deserialized / processed by the consumer thread. So, this includes all of the data in the “results” queue, and possibly more data that’s currently outstanding over the network. We always issue as many requests as we can, with the constraint that bytesInFlight remains less than a specified maximum [1].

In a little more detail, here’s exactly what happens when a task begins reading shuffled data:

(1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1] over the network (this happens here: https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260). These requests are all executed asynchronously using a ShuffleClient [2] via the shuffleClient.fetchBlocks call (https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149) [3]. We pass in a callback that, once a block has been successfully fetched, sticks it on the “results” queue.

(2) Begin processing the local data. One by one, we request the local data from the local block manager (which memory maps the file) and then stick the result onto the results queue (https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230). Because we memory map the files, which is speedy, the local data typically all ends up on the results queue in front of the remote data.

(3) Once the async network requests have been issued (note — issued, but not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) and (2) have happened), ShuffleBlockFetcherIterator returns an iterator that gets wrapped too many times to count [4] and eventually gets unrolled [5]. Each time next() is called on the iterator, it blocks waiting for an item from the results queue. This may return right away, or if the queue is empty, will block waiting on new data from the network [6]. Before returning from next(), we update our accounting for the bytes in flight: the chunk of data we return is no longer considered in-flight, because it’s about to be processed, so we update the current bytesInFlight, and, if it won’t result in more than maxBytesInFlight outstanding, send some more requests for data.
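To illustrate the bytesInFlight accounting in step (3), here is a small, self-contained Scala sketch (not Spark's code; the request sizes and names are invented) of the "issue as many requests as allowed, then top up whenever a chunk is consumed" loop:

import scala.collection.mutable

object BytesInFlightSketch {
  // Illustrative only: a pending fetch request with a known size in bytes.
  case class Request(size: Long)

  def main(args: Array[String]): Unit = {
    val maxBytesInFlight = 48L * 1024 * 1024
    val pending = mutable.Queue(
      Request(20L << 20), Request(20L << 20), Request(20L << 20), Request(5L << 20))
    var bytesInFlight = 0L

    // Issue as many requests as possible without exceeding maxBytesInFlight
    // (always at least one, even if that one alone is larger than the limit).
    def sendMoreRequests(): Unit = {
      while (pending.nonEmpty &&
        (bytesInFlight == 0 || bytesInFlight + pending.front.size <= maxBytesInFlight)) {
        val r = pending.dequeue()
        bytesInFlight += r.size
        println(s"issued request of ${r.size} bytes, bytesInFlight = $bytesInFlight")
      }
    }

    sendMoreRequests() // the initial wave of requests

    // When the consumer takes a chunk off the results queue, it is no longer
    // in flight, so decrement the counter and try to top up the requests.
    Seq(20L << 20, 20L << 20).foreach { consumed =>
      bytesInFlight -= consumed
      sendMoreRequests()
    }
  }
}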
Notes:

[0] Note that these threads consume almost no CPU resources, because they just receive data from the OS and then execute a callback that sticks the data on the results queue.

[1] We limit the data outstanding on the network to avoid using too much memory to hold the data we’ve fetched over the network but haven’t yet processed.

[1.5] Each request may include multiple shuffle blocks, where a block is the data output for this reduce task by a particular map task. All of the reduce tasks for a shuffle read a total of (# map tasks * # reduce tasks) shuffle blocks; each reduce task reads # map tasks blocks. We do some hacks (https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177) to try to size these requests in a good way: we limit each request to about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines concurrently without exceeding maxBytesInFlight. 5 is completely a magic number here that was probably guessed by someone long long ago, and it seems to work ok.

[2] The default configuration uses NettyBlockTransferService as the ShuffleClient implementation (note that this extends BlockTransferService, which extends ShuffleClient).

[3] If you’re curious how the shuffle client fetches data, the default Spark configuration results in exactly one TCP connection from an executor to each other executor. If executor A is getting shuffle data
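To make note [1.5] concrete: with M map tasks and R reduce tasks there are M * R shuffle blocks in total, and each reduce task fetches M of them, grouped into requests of roughly maxBytesInFlight / 5 bytes. Below is a hedged, stand-alone Scala sketch of that grouping idea (block names and sizes are invented; this is not the actual Spark code linked above):

import scala.collection.mutable.ArrayBuffer

object RequestSizingSketch {
  def main(args: Array[String]): Unit = {
    val maxBytesInFlight  = 48L * 1024 * 1024        // e.g. 48 MB
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)

    // Hypothetical (blockId, size) pairs to fetch from one remote executor;
    // one block per map task (here, 40 map tasks of 3 MB each).
    val blocks = (0 until 40).map(i => (s"shuffle_0_${i}_7", 3L * 1024 * 1024))

    val requests = ArrayBuffer[Seq[(String, Long)]]()
    var current  = ArrayBuffer[(String, Long)]()
    var bytes    = 0L
    for (b <- blocks) {
      current += b
      bytes   += b._2
      // Close a request once it reaches roughly maxBytesInFlight / 5.
      if (bytes >= targetRequestSize) {
        requests += current.toSeq
        current = ArrayBuffer[(String, Long)]()
        bytes = 0L
      }
    }
    if (current.nonEmpty) requests += current.toSeq

    requests.zipWithIndex.foreach { case (req, i) =>
      println(s"request $i: ${req.size} blocks, ${req.map(_._2).sum / (1024 * 1024)} MB")
    }
  }
}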
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Kay,

Excellent write-up. This should be preserved for reference somewhere searchable.

-Gerard.

On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout k...@eecs.berkeley.edu wrote: [quoted write-up snipped; see Kay's message above]
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Good idea -- I've added this to the wiki: https://cwiki.apache.org/confluence/display/SPARK/Shuffle+Internals. Happy to stick it elsewhere if folks think there's a more convenient place.

On Thu, Jun 11, 2015 at 4:46 PM, Gerard Maas gerard.m...@gmail.com wrote: [quoted reply snipped; see above]
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Hi Imran,

Thank you for your email. In examining the condition (t2 - t1) > (t_ser + t_deser + t_exec), I have found it to be true, although I have not included t_{wait_for_read} in this, since it has---so far as I can tell---been either zero or negligible compared to the task time.

Thanks,
Mike

On 6/8/15, Imran Rashid iras...@cloudera.com wrote: [quoted reply snipped; see Imran's message below]
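A rough sketch of how such a check could be scripted against the JSON event log is below. It assumes json4s (which Spark already bundles) is on the classpath, and the field names follow what I believe JsonProtocol writes ("Task Info", "Launch Time", "Executor Run Time", ...); they should be double-checked against an actual log file:

import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object TaskTimingCheck {
  implicit val formats: Formats = DefaultFormats

  // Missing fields (e.g. failed tasks with no metrics) default to 0.
  private def ms(v: JValue): Long = v.extractOpt[Long].getOrElse(0L)

  def main(args: Array[String]): Unit = {
    // args(0): path to the Spark event log (one JSON event per line).
    for (line <- Source.fromFile(args(0)).getLines()
         if line.contains("SparkListenerTaskEnd")) {
      val json    = parse(line)
      val info    = json \ "Task Info"
      val metrics = json \ "Task Metrics"
      val elapsed = ms(info \ "Finish Time") - ms(info \ "Launch Time")
      val sum = ms(metrics \ "Executor Deserialize Time") +
                ms(metrics \ "Executor Run Time") +
                ms(metrics \ "Result Serialization Time")
      if (elapsed < sum) {
        println(s"constraint violated: elapsed = $elapsed ms < components = $sum ms")
      }
    }
  }
}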
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Ahhh---forgive my typo: what I mean is,

(t2 - t1) >= (t_ser + t_deser + t_exec)

is satisfied, empirically.

On 6/10/15, Mike Hynes 91m...@gmail.com wrote: [quoted reply snipped; see above]
Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Hi Mike,

all good questions, let me take a stab at answering them:

1. Event Logs + Stages:

It's normal for stages to get skipped if they are shuffle map stages, which get read multiple times. E.g., here's a little example program I wrote earlier to demonstrate this: d3 doesn't need to be re-shuffled since each time it's read w/ the same partitioner. So skipping stages in this way is a good thing:

val partitioner = new org.apache.spark.HashPartitioner(10)
val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
(0 until 5).foreach { idx =>
  val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
  println(idx + " --- " + otherData.join(d3).count())
}

If you run this and look in the UI, you'd see that all jobs except for the first one have one stage that is skipped. You will also see this in the log:

15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12, Stage 13)
15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)

Admittedly that is not very clear, but that is sort of indicating to you that the DAGScheduler first created stage 12 as a necessary step, and then later on changed its mind by realizing that everything it needed for stage 12 already existed, so there was nothing to do.

2. Extracting Event Log Information

Maybe you are interested in SparkListener? Though unfortunately, I don't know of a good blog post describing it; hopefully the docs are clear ...

3. Time Metrics in Spark Event Log

This is a great question. I *think* the only exception is that t_gc is really overlapped with t_exec. So I think you should really expect

(t2 - t1) > (t_ser + t_deser + t_exec)

I am not 100% sure about this, though. I'd be curious if that constraint was ever violated.

As for your question on shuffle read vs. shuffle write time -- I wouldn't necessarily expect the same stage to have times for both shuffle read and shuffle write -- in the simplest case, you'll have shuffle write times in one, and shuffle read times in the next one. But even taking that into account, there is a difference in the way they work and are measured. Shuffle read operations are pipelined, and the way we measure shuffle read, it's just how much time is spent *waiting* for network transfer. It could be that there is no (measurable) wait time b/c the next blocks are fetched before they are needed. Shuffle writes occur in the normal task execution thread, though, so we (try to) measure all of it.

On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes 91m...@gmail.com wrote: [earlier message quoted in full; see below]
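Since SparkListener was suggested above for pulling task-level metrics programmatically, here is a rough, hedged sketch of a listener that appends one comma-separated row per finished task. The accessor names come from the public TaskInfo / TaskMetrics API as best I recall and should be verified against the Spark version in use:

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch only: writes one CSV row per completed task.
class CsvTaskListener(path: String) extends SparkListener {
  private val out = new PrintWriter(new FileWriter(path, true))
  out.println("finishTime,taskId,host,runTimeMs,gcTimeMs,deserTimeMs,serTimeMs")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    val m    = taskEnd.taskMetrics
    if (info != null && m != null) {
      out.println(Seq(
        info.finishTime, info.taskId, info.host,
        m.executorRunTime, m.jvmGCTime,
        m.executorDeserializeTime, m.resultSerializationTime
      ).mkString(","))
      out.flush()
    }
  }
}

// Usage (e.g. in a driver program or spark-shell), before running the job:
//   sc.addSparkListener(new CsvTaskListener("/tmp/tasks.csv"))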
Stages with non-arithmetic numbering & Timing metrics in event logs
Hi Patrick and Akhil,

Thank you both for your responses. This is a bit of an extended email, but I'd like to:
1. Answer your (Patrick) note about the missing stages, since the IDs do (briefly) appear in the event logs
2. Ask for advice/experience with extracting information from the event logs in a columnar, delimiter-separated format.
3. Ask about the time metrics reported in the event logs; currently, the elapsed time for a task does not equal the sum of the times for its components.

1. Event Logs + Stages:
=======================
As I said before, in the spark logs (the log4j configurable ones from the driver), I only see references to some stages, where the stage IDs are not arithmetically increasing. In the event logs, however, I will see reference to *every* stage, although not all stages will have tasks associated with them. For instance, to examine the actual stages that have tasks, you can see missing stages:

# grep -E 'Event:SparkListenerTaskEnd' app.log \
#   | grep -Eo 'Stage ID:[[:digit:]]+' \
#   | sort -n | uniq | head -n 5
Stage ID:0
Stage ID:1
Stage ID:10
Stage ID:11
Stage ID:110

However, these missing stages *do* appear in the event logs as Stage IDs in the jobs submitted, i.e. for:

# grep -E 'Event:SparkListenerJobStart' app.log | grep -Eo 'Stage IDs:\[.*\]' | head -n 5
Stage IDs:[0,1,2]
Stage IDs:[5,3,4]
Stage IDs:[6,7,8]
Stage IDs:[9,10,11]
Stage IDs:[12,13,14]

I do not know if this amounts to a bug, since I am not familiar with the scheduler in detail. The stages have seemingly been created somewhere in the DAG, but then have no associated tasks and never appear again.

2. Extracting Event Log Information
===================================
Currently we are running scalability tests, and are finding very poor scalability for certain block matrix algorithms. I would like to have finer detail about the communication time and bandwidth when data is transferred between nodes. I would really just like to have a file with nothing but task info in a format such as:

timestamp (ms), task ID, hostname, execution time (ms), GC time (ms), ...
0010294, 1, slave-1, 503, 34, ...
0010392, 2, slave-2, 543, 32, ...

and similarly for jobs/stages/rdd_memory/shuffle output/etc. I have extracted the relevant time fields from the spark event logs with a sed script, but I wonder if there is an even more expedient way. Unfortunately, I do not immediately see how to do this using the $SPARK_HOME/conf/metrics.properties file and haven't come across a blog/etc that describes this. Could anyone please comment on whether or not a metrics configuration for this already exists?

3. Time Metrics in Spark Event Log
==================================
I am confused about the times reported for tasks in the event log. There are launch and finish timestamps given for each task (call them t1 and t2, respectively), as well as GC time (t_gc), execution time (t_exec), and serialization times (t_ser, t_deser). However, the times do not add up as I would have expected. I would imagine that the elapsed time t2 - t1 would be slightly larger than the sum of the component times. However, I can find many instances in the event logs where:

(t2 - t1) < (t_gc + t_ser + t_deser + t_exec)

The difference can be 500 ms or more, which is not negligible for my current execution times of ~5000 ms. I have attached a plot that illustrates this. Regarding this, I'd like to ask:
1. How exactly are these times being measured?
2. Should the sum of the component times equal the elapsed (clock) time for the task?
3. If not, which component(s) is(are) being excluded, and when do they occur?
4. There are occasionally reported measurements for Shuffle Write time, but not shuffle read time. Is there a method to determine the time required to shuffle data? Could this be done by looking at delays between the first task in a new stage and the last task in the previous stage?

Thank you very much for your time,
Mike

On 6/7/15, Patrick Wendell pwend...@gmail.com wrote:

Hey Mike,

Stage IDs are not guaranteed to be sequential because of the way the DAG scheduler works (only increasing). In some cases stage ID numbers are skipped when stages are generated. Any stage/ID that appears in the Spark UI is an actual stage, so if you see IDs in there, but they are not in the logs, then let us know (that would be a bug).

- Patrick

On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Are you seeing the same behavior on the driver UI (that running on port 4040)? If you click on the stage id header you can sort the stages based on IDs.

Thanks
Best Regards

On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes 91m...@gmail.com wrote:

Hi folks,

When I look at the output logs for an iterative Spark program, I see that the stage IDs are not arithmetically numbered---that is, there are gaps between stages and I might find log information about Stage 0, 1, 2, 5, but not 3 or 4. As an example, the