Hi Patrick.



That looks very useful. The thing that seems to be missing from Shivaram's 
example is the ability to access TaskMetrics statically (this is the same 
problem that I am trying to solve with dynamic variables).






You mention defining an accumulator on the RDD. Perhaps I am missing something 
here, but my understanding was that accumulators are defined in SparkContext 
and are not part of the RDD. Is that correct?




Neil


On Tue, Jul 22, 2014 at 22:21, Patrick Wendell 
<pwend...@gmail.com="mailto:pwend...@gmail.com";>> wrote:
Shivaram,


You should take a look at this patch which adds support for naming

accumulators - this is likely to get merged in soon. I actually

started this patch by supporting named TaskMetrics similar to what you

have there, but then I realized there is too much semantic overlap

with accumulators, so I just went that route.


For instance, it would be nice if any user-defined metrics are

accessible at the driver program.


https://github.com/apache/spark/pull/1309


In your example, you could just define an accumulator here on the RDD

and you'd see the incremental update in the web UI automatically.


- Patrick


On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman

<shiva...@eecs.berkeley.edu> wrote:

> From reading Neil's first e-mail, I think the motivation is to get some

> metrics in ADAM ? --  I've run into a similar use-case with having

> user-defined metrics in long-running tasks and I think a nice way to solve

> this would be to have user-defined TaskMetrics.

>

> To state my problem more clearly, lets say you have two functions you use

> in a map call and want to measure how much time each of them takes. For

> example, if you have a code block like the one below and you want to

> measure how much time f1 takes as a fraction of the task.

>

> a.map { l =>

>    val f = f1(l)

>    ... some work here ...

> }

>

> It would be really cool if we could do something like

>

> a.map { l =>

>    val start = System.nanoTime

>    val f = f1(l)

>    TaskMetrics.get("f1-time").add(System.nanoTime - start)

> }

>

> These task metrics have a different purpose from Accumulators in the sense

> that we don't need to track lineage, perform commutative operations etc.

>  Further we also have a bunch of code in place to aggregate task metrics

> across a stage etc. So it would be great if we could also populate these in

> the UI and show median/max etc.

> I think counters [1] in Hadoop served a similar purpose.

>

> Thanks

> Shivaram

>

> [1]

> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters

>

>

>

> On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nfergu...@gmail.com> wrote:

>

>> Hi Reynold

>>

>> Thanks for your reply.

>>

>> Accumulators are, of course, stored in the Accumulators object as

>> thread-local variables. However, the Accumulators object isn't public, so

>> when a Task is executing there's no way to get the set of accumulators for

>> the current thread -- accumulators still have to be passed to every method

>> that needs them.

>>

>> Additionally, unless an accumulator is explicitly referenced it won't be

>> serialized as part of a Task, and won't make it into the Accumulators

>> object in the first place.

>>

>> I should also note that what I'm proposing is not specific to Accumulators

>> -- I am proposing that any data can be stored in a thread-local variable. I

>> think there are probably many other use cases other than my one.

>>

>> Neil

>>

>>

>> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <r...@databricks.com> wrote:

>>

>> > Thanks for the thoughtful email, Neil and Christopher.

>> >

>> > If I understand this correctly, it seems like the dynamic variable is

>> just

>> > a variant of the accumulator (a static one since it is a global object).

>> > Accumulators are already implemented using thread-local variables under

>> the

>> > hood. Am I misunderstanding something?

>> >

>> >

>> >

>> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <c...@adatao.com>

>> > wrote:

>> >

>> > > Hi Neil, first off, I'm generally a sympathetic advocate for making

>> > changes

>> > > to Spark internals to make it easier/better/faster/more awesome.

>> > >

>> > > In this case, I'm (a) not clear about what you're trying to accomplish,

>> > and

>> > > (b) a bit worried about the proposed solution.

>> > >

>> > > On (a): it is stated that you want to pass some Accumulators around.

>> Yet

>> > > the proposed solution is for some "shared" variable that may be set and

>> > > "mapped out" and possibly "reduced back", but without any accompanying

>> > > accumulation semantics. And yet it doesn't seem like you only want just

>> > the

>> > > broadcast property. Can you clarify the problem statement with some

>> > > before/after client code examples?

>> > >

>> > > On (b): you're right that adding variables to SparkContext should be

>> done

>> > > with caution, as it may have unintended consequences beyond just serdes

>> > > payload size. For example, there is a stated intention of supporting

>> > > multiple SparkContexts in the future, and this proposed solution can

>> make

>> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching call to

>> > make

>> > > a while back on a subject related to this (see

>> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a

>> single

>> > > SparkContext application, there may be multiple "clients" (of that

>> > > application) whose intent to use the proposed "SparkDynamic" would not

>> > > necessarily be coordinated.

>> > >

>> > > So, considering a ratio of a/b (benefit/cost), it's not clear to me

>> that

>> > > the benefits are significant enough to warrant the costs. Do I

>> > > misunderstand that the benefit is to save one explicit parameter (the

>> > > "context") in the signature/closure code?

>> > >

>> > > --

>> > > Christopher T. Nguyen

>> > > Co-founder & CEO, Adatao <http://adatao.com>

>> > > linkedin.com/in/ctnguyen

>> > >

>> > >

>> > >

>> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nfergu...@gmail.com>

>> > > wrote:

>> > >

>> > > > Hi all

>> > > >

>> > > > I have been adding some metrics to the ADAM project

>> > > > https://github.com/bigdatagenomics/adam, which runs on Spark, and

>> > have a

>> > > > proposal for an enhancement to Spark that would make this work

>> cleaner

>> > > and

>> > > > easier.

>> > > >

>> > > > I need to pass some Accumulators around, which will aggregate metrics

>> > > > (timing stats and other metrics) across the cluster. However, it is

>> > > > cumbersome to have to explicitly pass some "context" containing these

>> > > > accumulators around everywhere that might need them. I can use Scala

>> > > > implicits, which help slightly, but I'd still need to modify every

>> > method

>> > > > in the call stack to take an implicit variable.

>> > > >

>> > > > So, I'd like to propose that we add the ability to have "dynamic

>> > > variables"

>> > > > (basically thread-local variables) to Spark. This would avoid having

>> to

>> > > > pass the Accumulators around explicitly.

>> > > >

>> > > > My proposed approach is to add a method to the SparkContext class as

>> > > > follows:

>> > > >

>> > > > /**

>> > > >  * Sets the value of a "dynamic variable". This value is made

>> available

>> > > to

>> > > > jobs

>> > > >  * without having to be passed around explicitly. During execution

>> of a

>> > > > Spark job

>> > > >  * this value can be obtained from the [[SparkDynamic]] object.

>> > > >  */

>> > > > def setDynamicVariableValue(value: Any)

>> > > >

>> > > > Then, when a job is executing the SparkDynamic can be accessed to

>> > obtain

>> > > > the value of the dynamic variable. The implementation of this object

>> is

>> > > as

>> > > > follows:

>> > > >

>> > > > object SparkDynamic {

>> > > >   private val dynamicVariable = new DynamicVariable[Any]()

>> > > >   /**

>> > > >    * Gets the value of the "dynamic variable" that has been set in

>> the

>> > > > [[SparkContext]]

>> > > >    */

>> > > >   def getValue: Option[Any] = {

>> > > >     Option(dynamicVariable.value)

>> > > >   }

>> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: =>

>> > > S): S

>> > > > = {

>> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)

>> > > >   }

>> > > > }

>> > > >

>> > > > The change involves modifying the Task object to serialize the value

>> of

>> > > the

>> > > > dynamic variable, and modifying the TaskRunner class to deserialize

>> the

>> > > > value and make it available in the thread that is running the task

>> > (using

>> > > > the SparkDynamic.withValue method).

>> > > >

>> > > > I have done a quick prototype of this in this commit:

>> > > >

>> > > >

>> > >

>> >

>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6

>> > > > and it seems to work fine in my (limited) testing. It needs more

>> > testing,

>> > > > tidy-up and documentation though.

>> > > >

>> > > > One drawback is that the dynamic variable will be serialized for

>> every

>> > > Task

>> > > > whether it needs it or not. For my use case this might not be too

>> much

>> > > of a

>> > > > problem, as serializing and deserializing Accumulators looks fairly

>> > > > lightweight -- however we should certainly warn users against

>> setting a

>> > > > dynamic variable containing lots of data. I thought about using

>> > broadcast

>> > > > tables here, but I don't think it's possible to put Accumulators in a

>> > > > broadcast table (as I understand it, they're intended for purely

>> > > read-only

>> > > > data).

>> > > >

>> > > > What do people think about this proposal? My use case aside, it seems

>> > > like

>> > > > it would be a generally useful enhancment to be able to pass certain

>> > data

>> > > > around without having to explicitly pass it everywhere.

>> > > >

>> > > > Neil

>> > > >

>> > >

>> >

>>

Reply via email to