Here’s how the shuffle works.  This explains what happens for a single
task; the same thing happens in parallel for every task running on the
machine, and as Imran said, Spark runs up to “numCores” tasks concurrently
on each machine.  There's also an answer to the original question about why
CPU use is low at the very bottom of this email.

The key data structure used in fetching shuffle data is the “results” queue
in ShuffleBlockFetcherIterator, which buffers data that we have in
serialized (and maybe compressed) form, but haven’t yet deserialized /
processed.  The results queue is filled by many threads fetching data over
the network (the number of concurrent threads fetching data is equal to the
number of remote executors we’re currently fetching data from) [0], and is
consumed by a single thread that deserializes the data and computes some
function over it (e.g., if you’re doing rdd.count(), the thread
deserializes the data and counts the number of items).  As we fetch data
over the network, we track bytesInFlight, which is data that has been
requested (and possibly received) from a remote executor, but that hasn’t
yet been deserialized / processed by the consumer thread.  So, this
includes all of the data in the “results” queue, and possibly more data
that’s currently outstanding over the network.  We always issue as many
requests as we can, with the constraint that bytesInFlight remains less
than a specified maximum [1].
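
To make that concrete, here's a heavily simplified sketch of the producer
side of this pattern.  This is not the real ShuffleBlockFetcherIterator
code; the class and most of the names below are stand-ins, and error
handling, block metadata, the local-read path, etc. are all omitted:

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// One fetched-but-not-yet-processed block, still serialized (and maybe compressed).
case class FetchResult(blockId: String, size: Long, bytes: Array[Byte])
// A batch of blocks to request from a single remote executor, with their total size.
case class FetchRequest(executorId: String, blockIds: Seq[String], size: Long)

class SimplifiedShuffleFetcher(maxBytesInFlight: Long) {
  // Filled by the network threads' callbacks, drained by the single task thread.
  val results = new LinkedBlockingQueue[FetchResult]()
  // Bytes requested from remote executors but not yet handed to the task thread.
  // Only the task thread touches this, so no synchronization is needed.
  private var bytesInFlight = 0L
  // Requests we haven't issued yet because of the maxBytesInFlight cap.
  private val deferredRequests = new mutable.Queue[FetchRequest]()

  // Issue as many deferred requests as possible without exceeding maxBytesInFlight.
  def fetchUpToMaxBytes(): Unit = {
    while (deferredRequests.nonEmpty &&
           (bytesInFlight == 0 ||
            bytesInFlight + deferredRequests.front.size <= maxBytesInFlight)) {
      val request = deferredRequests.dequeue()
      bytesInFlight += request.size
      // Stand-in for shuffleClient.fetchBlocks; its callback just does results.put(...).
      asyncFetch(request, onSuccess = result => results.put(result))
    }
  }

  private def asyncFetch(req: FetchRequest, onSuccess: FetchResult => Unit): Unit = ()
}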

In a little more detail, here’s exactly what happens when a task begins
reading shuffled data:

(1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1]
over the network (this happens here
<https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260>
).

These requests are all executed asynchronously using a ShuffleClient [2]
via the shuffleClient.fetchBlocks call
<https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149>
 [3].  We pass in a callback that, once a block has been successfully
fetched, sticks it on the “results” queue.
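
For reference, the call looks roughly like this (lightly simplified from
that line; shuffleClient, address, blockIds, results, SuccessFetchResult,
and FailureFetchResult are fields/classes of the enclosing file in the real
code, and the exact argument lists here should be treated as approximate):

shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
  new BlockFetchingListener {
    override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
      buf.retain()  // keep the (possibly memory-mapped) buffer alive until the consumer is done with it
      results.put(new SuccessFetchResult(BlockId(blockId), buf.size, buf))
    }
    override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
      results.put(new FailureFetchResult(BlockId(blockId), e))
    }
  })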

(2) Begin processing the local data.  One by one, we request the local data
from the local block manager (which memory maps the file) and then stick
the result onto the results queue
<https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230>.
Because we memory map the files, which is speedy, the local data typically
all ends up on the results queue in front of the remote data.

(3) Once the async network requests have been issued (note — issued, but not
finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) and
(2) have happened), ShuffleBlockFetcherIterator returns an iterator that
gets wrapped too many times to count [4] and eventually gets unrolled [5].
Each time next() is called on the iterator, it blocks waiting for an item
from the results queue.  This may return right away, or if the queue is
empty, will block waiting on new data from the network [6].  Before
returning from next(), we update our accounting for the bytes in flight:
the chunk of data we return is no longer considered in-flight, because it’s
about to be processed, so we update the current bytesInFlight, and if it
won’t result in > maxBytesInFlight outstanding, send some more requests for
data.
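
Continuing the simplified sketch from earlier (so results, bytesInFlight,
and fetchUpToMaxBytes refer to that sketch, not to the real code), the
consumer side described here looks roughly like this:

// Called by the single task thread for every block it wants to process.
def next(): FetchResult = {
  val startTime = System.currentTimeMillis()
  val result = results.take()        // blocks here if nothing has arrived yet [6]
  // The real code adds (now - startTime) to the task's "fetch wait time" metric here.
  bytesInFlight -= result.size       // no longer in flight: we're about to process it
  fetchUpToMaxBytes()                // maybe issue more requests, staying under the cap
  result                             // still serialized; the caller deserializes and processes it
}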

————————————————

Notes:

[0] Note that these threads consume almost no CPU resources, because they
just receive data from the OS and then execute a callback that sticks the
data on the results queue.

[1] We limit the data outstanding on the network to avoid using too much
memory to hold the data we’ve fetched over the network but haven’t yet
processed.
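
(This cap is the spark.reducer.maxSizeInFlight setting that Imran mentions
below; if memory serves, the default is 48m.  Purely as an illustration,
bumping it would look something like this:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Cap on how many remote shuffle bytes a single task may have requested but not yet
  // processed; raising it (hypothetical value below) trades memory for more fetch parallelism.
  .set("spark.reducer.maxSizeInFlight", "96m")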

[1.5] Each request may include multiple shuffle blocks, where a "block" is
is the data output for this reduce task by a particular map task.  All of
the reduce tasks for a shuffle read a total of # map tasks * # reduce tasks
shuffle blocks; each reduce task reads # map tasks blocks.  We do some hacks
<https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177>
to try to size these requests in a "good" way: we limit each request to
about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
concurrently without exceeding maxBytesInFlight.  5 is completely a magic
number here that was probably guessed by someone long long ago, and it
seems to work ok.
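
Continuing the earlier sketch (maxBytesInFlight, FetchRequest, and the
mutable import are from there; the real logic in the linked code differs in
details), the grouping looks roughly like this:

// Group one executor's blocks into requests of roughly maxBytesInFlight / 5 bytes each,
// so ~5 requests can be outstanding at once without blowing past maxBytesInFlight.
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)

def splitIntoRequests(executorId: String, blocks: Seq[(String, Long)]): Seq[FetchRequest] = {
  val requests = mutable.ArrayBuffer[FetchRequest]()
  var curBlocks = mutable.ArrayBuffer[(String, Long)]()
  var curSize = 0L
  for ((blockId, size) <- blocks) {
    curBlocks += ((blockId, size))
    curSize += size
    if (curSize >= targetRequestSize) {     // close off this request and start a new one
      requests += FetchRequest(executorId, curBlocks.map(_._1), curSize)
      curBlocks = mutable.ArrayBuffer[(String, Long)]()
      curSize = 0L
    }
  }
  if (curBlocks.nonEmpty) {                 // leftover blocks form one final, smaller request
    requests += FetchRequest(executorId, curBlocks.map(_._1), curSize)
  }
  requests.toSeq
}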

[2] The default configuration uses NettyBlockTransferService as the
ShuffleClient implementation (note that this extends BlockTransferService,
which extends ShuffleClient).
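
(As far as I remember, this is controlled by spark.shuffle.blockTransferService,
which defaults to "netty"; "nio" selects the legacy path Mike mentions below:)

import org.apache.spark.SparkConf

// Defaults to "netty" (NettyBlockTransferService); "nio" selects the older NioBlockTransferService.
val conf = new SparkConf().set("spark.shuffle.blockTransferService", "netty")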

[3] If you’re curious how the shuffle client fetches data, the default
Spark configuration results in exactly one TCP connection from an executor
to each other executor.  If executor A is getting shuffle data from
executor B, we start by sending an OpenBlocks message from A to B.  The
OpenBlocks message includes the list of blocks that A wants to fetch, and
causes the remote executor, B, to start to pull the corresponding data into
memory from disk (we typically memory map the files, so this may not
actually result in the data being read yet), and also to store some state
associated with this “stream” of data.  The remote executor, B, responds
with a stream ID that helps it to identify the connection.  Next, A
requests blocks one at a time from B using a ChunkFetchRequest message
(this happens here
<https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L101>
in
OneForOneBlockFetcher, which calls this code
<https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java#L97>
in TransportClient; currently, we have a one-to-one mapping from a chunk to
a particular block).  It’s possible that there are many sets of shuffle
data being fetched concurrently between A and B (e.g., because many tasks
are run concurrently).  These requests are serialized, so one block is sent
at a time from B, and they’re sent in the order that the requests were
issued on A.
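
If it's useful, here's a hypothetical, heavily condensed picture of that
exchange (the real message classes live in the network/ modules and carry
more fields than shown; the shapes below are just for illustration):

// What executor A sends/receives for one fetchBlocks call to executor B, in order:
case class OpenBlocks(appId: String, execId: String, blockIds: Seq[String])      // A -> B: list everything A wants
case class StreamHandle(streamId: Long, numChunks: Int)                          // B -> A: "ask me for chunks 0..numChunks-1"
case class ChunkFetchRequest(streamId: Long, chunkIndex: Int)                    // A -> B: one request per block/chunk
case class ChunkFetchSuccess(streamId: Long, chunkIndex: Int, data: Array[Byte]) // B -> A: that block's bytes

// B memory-maps the requested files when it handles OpenBlocks, and then serves the chunks
// back one at a time, in the order A asked for them, over the single shared TCP connection.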

[4] In BlockStoreShuffleFetcher, which handles failures; then in
HashShuffleReader, which helps aggregate some of the data; etc.

[5] This happens in BlockManager.putIterator, if the RDD is going to be
cached; in the function passed in to ResultTask, if this is the last stage
in a job; or via the writer.write() call in ShuffleMapTask, if this is a
stage that generates intermediate shuffle data.

[6] We time how long we spend blocking on data from the network; this is
what’s shown as “fetch wait time” in Spark’s UI.

——————————————————

To answer the original question about why CPU use is low: the low CPU use
means that the main thread (the one that pulls data off of the results
queue) is blocking on
I/O.  It could be blocking to receive data over the network (which will be
included in the fetch wait time shown in the UI, described above), or it
could be blocking to read data from the local disk (because the shuffle
data that is read locally is memory-mapped, so the thread may block on disk
if the data hasn’t been read into memory yet).  It could also be blocking
because it’s writing new shuffle output to disk (also shown in the UI, as
shuffle write time), or because it’s spilling intermediate data.
Everything except the spilling is shown in the handy-dandy new
visualization that Kousuke recently added
<https://github.com/apache/spark/commit/a5f7b3b9c7f05598a1cc8e582e5facee1029cd5e> to
the UI (available in 1.4 onwards): if you look at the stage view, and click
on “Event Timeline”, you’ll see a visualization of the tasks and how long
they spent blocked on various things (we should add the spilling to
this…there is a long-outstanding JIRA for this).

Hope this helps!

-Kay

On Thu, Jun 11, 2015 at 7:42 AM, Imran Rashid <iras...@cloudera.com> wrote:

> That is not exactly correct -- that being said I'm not 100% on these
> details either so I'd appreciate you double checking  and / or another dev
> confirming my description.
>
>
> Spark actually has more threads going than the "numCores" you specify.
> "numCores" is really used for how many threads are actively executing
> tasks.  There are more threads for doing the fetching (the details of which
> I'm not that familiar with) -- that never cuts into the number of actively
> executing tasks for "numCores".  There isn't a 1-to-1 correspondence
> between one shuffle block and one task -- a shuffle-read task most likely
> needs to fetch many shuffle blocks, with some local and some remote (in
> most cases).  So, from your lingo above, c is numCores, but c_1 is just an
> independent pool of threads.
>
> The obvious follow-up question is: if you actually have more than
> "numCores" threads going at the same time, how come cpu usage is low?  You
> could still have your cpus stuck waiting on i/o, from disk or network, so
> they aren't getting fully utilized.  And the cpu can also be idle waiting
> for memory, if there are a lot of cache misses (I'm not sure how that will
> show up in cpu monitoring).  If that were the case, that could even be from
> too many threads, as a lot of time is spent context switching ... but I'm
> just guessing now.
>
> hope this helps,
> Imran
>
>
>
> On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Imran,
>>
>> Thank you again for your email.
>>
>> I just want to ask one further question to clarify the implementation
>> of the shuffle block fetches. When you say that rather than sitting
>> idle, [the executor] will immediately start reading the local block, I
>> would guess that, in implementation, the executor is going to launch
>> concurrent threads to read both local and remote blocks, which it
>> seems to do in the initialize() method of
>> core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case
>> or would the Executor run all local fetch threads first?
>>
>> The reason I ask is that if the slave machine on which the Executor is
>> running has some number of cores, c, then I  would have thought that
>> some of the threads launched would occupy some number, c_1, of the
>> cores and conduct the local reads (where c_1 <= c). The other threads
>> would occupy the other (c - c_1) cores' cycles until *all* necessary
>> blocks have been read, so that, depending on c and the number of blocks
>> to fetch, none of the cores would be idle if there are many blocks to
>> fetch. (I monitor the CPU utilization of our nodes throughout a job,
>> and generally find them under-utilized statistically speaking; that
>> is, their usage over the whole job is lower than expected, with short
>> bursts of high usage, so I ask this question in a specific way for this
>> reason, since I can see trends in the probability density functions of
>> CPU utilization as the #partitions of our RDDs are increased).
>>
>> ShuffleBlockFetcherIterator.scala:
>>
>>   private[this] def initialize(): Unit = {
>>                 ...
>>     // Send out initial requests for blocks, up to our maxBytesInFlight
>>     while (fetchRequests.nonEmpty &&
>>       (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size
>> <= maxBytesInFlight)) {
>>       sendRequest(fetchRequests.dequeue())
>>     }
>>     val numFetches = remoteRequests.size - fetchRequests.size
>>     logInfo("Started " + numFetches + " remote fetches in" +
>> Utils.getUsedTimeMs(startTime))
>>
>>     // Get Local Blocks
>>     fetchLocalBlocks()
>>     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
>>   }
>>   private[this] def fetchLocalBlocks() {
>>     val iter = localBlocks.iterator
>>     while (iter.hasNext) {
>>       val blockId = iter.next()
>>       try {
>>         val buf = blockManager.getBlockData(blockId)
>>         shuffleMetrics.incLocalBlocksFetched(1)
>>         shuffleMetrics.incLocalBytesRead(buf.size)
>>         buf.retain()
>>         results.put(new SuccessFetchResult(blockId, 0, buf))
>>       } catch {
>>                                 ...
>>       }
>>     }
>>   }
>>
>> Obviously, I will have to sit down with core/.../network/nio/* and
>> core/.../shuffle/* and do my own homework on this, but from what I can
>> tell, the BlockDataManager relies on either
>> NioBlockTransferService.scala or the NettyBlockTransferService.scala
>> (which are set in SparkEnv.scala), both of which do the grunt work of
>> actually buffering and transferring the blocks' bytes. Finally, the
>> tasks in the new stage for which the shuffle outputs have been fetched
>> will not commence until all of the block fetching threads (both local
>> and remote) have terminated.
>>
>> Does the above paint an accurate picture? I would really appreciate
>> clarification on the concurrency, since I would like to determine why
>> our jobs have under-utilization and poor weak scaling efficiency.
>>
>> I will cc this thread over to the dev list. I did not cc them in case
>> my previous question was trivial---I didn't want to spam the list
>> unnecessarily, since I do not see these kinds of questions posed there
>> frequently.
>>
>> Thanks a bunch,
>> Mike
>>
>>
>> On 6/10/15, Imran Rashid <iras...@cloudera.com> wrote:
>> > Hi Mike,
>> >
>> > no, this is a good question, I can see how my response could be
>> interpreted
>> > both ways.
>> >
>> > To be more precise:
>> > *nothing* is fetched until the shuffle-read stage starts.  So it is
>> normal
>> > to see a spike in cluster bandwidth when that stage starts.  There is a
>> > hard-boundary between stages -- that is, spark never starts any tasks in
>> > one stage until *all* tasks in the dependent stages have been completed.
>> > (There has been on-and-off discussion about relaxing this, but IMO this
>> is
>> > unlikely to change in the near future.)  So spark will wait for all of
>> the
>> > tasks in the previous shuffle-write stage to finish, and then kick off a
>> > bunch of shuffle-read tasks in the next stage, leading to the spike you
>> > see.
>> >
>> > I was referring to the way blocks are fetched within one of those
>> > shuffle-read tasks.  One of those tasks will probably need a
>> > bunch of different blocks, from many executors.  But some of the blocks
>> it
>> > needs will probably exist locally.  So the task first sends out a
>> request
>> > to fetch blocks remotely (leading to the spike), but rather than sitting
>> > idle, it will immediately start reading the local blocks.  Ideally, by
>> the
>> > time its done reading the local blocks, some of the remote blocks have
>> > already been fetched, so no time is spent *waiting* for the remote
>> reads.
>> > As the remote blocks get read, spark sends out more requests, trying to
>> > balance how much data needs to be buffered vs. preventing any waiting on
>> > remote reads (which can  be controlled by
>> spark.reducer.maxSizeInFlight).
>> >
>> > Hope that clarifies things!
>> >
>> > btw, you sent this last question to just me -- I think its a good
>> question,
>> > do you mind sending it to the list?  I figured that was accidental but
>> > wanted to check.
>> >
>> > Imran
>> >
>> > On Wed, Jun 10, 2015 at 12:20 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> Hi Imran,
>> >> One additional quick question---I just want to confirm that I fully
>> >> understand your comment that "blocks are fetched before they are
>> >> needed." Typically on our system, we see spikes in cluster bandwidth
>> >> (with ganglia) at stage boundaries, so I previously assumed that all
>> >> shuffle read occurred there. Do you mean that the blocks are fetched
>> >> by the shuffle read iterator, and hence when tasks occur afterwards
>> >> the necessary blocks have already been fetched?
>> >> Thanks---I am sorry if this is an obvious question, but I'd like to
>> >> understand this as precisely as possible.
>> >> Mike
>> >>
>> >> On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
>> >> > Ahhh---forgive my typo: what I mean is,
>> >> > (t2 - t1) >= (t_ser + t_deser + t_exec)
>> >> > is satisfied, empirically.
>> >> >
>> >> > On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
>> >> >> Hi Imran,
>> >> >>
>> >> >> Thank you for your email.
>> >> >>
>> >> >> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I
>> >> >> have found it to be true, although I have not included the
>> >> >> t_{wait_for_read} in this, since it has---so far as I can tell---been
>> >> >> either zero or negligible compared to the task time.
>> >> >>
>> >> >> Thanks,
>> >> >> Mike
>> >> >>
>> >> >> On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote:
>> >> >>> Hi Mike,
>> >> >>>
>> >> >>> all good questions, let me take a stab at answering them:
>> >> >>>
>> >> >>> 1. Event Logs + Stages:
>> >> >>>
>> >> >>> Its normal for stages to get skipped if they are shuffle map
>> stages,
>> >> >>> which
>> >> >>> get read multiple times.  Eg., here's a little example program I
>> >> >>> wrote
>> >> >>> earlier to demonstrate this: "d3" doesn't need to be re-shuffled
>> >> >>> since
>> >> >>> each
>> >> >>> time its read w/ the same partitioner.  So skipping stages in this
>> >> >>> way
>> >> >>> is
>> >> >>> a
>> >> >>> good thing:
>> >> >>>
>> >> >>> val partitioner = new org.apache.spark.HashPartitioner(10)
>> >> >>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) ->
>> >> >>> x}.partitionBy(partitioner)
>> >> >>> (0 until 5).foreach { idx =>
>> >> >>>   val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x %
>> 10)
>> >> ->
>> >> >>> x}.partitionBy(partitioner)
>> >> >>>   println(idx + " ---> " + otherData.join(d3).count())
>> >> >>> }
>> >> >>>
>> >> >>> If you run this and look in the UI, you'd see that all jobs
>> except
>> >> for
>> >> >>> the first one have one stage that is skipped.  You will also see
>> this
>> >> in
>> >> >>> the log:
>> >> >>>
>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage:
>> >> >>> List(Stage
>> >> >>> 12,
>> >> >>> Stage 13)
>> >> >>>
>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage
>> 13)
>> >> >>>
>> >> >>> Admittedly that is not very clear, but that is sort of indicating
>> to
>> >> you
>> >> >>> that the DAGScheduler first created stage 12 as a necessary step,
>> and
>> >> >>> then
>> >> >>> later on changed its mind by realizing that everything it needed
>> for
>> >> >>> stage
>> >> >>> 12 already existed, so there was nothing to do.
>> >> >>>
>> >> >>>
>> >> >>> 2. Extracting Event Log Information
>> >> >>>
>> >> >>> maybe you are interested in SparkListener ? Though unfortunately, I
>> >> >>> don't
>> >> >>> know of a good blog post describing it, hopefully the docs are
>> clear
>> >> ...
>> >> >>>
>> >> >>> 3. Time Metrics in Spark Event Log
>> >> >>>
>> >> >>> This is a great question.  I *think* the only exception is that
>> t_gc
>> >> >>> is
>> >> >>> really overlapped with t_exec.  So I think you should really expect
>> >> >>>
>> >> >>> (t2 - t1) < (t_ser + t_deser + t_exec)
>> >> >>>
>> >> >>> I am not 100% sure about this, though.  I'd be curious if that
>> >> >>> constraint was ever violated.
>> >> >>>
>> >> >>>
>> >> >>> As for your question on shuffle read vs. shuffle write time -- I
>> >> >>> wouldn't
>> >> >>> necessarily expect the same stage to have times for both shuffle
>> read
>> >> >>> &
>> >> >>> shuffle write -- in the simplest case, you'll have shuffle write
>> >> >>> times
>> >> >>> in
>> >> >>> one, and shuffle read times in the next one.  But even taking that
>> >> >>> into
>> >> >>> account, there is a difference in the way they work & are measured.
>> >> >>>  shuffle read operations are pipelined and the way we measure
>> shuffle
>> >> >>> read,
>> >> >>> its just how much time is spent *waiting* for network transfer.  It
>> >> >>> could
>> >> >>> be that there is no (measurable) wait time b/c the next blocks are
>> >> >>> fetched
>> >> >>> before they are needed.  Shuffle writes occur in the normal task
>> >> >>> execution
>> >> >>> thread, though, so we (try to) measure all of it.
>> >> >>>
>> >> >>>
>> >> >>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com>
>> wrote:
>> >> >>>
>> >> >>>> Hi Patrick and Akhil,
>> >> >>>>
>> >> >>>> Thank you both for your responses. This is a bit of an extended
>> >> >>>> email,
>> >> >>>> but I'd like to:
>> >> >>>> 1. Answer your (Patrick) note about the "missing" stages since the
>> >> >>>> IDs
>> >> >>>> do (briefly) appear in the event logs
>> >> >>>> 2. Ask for advice/experience with extracting information from the
>> >> >>>> event logs in a columnar, delimiter-separated format.
>> >> >>>> 3. Ask about the time metrics reported in the event logs;
>> currently,
>> >> >>>> the elapsed time for a task does not equal the sum of the times
>> for
>> >> >>>> its components
>> >> >>>>
>> >> >>>> 1. Event Logs + Stages:
>> >> >>>> =========================
>> >> >>>>
>> >> >>>> As I said before, In the spark logs (the log4j configurable ones
>> >> >>>> from
>> >> >>>> the driver), I only see references to some stages, where the stage
>> >> >>>> IDs
>> >> >>>> are not arithmetically increasing. In the event logs, however, I
>> >> >>>> will
>> >> >>>> see reference to *every* stage, although not all stages will have
>> >> >>>> tasks associated with them.
>> >> >>>>
>> >> >>>> For instance, to examine the actual stages that have tasks, you
>> can
>> >> >>>> see missing stages:
>> >> >>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>> >> >>>> #               | grep -Eo '"Stage ID":[[:digit:]]+'  \
>> >> >>>> #               | sort -n|uniq | head -n 5
>> >> >>>> "Stage ID":0
>> >> >>>> "Stage ID":1
>> >> >>>> "Stage ID":10
>> >> >>>> "Stage ID":11
>> >> >>>> "Stage ID":110
>> >> >>>>
>> >> >>>> However, these "missing" stages *do* appear in the event logs as
>> >> >>>> Stage
>> >> >>>> IDs in the jobs submitted, i.e: for
>> >> >>>> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo
>> >> >>>> 'Stage
>> >> >>>> IDs":\[.*\]' | head -n 5
>> >> >>>> Stage IDs":[0,1,2]
>> >> >>>> Stage IDs":[5,3,4]
>> >> >>>> Stage IDs":[6,7,8]
>> >> >>>> Stage IDs":[9,10,11]
>> >> >>>> Stage IDs":[12,13,14]
>> >> >>>>
>> >> >>>> I do not know if this amounts to a bug, since I am not familiar
>> with
>> >> >>>> the scheduler in detail. The stages have seemingly been created
>> >> >>>> somewhere in the DAG, but then have no associated tasks and never
>> >> >>>> appear again.
>> >> >>>>
>> >> >>>> 2. Extracting Event Log Information
>> >> >>>> ====================================
>> >> >>>> Currently we are running scalability tests, and are finding very
>> >> >>>> poor
>> >> >>>> scalability for certain block matrix algorithms. I would like to
>> >> >>>> have
>> >> >>>> finer detail about the communication time and bandwidth when data
>> is
>> >> >>>> transferred between nodes.
>> >> >>>>
>> >> >>>> I would really just like to have a file with nothing but task info
>> >> >>>> in
>> >> >>>> a format such as:
>> >> >>>> timestamp (ms), task ID, hostname, execution time (ms), GC time
>> >> >>>> (ms),
>> >> >>>> ...
>> >> >>>> 0010294, 1, slave-1, 503, 34, ...
>> >> >>>> 0010392, 2, slave-2, 543, 32, ...
>> >> >>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>> >> >>>>
>> >> >>>> I have extracted the relevant time fields from the spark event
>> logs
>> >> >>>> with a sed script, but I wonder if there is an even more expedient
>> >> >>>> way. Unfortunately, I do not immediately see how to do this using
>> >> >>>> the
>> >> >>>> $SPARK_HOME/conf/metrics.properties file and haven't come across a
>> >> >>>> blog/etc that describes this. Could anyone please comment on
>> whether
>> >> >>>> or not a metrics configuation for this already exists?
>> >> >>>>
>> >> >>>> 3. Time Metrics in Spark Event Log
>> >> >>>> ==================================
>> >> >>>> I am confused about the times reported for tasks in the event log.
>> >> >>>> There are launch and finish timestamps given for each task (call
>> >> >>>> them
>> >> >>>> t1 and t2, respectively), as well as GC time (t_gc), execution
>> time
>> >> >>>> (t_exec), and serialization times (t_ser, t_deser). However the
>> >> >>>> times
>> >> >>>> do not add up as I would have expected. I would imagine that the
>> >> >>>> elapsed time t2 - t1 would be slightly larger than the sum of the
>> >> >>>> component times. However, I can find many instances in the event
>> >> >>>> logs
>> >> >>>> where:
>> >> >>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>> >> >>>> The difference can be 500 ms or more, which is not negligible for
>> my
>> >> >>>> current execution times of ~5000 ms. I have attached a plot that
>> >> >>>> illustrates this.
>> >> >>>>
>> >> >>>> Regarding this, I'd like to ask:
>> >> >>>> 1. How exactly are these times are being measured?
>> >> >>>> 2. Should the sum of the component times equal the elapsed (clock)
>> >> >>>> time for the task?
>> >> >>>> 3. If not, which component(s) is(are) being excluded, and when do
>> >> >>>> they
>> >> >>>> occur?
>> >> >>>> 4. There are occasionally reported measurements for Shuffle Write
>> >> >>>> time, but not shuffle read time. Is there a method to determine
>> the
>> >> >>>> time required to shuffle data? Could this be done by look at
>> delays
>> >> >>>> between the first task in a new stage and the last task in the
>> >> >>>> previous stage?
>> >> >>>>
>> >> >>>> Thank you very much for your time,
>> >> >>>> Mike
>> >> >>>>
>> >> >>>>
>> >> >>>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
>> >> >>>> > Hey Mike,
>> >> >>>> >
>> >> >>>> > Stage ID's are not guaranteed to be sequential because of the
>> way
>> >> the
>> >> >>>> > DAG scheduler works (only increasing). In some cases stage ID
>> >> numbers
>> >> >>>> > are skipped when stages are generated.
>> >> >>>> >
>> >> >>>> > Any stage/ID that appears in the Spark UI is an actual stage, so
>> >> >>>> > if
>> >> >>>> > you see ID's in there, but they are not in the logs, then let us
>> >> know
>> >> >>>> > (that would be a bug).
>> >> >>>> >
>> >> >>>> > - Patrick
>> >> >>>> >
>> >> >>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das
>> >> >>>> > <ak...@sigmoidanalytics.com>
>> >> >>>> > wrote:
>> >> >>>> >> Are you seeing the same behavior on the driver UI? (that
>> running
>> >> >>>> >> on
>> >> >>>> >> port
>> >> >>>> >> 4040), If you click on the stage id header you can sort the
>> >> >>>> >> stages
>> >> >>>> >> based
>> >> >>>> >> on
>> >> >>>> >> IDs.
>> >> >>>> >>
>> >> >>>> >> Thanks
>> >> >>>> >> Best Regards
>> >> >>>> >>
>> >> >>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com>
>> >> >>>> >> wrote:
>> >> >>>> >>>
>> >> >>>> >>> Hi folks,
>> >> >>>> >>>
>> >> >>>> >>> When I look at the output logs for an iterative Spark
>> program, I
>> >> >>>> >>> see
>> >> >>>> >>> that the stage IDs are not arithmetically numbered---that is,
>> >> there
>> >> >>>> >>> are gaps between stages and I might find log information about
>> >> >>>> >>> Stage
>> >> >>>> >>> 0, 1,2, 5, but not 3 or 4.
>> >> >>>> >>>
>> >> >>>> >>> As an example, the output from the Spark logs below shows
>> what I
>> >> >>>> >>> mean:
>> >> >>>> >>>
>> >> >>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
>> >> >>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at
>> >> >>>> >>> blockMap.scala:1444)
>> >> >>>> >>> finished in 7.820 s:
>> >> >>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810)
>> >> >>>> >>> finished
>> >> >>>> >>> in 3.874 s:
>> >> >>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179)
>> >> >>>> >>> finished in 2.237 s:
>> >> >>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817)
>> >> >>>> >>> finished
>> >> >>>> >>> in 1.749 s:
>> >> >>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180)
>> >> >>>> >>> finished in 1.082 s:
>> >> >>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810)
>> >> >>>> >>> finished
>> >> >>>> >>> in 2.078 s:
>> >> >>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188)
>> >> >>>> >>> finished in 1.317 s:
>> >> >>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817)
>> >> >>>> >>> finished
>> >> >>>> >>> in 1.638 s:
>> >> >>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at
>> blockMap.scala:1189)
>> >> >>>> >>> finished in 0.732 s:
>> >> >>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at
>> >> >>>> >>> blockMap.scala:1302)
>> >> >>>> >>> finished in 0.192 s:
>> >> >>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at
>> >> >>>> >>> blockMap.scala:1302)
>> >> >>>> >>> finished in 0.170 s:
>> >> >>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at
>> blockMap.scala:1201)
>> >> >>>> >>> finished in 0.270 s:
>> >> >>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355)
>> >> >>>> >>> finished
>> >> >>>> >>> in 0.455 s:
>> >> >>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274)
>> >> >>>> >>> finished in 0.928 s:
>> >> >>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355)
>> >> >>>> >>> finished
>> >> >>>> >>> in 0.305 s:
>> >> >>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275)
>> >> >>>> >>> finished in 0.391 s:
>> >> >>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at
>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.028 s:
>> >> >>>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at
>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.023 s:
>> >> >>>> >>>
>> >> >>>> >>> Can anyone comment on this being normal behavior? Is it
>> >> >>>> >>> indicative
>> >> >>>> >>> of
>> >> >>>> >>> faults causing stages to be resubmitted? I also cannot find
>> the
>> >> >>>> >>> missing stages in any stage's parent List(Stage x, Stage y,
>> ...)
>> >> >>>> >>>
>> >> >>>> >>> Thanks,
>> >> >>>> >>> Mike
>> >> >>>> >>>
>> >> >>>> >>>
>> >> >>>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
>> >> >>>> >>> > Thanks, René. I actually added a warning to the new JDBC
>> >> >>>> reader/writer
>> >> >>>> >>> > interface for 1.4.0.
>> >> >>>> >>> >
>> >> >>>> >>> > Even with that, I think we should support throttling JDBC;
>> >> >>>> >>> > otherwise
>> >> >>>> >>> > it's
>> >> >>>> >>> > too convenient for our users to DOS their production
>> database
>> >> >>>> servers!
>> >> >>>> >>> >
>> >> >>>> >>> >
>> >> >>>> >>> >   /**
>> >> >>>> >>> >    * Construct a [[DataFrame]] representing the database
>> table
>> >> >>>> >>> > accessible
>> >> >>>> >>> > via JDBC URL
>> >> >>>> >>> >    * url named table. Partitions of the table will be
>> >> >>>> >>> > retrieved
>> >> >>>> >>> > in
>> >> >>>> >>> > parallel
>> >> >>>> >>> > based on the parameters
>> >> >>>> >>> >    * passed to this function.
>> >> >>>> >>> >    *
>> >> >>>> >>> > *   * Don't create too many partitions in parallel on a
>> large
>> >> >>>> cluster;
>> >> >>>> >>> > otherwise Spark might crash*
>> >> >>>> >>> > *   * your external database systems.*
>> >> >>>> >>> >    *
>> >> >>>> >>> >    * @param url JDBC database url of the form
>> >> >>>> >>> > `jdbc:subprotocol:subname`
>> >> >>>> >>> >    * @param table Name of the table in the external
>> database.
>> >> >>>> >>> >    * @param columnName the name of a column of integral type
>> >> that
>> >> >>>> will
>> >> >>>> >>> > be
>> >> >>>> >>> > used for partitioning.
>> >> >>>> >>> >    * @param lowerBound the minimum value of `columnName`
>> used
>> >> >>>> >>> > to
>> >> >>>> >>> > decide
>> >> >>>> >>> > partition stride
>> >> >>>> >>> >    * @param upperBound the maximum value of `columnName`
>> used
>> >> >>>> >>> > to
>> >> >>>> >>> > decide
>> >> >>>> >>> > partition stride
>> >> >>>> >>> >    * @param numPartitions the number of partitions.  the
>> range
>> >> >>>> >>> > `minValue`-`maxValue` will be split
>> >> >>>> >>> >    *                      evenly into this many partitions
>> >> >>>> >>> >    * @param connectionProperties JDBC database connection
>> >> >>>> >>> > arguments,
>> >> >>>> a
>> >> >>>> >>> > list
>> >> >>>> >>> > of arbitrary string
>> >> >>>> >>> >    *                             tag/value. Normally at
>> least
>> >> >>>> >>> > a
>> >> >>>> "user"
>> >> >>>> >>> > and
>> >> >>>> >>> > "password" property
>> >> >>>> >>> >    *                             should be included.
>> >> >>>> >>> >    *
>> >> >>>> >>> >    * @since 1.4.0
>> >> >>>> >>> >    */
>> >> >>>> >>> >
>> >> >>>> >>> >
>> >> >>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <
>> >> rtref...@gmail.com>
>> >> >>>> >>> > wrote:
>> >> >>>> >>> >
>> >> >>>> >>> >> Hi,
>> >> >>>> >>> >>
>> >> >>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
>> >> >>>> >>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where
>> >> >>>> >>> >> "where"
>> >> >>>> >>> >> is
>> >> >>>> an
>> >> >>>> >>> >> Array[String] of 32 to 48 elements).  (The code is tailored
>> >> >>>> >>> >> to
>> >> >>>> >>> >> your
>> >> >>>> >>> >> db,
>> >> >>>> >>> >> specifically through the where conditions, I'd have
>> otherwise
>> >> >>>> >>> >> post
>> >> >>>> >>> >> it)
>> >> >>>> >>> >> That should be the DataFrame API, but I'm just trying to
>> load
>> >> >>>> >>> >> everything
>> >> >>>> >>> >> and discard it as soon as possible :-)
>> >> >>>> >>> >>
>> >> >>>> >>> >> (1) Never do a silent drop of the values by default: it
>> kills
>> >> >>>> >>> >> confidence.
>> >> >>>> >>> >> An option sounds reasonable.  Some sort of insight / log
>> >> >>>> >>> >> would
>> >> >>>> >>> >> be
>> >> >>>> >>> >> great.
>> >> >>>> >>> >> (How many columns of what type were truncated? why?)
>> >> >>>> >>> >> Note that I could declare the field as string via
>> >> >>>> >>> >> JdbcDialects
>> >> >>>> (thank
>> >> >>>> >>> >> you
>> >> >>>> >>> >> guys for merging that :-) ).
>> >> >>>> >>> >> I have quite bad experiences with silent drops / truncates
>> of
>> >> >>>> columns
>> >> >>>> >>> >> and
>> >> >>>> >>> >> thus _like_ the strict way of spark. It causes trouble but
>> >> >>>> >>> >> noticing
>> >> >>>> >>> >> later
>> >> >>>> >>> >> that your data was corrupted during conversion is even
>> worse.
>> >> >>>> >>> >>
>> >> >>>> >>> >> (2) SPARK-8004
>> >> https://issues.apache.org/jira/browse/SPARK-8004
>> >> >>>> >>> >>
>> >> >>>> >>> >> (3) One option would be to make it safe to use, the other
>> >> option
>> >> >>>> >>> >> would
>> >> >>>> >>> >> be
>> >> >>>> >>> >> to document the behavior (s.th. like "WARNING: this method
>> >> tries
>> >> >>>> >>> >> to
>> >> >>>> >>> >> load
>> >> >>>> >>> >> as many partitions as possible, make sure your database can
>> >> >>>> >>> >> handle
>> >> >>>> >>> >> the
>> >> >>>> >>> >> load
>> >> >>>> >>> >> or load them in chunks and use union"). SPARK-8008
>> >> >>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
>> >> >>>> >>> >>
>> >> >>>> >>> >> Regards,
>> >> >>>> >>> >>   Rene Treffer
>> >> >>>> >>> >>
>> >> >>>> >>> >
>> >> >>>> >>>
>> >> >>>> >>>
>> >> >>>> >>> --
>> >> >>>> >>> Thanks,
>> >> >>>> >>> Mike
>> >> >>>> >>>
>> >> >>>> >>>
>> >> ---------------------------------------------------------------------
>> >> >>>> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> >> >>>> >>> For additional commands, e-mail: dev-h...@spark.apache.org
>> >> >>>> >>>
>> >> >>>> >>
>> >> >>>> >
>> >> >>>>
>> >> >>>>
>> >> >>>> --
>> >> >>>> Thanks,
>> >> >>>> Mike
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> ---------------------------------------------------------------------
>> >> >>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> >> >>>> For additional commands, e-mail: dev-h...@spark.apache.org
>> >> >>>>
>> >> >>>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Thanks,
>> >> >> Mike
>> >> >>
>> >> >
>> >> >
>> >> > --
>> >> > Thanks,
>> >> > Mike
>> >> >
>> >>
>> >>
>> >> --
>> >> Thanks,
>> >> Mike
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>
>
