Hi Mike,

all good questions, let me take a stab at answering them:

1. Event Logs + Stages:

Its normal for stages to get skipped if they are shuffle map stages, which
get read multiple times.  Eg., here's a little example program I wrote
earlier to demonstrate this: "d3" doesn't need to be re-shuffled since each
time its read w/ the same partitioner.  So skipping stages in this way is a
good thing:

val partitioner = new org.apache.spark.HashPartitioner(10)
val d3 = sc.parallelize(1 to 100).map { x => (x % 10) ->
x}.partitionBy(partitioner)
(0 until 5).foreach { idx =>
  val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x % 10) ->
x}.partitionBy(partitioner)
  println(idx + " ---> " + otherData.join(d3).count())
}

If you run this, f you look in the UI you'd see that all jobs except for
the first one have one stage that is skipped.  You will also see this in
the log:

15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12,
Stage 13)

15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)

Admittedly that is not very clear, but that is sort of indicating to you
that the DAGScheduler first created stage 12 as a necessary step, and then
later on changed its mind by realizing that everything it needed for stage
12 already existed, so there was nothing to do.


2. Extracting Event Log Information

maybe you are interested in SparkListener ? Though unfortunately, I don't
know of a good blog post describing it, hopefully the docs are clear ...

3. Time Metrics in Spark Event Log

This is a great question.  I *think* the only exception is that t_gc is
really overlapped with t_exec.  So I think you should really expect

(t2 - t1) < (t_ser + t_deser + t_exec)

I am not 100% sure about this, though.  I'd be curious if that was
constraint was ever violated.


As for your question on shuffle read vs. shuffle write time -- I wouldn't
necessarily expect the same stage to have times for both shuffle read &
shuffle write -- in the simplest case, you'll have shuffle write times in
one, and shuffle read times in the next one.  But even taking that into
account, there is a difference in the way they work & are measured.
 shuffle read operations are pipelined and the way we measure shuffle read,
its just how much time is spent *waiting* for network transfer.  It could
be that there is no (measurable) wait time b/c the next blocks are fetched
before they are needed.  Shuffle writes occur in the normal task execution
thread, though, so we (try to) measure all of it.


On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com> wrote:

