Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Imran Rashid
That is not exactly correct -- that being said, I'm not 100% sure on these
details either, so I'd appreciate you double-checking and/or another dev
confirming my description.


Spark actually has more threads going than the numCores you specify.
numCores really controls how many threads are actively executing
tasks.  There are additional threads for doing the fetching (the details
of which I'm not that familiar with) -- those never cut into the number
of actively executing tasks allowed by numCores.  There also isn't a
1-to-1 correspondence between one shuffle block and one task -- a
shuffle-read task most likely needs to fetch many shuffle blocks, with
some local and some remote (in most cases).  So, in your lingo above, c
is numCores, but c_1 is just an independent pool of threads.
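
If you want to see this for yourself, a quick and hacky way is to dump the
live thread names from inside a task -- something like this in spark-shell
(just a diagnostic sketch of mine, nothing official):

import scala.collection.JavaConverters._
sc.parallelize(1 to sc.defaultParallelism, sc.defaultParallelism)
  .mapPartitions { _ =>
    // grab the names of all live threads in this executor JVM
    Thread.getAllStackTraces.keySet.asScala.map(_.getName).toIterator
  }
  .collect().distinct.sorted
  .foreach(println)
// look for the "Executor task launch worker" threads (bounded by numCores)
// vs. the separate network / shuffle-client threads doing the fetching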

The obvious follow-up question is: if there are actually more than numCores
threads going at the same time, how come CPU usage is low?  You could still
have your CPUs stuck waiting on I/O, from disk or network, so they aren't
getting fully utilized.  The CPU can also be idle waiting on memory if there
are a lot of cache misses (I'm not sure how that would show up in CPU
monitoring).  If that were the case, it could even be from having too many
threads, as a lot of time is spent context switching ... but I'm just
guessing now.

hope this helps,
Imran



On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes 91m...@gmail.com wrote:

 Hi Imran,

 Thank you again for your email.

 I just want to ask one further question to clarify the implementation
 of the shuffle block fetches. When you say that "rather than sitting
 idle, [the executor] will immediately start reading the local block", I
 would guess that, in implementation, the executor is going to launch
 concurrent threads to read both local and remote blocks, which it
 seems to do in the initialize() method of
 core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case,
 or would the Executor run all local fetch threads first?

 The reason I ask is that if the slave machine on which the Executor is
 running has some number of cores, c, then I would have thought that
 some of the threads launched would occupy some number, c_1, of the
 cores and conduct the local reads (where c_1 <= c), and that the other
 threads would occupy the remaining (c - c_1) cores until *all* necessary
 blocks have been read, with c_1 depending on c and the number of blocks
 to fetch, so that none of the cores would be idle when there are many
 blocks to fetch. (I monitor the CPU utilization of our nodes throughout
 a job, and generally find them under-utilized, statistically speaking;
 that is, their usage over the whole job is lower than expected, with
 short bursts of high usage, so I ask this question in a specific way for
 this reason, since I can see trends in the probability density functions
 of CPU utilization as the #partitions of our RDDs are increased.)

 ShuffleBlockFetcherIterator.scala:

   private[this] def initialize(): Unit = {
     ...
     // Send out initial requests for blocks, up to our maxBytesInFlight
     while (fetchRequests.nonEmpty &&
       (bytesInFlight == 0 ||
         bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
       sendRequest(fetchRequests.dequeue())
     }
     val numFetches = remoteRequests.size - fetchRequests.size
     logInfo("Started " + numFetches + " remote fetches in" +
       Utils.getUsedTimeMs(startTime))

     // Get Local Blocks
     fetchLocalBlocks()
     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
   }

   private[this] def fetchLocalBlocks() {
     val iter = localBlocks.iterator
     while (iter.hasNext) {
       val blockId = iter.next()
       try {
         val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.incLocalBlocksFetched(1)
         shuffleMetrics.incLocalBytesRead(buf.size)
         buf.retain()
         results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
         ...
       }
     }
   }

 Obviously, I will have to sit down with core/.../network/nio/* and
 core/.../shuffle/* and do my own homework on this, but from what I can
 tell, the BlockDataManager relies on either
 NioBlockTransferService.scala or the NettyBlockTransferService.scala
 (which are set in SparkEnv.scala), both of which do the grunt work of
 actually buffering and transferring the blocks' bytes. Finally, the
 tasks in the new stage for which the shuffle outputs have been fetched
 will not commence until all of the block fetching threads (both local
 and remote) have terminated.

 Does the above paint an accurate picture? I would really appreciate
 clarification on the concurrency, since I would like to determine why
 our jobs suffer from under-utilization and poor weak scaling efficiency.

 I will cc this thread over to the dev list. I did not cc them in case
 my previous question was trivial---I didn't want to spam the list
 unnecessarily, since I do not see these kinds of questions posed there
 frequently.

 Thanks a bunch,

Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Kay Ousterhout
Here’s how the shuffle works.  This explains what happens for a single
task; this will happen in parallel for each task running on the machine,
and as Imran said, Spark runs up to “numCores” tasks concurrently on each
machine.  There's also an answer to the original question about why CPU use
is low at the very bottom.

The key data structure used in fetching shuffle data is the “results” queue
in ShuffleBlockFetcherIterator, which buffers data that we have in
serialized (and maybe compressed) form, but haven’t yet deserialized /
processed.  The results queue is filled by many threads fetching data over
the network (the number of concurrent threads fetching data is equal to the
number of remote executors we’re currently fetching data from) [0], and is
consumed by a single thread that deserializes the data and computes some
function over it (e.g., if you’re doing rdd.count(), the thread
deserializes the data and counts the number of items).  As we fetch data
over the network, we track bytesInFlight, which is data that has been
requested (and possibly received) from a remote executor, but that hasn’t
yet been deserialized / processed by the consumer thread.  So, this
includes all of the data in the “results” queue, and possibly more data
that’s currently outstanding over the network.  We always issue as many
requests as we can, with the constraint that bytesInFlight remains less
than a specified maximum [1].
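
If it helps to picture it, here's a stripped-down sketch of that
producer/consumer structure (my own illustration with made-up names, not
Spark's actual code): a few "network" threads put fetched blocks on a
blocking queue, and a single consumer thread takes them off one at a time.

import java.util.concurrent.LinkedBlockingQueue

object ResultsQueueSketch {

  case class FetchedBlock(blockId: String, bytes: Array[Byte])

  def main(args: Array[String]): Unit = {
    val results = new LinkedBlockingQueue[FetchedBlock]()
    val numFetchers = 4
    val blocksPerFetcher = 5

    // "network" side: one thread per remote executor we're fetching from;
    // each one just sticks the fetched (still-serialized) block on the queue
    val fetchers = (1 to numFetchers).map { executorId =>
      new Thread(new Runnable {
        def run(): Unit = {
          for (i <- 1 to blocksPerFetcher) {
            results.put(
              FetchedBlock(s"shuffle_0_${executorId}_$i", new Array[Byte](1024)))
          }
        }
      })
    }
    fetchers.foreach(_.start())

    // consumer side: the single task thread takes blocks off the queue one
    // at a time, blocking whenever the queue is empty, and "processes" them
    var processed = 0
    while (processed < numFetchers * blocksPerFetcher) {
      val block = results.take()
      processed += 1
      println(s"processed ${block.blockId} (${block.bytes.length} bytes)")
    }
  }
}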

In a little more detail, here’s exactly what happens when a task begins
reading shuffled data:

(1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1]
over the network (this happens here
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260
).

These requests are all executed asynchronously using a ShuffleClient [2]
via the shuffleClient.fetchBlocks call
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149
 [3].  We pass in a callback that, once a block has been successfully
fetched, sticks it on the “results” queue.

(2) Begin processing the local data.  One by one, we request the local data
from the local block manager (which memory maps the file) and then stick
the result onto the results queue
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230.
Because we memory map the files, which is speedy, the local data typically
all ends up on the results queue in front of the remote data.
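
(If the memory-mapping point is unclear: "reading" a local block amounts to
roughly the following -- a sketch, not Spark's code; no bytes are copied up
front, the file's pages get faulted in lazily when the buffer is accessed.)

import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

def mapLocalBlock(path: String): MappedByteBuffer = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    // maps the file into the process's address space; no copy happens here
    channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
  } finally {
    channel.close()  // the mapping remains valid after the channel is closed
  }
}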

(3) Once the async network requests have been issued (note — issued, but not
finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) and
(2) have happened), ShuffleBlockFetcherIterator returns an iterator that
gets wrapped too many times to count [4] and eventually gets unrolled [5].
Each time next() is called on the iterator, it blocks waiting for an item
from the results queue.  This may return right away, or if the queue is
empty, will block waiting on new data from the network [6].  Before
returning from next(), we update our accounting for the bytes in flight:
the chunk of data we return is no longer considered in-flight, because it’s
about to be processed, so we update the current bytesInFlight, and if it
won’t result in > maxBytesInFlight outstanding, send some more requests for
data.
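
If it's easier to read as code, the bookkeeping in next() is roughly the
following (again an illustration with invented names, not the real
implementation in ShuffleBlockFetcherIterator):

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

case class Result(blockId: String, size: Long)
case class Request(size: Long)

class FlightAccountingSketch(maxBytesInFlight: Long) {
  val results = new LinkedBlockingQueue[Result]()
  val fetchRequests = mutable.Queue[Request]()
  private var bytesInFlight = 0L

  // issuing a request adds its size to the in-flight total
  def sendRequest(req: Request): Unit = {
    bytesInFlight += req.size  // ...and would kick off the async fetch
  }

  // what each call to next() roughly does
  def next(): Result = {
    val result = results.take()    // block until some fetched data has arrived
    bytesInFlight -= result.size   // this chunk is about to be processed
    while (fetchRequests.nonEmpty &&
           bytesInFlight + fetchRequests.front.size <= maxBytesInFlight) {
      sendRequest(fetchRequests.dequeue())  // top up, staying under the cap
    }
    result
  }
}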



Notes:

[0] Note that these threads consume almost no CPU resources, because they
just receive data from the OS and then execute a callback that sticks the
data on the results queue.

[1] We limit the data outstanding on the network to avoid using too much
memory to hold the data we’ve fetched over the network but haven’t yet
processed.

[1.5] Each request may include multiple shuffle blocks, where a block is
the data output for this reduce task by a particular map task.  All of
the reduce tasks for a shuffle read a total of (# map tasks * # reduce tasks)
shuffle blocks; each reduce task reads # map tasks blocks.  We do some hacks
https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177
to try to size these requests in a good way: we limit each request to
about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
concurrently without exceeding maxBytesInFlight.  The 5 here is a completely
magic number that was probably guessed by someone long, long ago, and it
seems to work ok.
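
Conceptually the splitting looks something like this (illustrative only --
the real code also separates local from remote blocks and skips zero-sized
ones):

import scala.collection.mutable.ArrayBuffer

case class BlockInfo(blockId: String, size: Long)
case class FetchRequest(blocks: Seq[BlockInfo]) {
  def totalSize: Long = blocks.map(_.size).sum
}

// group one remote executor's blocks into requests of ~maxBytesInFlight / 5
def splitIntoFetchRequests(blocks: Seq[BlockInfo],
                           maxBytesInFlight: Long): Seq[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = ArrayBuffer[FetchRequest]()
  var current = ArrayBuffer[BlockInfo]()
  var currentSize = 0L
  for (block <- blocks) {
    current += block
    currentSize += block.size
    if (currentSize >= targetRequestSize) {  // big enough; start a new request
      requests += FetchRequest(current.toSeq)
      current = ArrayBuffer[BlockInfo]()
      currentSize = 0L
    }
  }
  if (current.nonEmpty) requests += FetchRequest(current.toSeq)
  requests.toSeq
}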

[2] The default configuration uses NettyBlockTransferService as the
ShuffleClient implementation (note that this extends BlockTransferService,
which extends ShuffleClient).

[3] If you’re curious how the shuffle client fetches data, the default
Spark configuration results in exactly one TCP connection from an executor
to each other executor.  If executor A is getting shuffle data 

Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Gerard Maas
Kay,

Excellent write-up. This should be preserved for reference somewhere
searchable.

-Gerard.



Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-11 Thread Kay Ousterhout
Good idea -- I've added this to the wiki:
https://cwiki.apache.org/confluence/display/SPARK/Shuffle+Internals.  Happy
to stick it elsewhere if folks think there's a more convenient place.

On Thu, Jun 11, 2015 at 4:46 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Kay,

 Excellent write-up. This should be preserved for reference somewhere
 searchable.

 -Gerard.




Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-09 Thread Mike Hynes
Hi Imran,

Thank you for your email.

In examining the condition (t2 - t1) > (t_ser + t_deser + t_exec), I
have found it to be true, although I have not included
t_{wait_for_read} in this, since it has---so far as I can tell---been
either zero or negligible compared to the task time.

Thanks,
Mike


Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-09 Thread Mike Hynes
Ahhh---forgive my typo: what I mean is,
(t2 - t1) >= (t_ser + t_deser + t_exec)
is satisfied, empirically.

On 6/10/15, Mike Hynes 91m...@gmail.com wrote:
 Hi Imran,

 Thank you for your email.

 In examining the condition (t2 - t1) > (t_ser + t_deser + t_exec), I
 have found it to be true, although I have not included
 t_{wait_for_read} in this, since it has---so far as I can tell---been
 either zero or negligible compared to the task time.

 Thanks,
 Mike


Re: Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-08 Thread Imran Rashid
Hi Mike,

all good questions, let me take a stab at answering them:

1. Event Logs + Stages:

It's normal for stages to get skipped if they are shuffle map stages, which
get read multiple times.  E.g., here's a little example program I wrote
earlier to demonstrate this: d3 doesn't need to be re-shuffled since each
time it's read w/ the same partitioner.  So skipping stages in this way is a
good thing:

val partitioner = new org.apache.spark.HashPartitioner(10)
val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }
  .partitionBy(partitioner)
(0 until 5).foreach { idx =>
  val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }
    .partitionBy(partitioner)
  println(idx + " --- " + otherData.join(d3).count())
}

If you run this and look in the UI, you'll see that all jobs except for
the first one have one stage that is skipped.  You will also see this in
the log:

15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12,
Stage 13)

15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)

Admittedly that is not very clear, but that is sort of indicating to you
that the DAGScheduler first created stage 12 as a necessary step, and then
later on changed its mind by realizing that everything it needed for stage
12 already existed, so there was nothing to do.


2. Extracting Event Log Information

maybe you are interested in SparkListener ? Though unfortunately, I don't
know of a good blog post describing it, hopefully the docs are clear ...
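
To make that concrete, a minimal listener that appends one CSV row per
completed task could look roughly like this -- I'm going from memory on the
field names, so please double-check them against the API for your version:

import java.io.{FileWriter, PrintWriter}

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class CsvTaskListener(path: String) extends SparkListener {
  private val out = new PrintWriter(new FileWriter(path, true))
  out.println("finishTime,taskId,host,runTimeMs,gcTimeMs,deserTimeMs,resultSerTimeMs")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    val metrics = taskEnd.taskMetrics  // can be null for failed tasks
    if (info != null && metrics != null) {
      out.println(Seq(info.finishTime, info.taskId, info.host,
        metrics.executorRunTime, metrics.jvmGCTime,
        metrics.executorDeserializeTime,
        metrics.resultSerializationTime).mkString(","))
      out.flush()
    }
  }
}

// register it on the driver before running your jobs:
// sc.addSparkListener(new CsvTaskListener("/tmp/task-metrics.csv"))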

3. Time Metrics in Spark Event Log

This is a great question.  I *think* the only exception is that t_gc is
really overlapped with t_exec.  So I think you should really expect

(t2 - t1) > (t_ser + t_deser + t_exec)

I am not 100% sure about this, though.  I'd be curious if that
constraint was ever violated.


As for your question on shuffle read vs. shuffle write time -- I wouldn't
necessarily expect the same stage to have times for both shuffle read &
shuffle write -- in the simplest case, you'll have shuffle write times in
one, and shuffle read times in the next one.  But even taking that into
account, there is a difference in the way they work & are measured.
Shuffle read operations are pipelined, and the way we measure shuffle read,
it's just how much time is spent *waiting* for network transfer.  It could
be that there is no (measurable) wait time b/c the next blocks are fetched
before they are needed.  Shuffle writes occur in the normal task execution
thread, though, so we (try to) measure all of it.



Stages with non-arithmetic numbering & Timing metrics in event logs

2015-06-07 Thread Mike Hynes
Hi Patrick and Akhil,

Thank you both for your responses. This is a bit of an extended email,
but I'd like to:
1. Answer your (Patrick) note about the missing stages since the IDs
do (briefly) appear in the event logs
2. Ask for advice/experience with extracting information from the
event logs in a columnar, delimiter-separated format.
3. Ask about the time metrics reported in the event logs; currently,
the elapsed time for a task does not equal the sum of the times for
its components

1. Event Logs + Stages:
=

As I said before, in the Spark logs (the log4j configurable ones from
the driver), I only see references to some stages, where the stage IDs
are not arithmetically increasing. In the event logs, however, I will
see reference to *every* stage, although not all stages will have
tasks associated with them.

For instance, to examine the actual stages that have tasks, you can
see missing stages:
# grep -E 'Event:SparkListenerTaskEnd' app.log \
#   | grep -Eo 'Stage ID:[[:digit:]]+'  \
#   | sort -n|uniq | head -n 5
Stage ID:0
Stage ID:1
Stage ID:10
Stage ID:11
Stage ID:110

However, these missing stages *do* appear in the event logs as Stage
IDs in the jobs submitted, i.e., for:
# grep -E 'Event:SparkListenerJobStart' app.log | grep -Eo 'Stage
IDs:\[.*\]' | head -n 5
Stage IDs:[0,1,2]
Stage IDs:[5,3,4]
Stage IDs:[6,7,8]
Stage IDs:[9,10,11]
Stage IDs:[12,13,14]

I do not know if this amounts to a bug, since I am not familiar with
the scheduler in detail. The stages have seemingly been created
somewhere in the DAG, but then have no associated tasks and never
appear again.

2. Extracting Event Log Information

Currently we are running scalability tests, and are finding very poor
scalability for certain block matrix algorithms. I would like to have
finer detail about the communication time and bandwidth when data is
transferred between nodes.

I would really just like to have a file with nothing but task info in
a format such as:
timestamp (ms), task ID, hostname, execution time (ms), GC time (ms), ...
0010294, 1, slave-1, 503, 34, ...
0010392, 2, slave-2, 543, 32, ...
and similarly for jobs/stages/rdd_memory/shuffle output/etc.

I have extracted the relevant time fields from the Spark event logs
with a sed script, but I wonder if there is an even more expedient
way. Unfortunately, I do not immediately see how to do this using the
$SPARK_HOME/conf/metrics.properties file and haven't come across a
blog/etc that describes this. Could anyone please comment on whether
or not a metrics configuration for this already exists?
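
(The closest thing I have found in the metrics.properties template is the
CSV sink, i.e. something along these lines -- but if I am reading it
correctly, that periodically dumps executor/driver-level gauges rather than
one row per task, which is why I am asking:)

*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=1
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics-csv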

3. Time Metrics in Spark Event Log
==
I am confused about the times reported for tasks in the event log.
There are launch and finish timestamps given for each task (call them
t1 and t2, respectively), as well as GC time (t_gc), execution time
(t_exec), and serialization times (t_ser, t_deser). However, the times
do not add up as I would have expected. I would imagine that the
elapsed time t2 - t1 would be slightly larger than the sum of the
component times. However, I can find many instances in the event logs
where:
(t2 - t1) > (t_gc + t_ser + t_deser + t_exec)
The difference can be 500 ms or more, which is not negligible for my
current execution times of ~5000 ms. I have attached a plot that
illustrates this.

Regarding this, I'd like to ask:
1. How exactly are these times being measured?
2. Should the sum of the component times equal the elapsed (clock)
time for the task?
3. If not, which component(s) is(are) being excluded, and when do they occur?
4. There are occasionally reported measurements for Shuffle Write
time, but not shuffle read time. Is there a method to determine the
time required to shuffle data? Could this be done by looking at delays
between the first task in a new stage and the last task in the
previous stage?

Thank you very much for your time,
Mike


On 6/7/15, Patrick Wendell pwend...@gmail.com wrote:
 Hey Mike,

 Stage ID's are not guaranteed to be sequential because of the way the
 DAG scheduler works (only increasing). In some cases stage ID numbers
 are skipped when stages are generated.

 Any stage/ID that appears in the Spark UI is an actual stage, so if
 you see ID's in there, but they are not in the logs, then let us know
 (that would be a bug).

 - Patrick

 On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
 Are you seeing the same behavior on the driver UI (the one running on
 port 4040)? If you click on the stage ID header you can sort the stages
 based on IDs.

 Thanks
 Best Regards

 On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes 91m...@gmail.com wrote:

 Hi folks,

 When I look at the output logs for an iterative Spark program, I see
 that the stage IDs are not arithmetically numbered---that is, there
 are gaps between stages and I might find log information about Stages
 0, 1, 2, and 5, but not 3 or 4.

 As an example, the