> Hi Patrick and Akhil,
>
> Thank you both for your responses. This is a bit of an extended email,
> but I'd like to:
> 1. Answer your (Patrick) note about the "missing" stages since the IDs
> do (briefly) appear in the event logs
> 2. Ask for advice/experience with extracting information from the
> event logs in a columnar, delimiter-separated format.
> 3. Ask about the time metrics reported in the event logs; currently,
> the elapsed time for a task does not equal the sum of the times for
> its components
>
> 1. Event Logs + Stages:
> =========================
>
> As I said before, In the spark logs (the log4j configurable ones from
> the driver), I only see references to some stages, where the stage IDs
> are not arithmetically increasing. In the event logs, however, I will
> see reference to *every* stage, although not all stages will have
> tasks associated with them.
>
> For instance, to examine the actual stages that have tasks, you can
> see missing stages:
> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
> #               | grep -Eo '"Stage ID":[[:digit:]]+'  \
> #               | sort -n|uniq | head -n 5
> "Stage ID":0
> "Stage ID":1
> "Stage ID":10
> "Stage ID":11
> "Stage ID":110
>
> However, these "missing" stages *do* appear in the event logs as Stage
> IDs in the jobs submitted, i.e: for
> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo 'Stage
> IDs":\[.*\]' | head -n 5
> Stage IDs":[0,1,2]
> Stage IDs":[5,3,4]
> Stage IDs":[6,7,8]
> Stage IDs":[9,10,11]
> Stage IDs":[12,13,14]
>
> I do not know if this amounts to a bug, since I am not familiar with
> the scheduler in detail. The stages have seemingly been created
> somewhere in the DAG, but then have no associated tasks and never
> appear again.
>
> 2. Extracting Event Log Information
> ====================================
> Currently we are running scalability tests, and are finding very poor
> scalability for certain block matrix algorithms. I would like to have
> finer detail about the communication time and bandwidth when data is
> transferred between nodes.
>
> I would really just like to have a file with nothing but task info in
> a format such as:
> timestamp (ms), task ID, hostname, execution time (ms), GC time (ms), ...
> 0010294, 1, slave-1, 503, 34, ...
> 0010392, 2, slave-2, 543, 32, ...
> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>
> I have extracted the relevant time fields from the spark event logs
> with a sed script, but I wonder if there is an even more expedient
> way. Unfortunately, I do not immediately see how to do this using the
> $SPARK_HOME/conf/metrics.properties file and haven't come across a
> blog/etc that describes this. Could anyone please comment on whether
> or not a metrics configuation for this already exists?
>
> 3. Time Metrics in Spark Event Log
> ==================================
> I am confused about the times reported for tasks in the event log.
> There are launch and finish timestamps given for each task (call them
> t1 and t2, respectively), as well as GC time (t_gc), execution time
> (t_exec), and serialization times (t_ser, t_deser). However the times
> do not add up as I would have expected. I would imagine that the
> elapsed time t2 - t1 would be slightly larger than the sum of the
> component times. However, I can find many instances in the event logs
> where:
> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
> The difference can be 500 ms or more, which is not negligible for my
> current execution times of ~5000 ms. I have attached a plot that
> illustrates this.
>
> Regarding this, I'd like to ask:
> 1. How exactly are these times are being measured?
> 2. Should the sum of the component times equal the elapsed (clock)
> time for the task?
> 3. If not, which component(s) is(are) being excluded, and when do they
> occur?
> 4. There are occasionally reported measurements for Shuffle Write
> time, but not shuffle read time. Is there a method to determine the
> time required to shuffle data? Could this be done by look at delays
> between the first task in a new stage and the last task in the
> previous stage?
>
> Thank you very much for your time,
> Mike
>
>
> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
> > Hey Mike,
> >
> > Stage ID's are not guaranteed to be sequential because of the way the
> > DAG scheduler works (only increasing). In some cases stage ID numbers
> > are skipped when stages are generated.
> >
> > Any stage/ID that appears in the Spark UI is an actual stage, so if
> > you see ID's in there, but they are not in the logs, then let us know
> > (that would be a bug).
> >
> > - Patrick
> >
> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das <ak...@sigmoidanalytics.com>
> > wrote:
> >> Are you seeing the same behavior on the driver UI? (that running on port
> >> 4040), If you click on the stage id header you can sort the stages based
> >> on
> >> IDs.
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com> wrote:
> >>>
> >>> Hi folks,
> >>>
> >>> When I look at the output logs for an iterative Spark program, I see
> >>> that the stage IDs are not arithmetically numbered---that is, there
> >>> are gaps between stages and I might find log information about Stage
> >>> 0, 1,2, 5, but not 3 or 4.
> >>>
> >>> As an example, the output from the Spark logs below shows what I mean:
> >>>
> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444)
> >>> finished in 7.820 s:
> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished
> >>> in 3.874 s:
> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179)
> >>> finished in 2.237 s:
> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished
> >>> in 1.749 s:
> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180)
> >>> finished in 1.082 s:
> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished
> >>> in 2.078 s:
> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188)
> >>> finished in 1.317 s:
> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished
> >>> in 1.638 s:
> >>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189)
> >>> finished in 0.732 s:
> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302)
> >>> finished in 0.192 s:
> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302)
> >>> finished in 0.170 s:
> >>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201)
> >>> finished in 0.270 s:
> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished
> >>> in 0.455 s:
> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274)
> >>> finished in 0.928 s:
> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished
> >>> in 0.305 s:
> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275)
> >>> finished in 0.391 s:
> >>> 30452:INFO:DAGScheduler:Stage 32 (first at
> >>> MatrixFactorizationModel.scala:60) finished in 0.028 s:
> >>> 30506:INFO:DAGScheduler:Stage 36 (first at
> >>> MatrixFactorizationModel.scala:60) finished in 0.023 s:
> >>>
> >>> Can anyone comment on this being normal behavior? Is it indicative of
> >>> faults causing stages to be resubmitted? I also cannot find the
> >>> missing stages in any stage's parent List(Stage x, Stage y, ...)
> >>>
> >>> Thanks,
> >>> Mike
> >>>
> >>>
> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
> >>> > Thanks, René. I actually added a warning to the new JDBC
> reader/writer
> >>> > interface for 1.4.0.
> >>> >
> >>> > Even with that, I think we should support throttling JDBC; otherwise
> >>> > it's
> >>> > too convenient for our users to DOS their production database
> servers!
> >>> >
> >>> >
> >>> >   /**
> >>> >    * Construct a [[DataFrame]] representing the database table
> >>> > accessible
> >>> > via JDBC URL
> >>> >    * url named table. Partitions of the table will be retrieved in
> >>> > parallel
> >>> > based on the parameters
> >>> >    * passed to this function.
> >>> >    *
> >>> > *   * Don't create too many partitions in parallel on a large
> cluster;
> >>> > otherwise Spark might crash*
> >>> > *   * your external database systems.*
> >>> >    *
> >>> >    * @param url JDBC database url of the form
> >>> > `jdbc:subprotocol:subname`
> >>> >    * @param table Name of the table in the external database.
> >>> >    * @param columnName the name of a column of integral type that
> will
> >>> > be
> >>> > used for partitioning.
> >>> >    * @param lowerBound the minimum value of `columnName` used to
> >>> > decide
> >>> > partition stride
> >>> >    * @param upperBound the maximum value of `columnName` used to
> >>> > decide
> >>> > partition stride
> >>> >    * @param numPartitions the number of partitions.  the range
> >>> > `minValue`-`maxValue` will be split
> >>> >    *                      evenly into this many partitions
> >>> >    * @param connectionProperties JDBC database connection arguments,
> a
> >>> > list
> >>> > of arbitrary string
> >>> >    *                             tag/value. Normally at least a
> "user"
> >>> > and
> >>> > "password" property
> >>> >    *                             should be included.
> >>> >    *
> >>> >    * @since 1.4.0
> >>> >    */
> >>> >
> >>> >
> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rtref...@gmail.com>
> >>> > wrote:
> >>> >
> >>> >> Hi,
> >>> >>
> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
> >>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where "where" is
> an
> >>> >> Array[String] of 32 to 48 elements).  (The code is tailored to your
> >>> >> db,
> >>> >> specifically through the where conditions, I'd have otherwise post
> >>> >> it)
> >>> >> That should be the DataFrame API, but I'm just trying to load
> >>> >> everything
> >>> >> and discard it as soon as possible :-)
> >>> >>
> >>> >> (1) Never do a silent drop of the values by default: it kills
> >>> >> confidence.
> >>> >> An option sounds reasonable.  Some sort of insight / log would be
> >>> >> great.
> >>> >> (How many columns of what type were truncated? why?)
> >>> >> Note that I could declare the field as string via JdbcDialects
> (thank
> >>> >> you
> >>> >> guys for merging that :-) ).
> >>> >> I have quite bad experiences with silent drops / truncates of
> columns
> >>> >> and
> >>> >> thus _like_ the strict way of spark. It causes trouble but noticing
> >>> >> later
> >>> >> that your data was corrupted during conversion is even worse.
> >>> >>
> >>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
> >>> >>
> >>> >> (3) One option would be to make it safe to use, the other option
> >>> >> would
> >>> >> be
> >>> >> to document the behavior (s.th. like "WARNING: this method tries to
> >>> >> load
> >>> >> as many partitions as possible, make sure your database can handle
> >>> >> the
> >>> >> load
> >>> >> or load them in chunks and use union"). SPARK-8008
> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
> >>> >>
> >>> >> Regards,
> >>> >>   Rene Treffer
> >>> >>
> >>> >
> >>>
> >>>
> >>> --
> >>> Thanks,
> >>> Mike
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> >>> For additional commands, e-mail: dev-h...@spark.apache.org
> >>>
> >>
> >
>
>
> --
> Thanks,
> Mike
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>

Reply via email to