pipeline visualization sounds like the exposure of the planner graph, not
the task graph.

If task could be set up using visitor, crunchr could certainly extend a
standard one.


On Tue, Nov 27, 2012 at 9:55 PM, Dmitriy Lyubimov <[email protected]> wrote:

> i certainly haven't understood yet the entire code but my first pass on
> the Crunch classes indicates that there are at least two DAGs built in
> fact.
>
> One is the enitre thing (the entire MR planner), based on Graph etc. and
> another one  is materialized DAG of RTNode's in a particular MR task.
>
> IMO R side doesn't need the former. it only needs to know of do function
> fusions, nothing else.Which means it really needs the access to the setup
> mechanism of RTNodes in the task. Ideally.
>
> Like i said, even that is probably excessive. It really needs an API to
> setup DoFn fusions only (at this point. There are probably more functions
> to fuse though). This api, sort of 3rd party sdk, doesn't even need to know
> it is crunchR that is using it of course.
>
> Of course I am very pragmatically driven and thus favor quick and dirty
> paths to make this thing usable.
>
> On another note, my process is ridiculously slow now. My lack of knowledge
> of R unit testing best practices really kills me. There is a concept of
> package unit tests in R but they still require package recompilation which
> is still a way too long cycle to debug stuff. Plus lack of a  completion
> tooling for R5 classes in StatEt at the same level as for java and scala
> really wears me down...  :) oh well.
>
> i am close to push another milestone (complete work count without combine
> function).
>
>
> On Tue, Nov 27, 2012 at 9:09 PM, Josh Wills <[email protected]> wrote:
>
>> On Sat, Nov 24, 2012 at 10:17 PM, Dmitriy Lyubimov <[email protected]
>> >wrote:
>>
>> > it looks like easy and naive solution might be to detect whenever
>> > IntermediateEmitter is used in a function and serialize one to R side
>> > instead of actually using it on Java side.
>> >
>> > thoughts?
>> >
>>
>> I can think of a few ways to do this, but none that I'm happy with yet.
>> The
>> overhead of the R-Java bridge is certainly something we would like to do
>> away with when we can; the question is whether determining when to avoid
>> it
>> should live on the Crunch side via the optimizer/runner, or whether we
>> should have the planner expose a data structure that explains the plan it
>> is going to use and allow the R side to use that plan to do the function
>> composition step itself before calling in to Crunch. That step could also
>> be useful in other environments-- I think Gabriel went at least partway
>> down this path already w/the pipeline visualization JIRA he did a few
>> weeks
>> back.
>>
>> The latter approach would mean that we would need to have RCrunch be it's
>> own (more R-like) wrapper around the underlying R-Java bridge, which also
>> has some appeal. But we'd need to play around with it a little and see
>> what
>> it looked like.
>>
>>
>> >
>> > On Sat, Nov 24, 2012 at 5:05 PM, Dmitriy Lyubimov <[email protected]>
>> > wrote:
>> >
>> > > Aha. I did a number of bug fixes so both examples (with 1 doFn and 2
>> > > doFn's ) are working now. Good.
>> > >
>> > > Josh, please read the comment to the second example. The intermediate
>> > > output of doFn #1 runs to java/Crunch and back just to be fed into
>> doFn
>> > #2.
>> > > I would very much like to short-circuit those things on R side.
>> Otherwise
>> > > it will be very hard to optimize multi-tenant applications (multiple
>> > > decoupled models encapsulated into bunch of doFn's distributed and
>> > > optimized by Crunch). Which is actually my pattern for production.
>> > >
>> > > I'd be eternally grateful if you could give it a thought. It may
>> require
>> > > some exposure of Crunch optimizer internals IMO.
>> > >
>> > > thank you, sir.
>> > >
>> > >
>> > > On Sat, Nov 24, 2012 at 11:53 AM, Dmitriy Lyubimov <[email protected]
>> > >wrote:
>> > >
>> > >> also one obvious optimization is that if we somehow could perform
>> > >> extraction of DoFn's DAG for a particular task, we could re-connect
>> that
>> > >> DAG on the R side instead of piping that data back and forth from R
>> to
>> > java
>> > >> DAG of doFn's. But i would need a help from somebody with deep inner
>> > >> knowledge of Crunch optimizer to extract and materialize such DAGs of
>> > >> functions on the R side.
>> > >>
>> > >>
>> > >> On Sat, Nov 24, 2012 at 11:13 AM, Dmitriy Lyubimov <
>> [email protected]
>> > >wrote:
>> > >>
>> > >>> Another perhaps useful piece of information is that process,
>> initialize
>> > >>> and cleanup R closures may share the same environment and this is
>> > handled
>> > >>> corerctly at the backend, e.g.
>> > >>>
>> > >>> createClosures <- function () {
>> > >>>    x <- 0, y<- 0
>> > >>>   startup <- function () x <<- 1
>> > >>>   process <- function(value) y <<- ifelse(x==1,2,0)
>> > >>>   cleanup <- function() emit(x + y)
>> > >>>
>> > >>>   list(process,startup,cleanup)
>> > >>> }
>> > >>>
>> > >>> this function will produce 3 closures that share same environment
>> > >>> containing x and y and each task at backend should emit value 3.
>> > >>>
>> > >>>
>> > >>> On Sat, Nov 24, 2012 at 10:54 AM, Dmitriy Lyubimov <
>> [email protected]
>> > >wrote:
>> > >>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On Sat, Nov 24, 2012 at 10:29 AM, Josh Wills <[email protected]
>> > >wrote:
>> > >>>>
>> > >>>>> Hey Dmitriy,
>> > >>>>>
>> > >>>>> I'm up and running w/Example1.R on my Linux machine-- very cool!
>> My
>> > >>>>> Mac is
>> > >>>>> having some sort of issue w/creating /tmp/crunch* directories
>> that I
>> > >>>>> need
>> > >>>>> to sort out.
>> > >>>>>
>> > >>>>> In the example you sent of the broken chaining of DoFns, why
>> didn't
>> > the
>> > >>>>> first line (quoted below) require a PType?
>> > >>>>
>> > >>>>
>> > >>>> Because the implementation assumes a default  type which is
>> character
>> > >>>> vector as below. Also, if it detects
>> > >>>> that key type was specified explicitly, it returns PTable
>> > automatically
>> > >>>> instead of PCollection.
>> > >>>>
>> > >>>> Further on, PTable's emits automatically assume emit(key,value)
>> > >>>> invocation for concise of notation (instead of java's
>> > Pair.of(key,value) )
>> > >>>> and PCollections assume just emit(value).
>> > >>>>
>> > >>>>  parallelDo = function ( FUN_PROCESS,
>> > >>>> FUN_INITIALIZE=NULL,FUN_CLEANUP=NULL,
>> > >>>> valueType=crunchR.RStrings$new(), keyType) {
>> > >>>>  if (missing(keyType)) {
>> > >>>>
>> > >>>>
>> >
>> .parallelDo.PCollection(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,valueType)
>> > >>>>  } else {
>> > >>>>
>> > >>>>
>> >
>> .parallelDo.PTable(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,keyType,valueType)
>> > >>>> }
>> > >>>>  },
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> Is there a shortcut for the case
>> > >>>>> when the PType of the child is the same as the PType of the
>> parent?
>> > >>>>>
>> > >>>>
>> > >>>> er... no. it kind of always assume RStrings (which assumes
>> > >>>> PType<String> but corresponding R type is multi-emit, i.e. you can
>> > emit a
>> > >>>> vector once and internally it will translate into bunch of calls of
>> > >>>> emit(String). This is a notion that i made specifically for R
>> since R
>> > >>>> operates with vectors and lists, so i can emit just one vector type
>> > and
>> > >>>> declare it a multi-emit. It is not clear to me if this notion will
>> > have a
>> > >>>> benefit. Obviously, you still can emit R character vector as a
>> single
>> > >>>> value, too, but you would have to select different RType thing
>> there
>> > to
>> > >>>> imply your intent.
>> > >>>>
>> > >>>> Word count is a good example where multi-emit RType serves you
>> well:
>> > >>>> you output result of split[[1]] which is a character vector as one
>> R
>> > call
>> > >>>> emit(split...) but it translates into bunch of individual emits
>> (the
>> > >>>> variant i had before this last one with PTable, or the one
>> commented
>> > one
>> > >>>> here :
>> > >>>>
>> > >>>> # wordsPCol <- inputPCol$parallelDo(
>> > >>>> > # function(line) emit(
>> strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>> > )
>> > >>>> > # )
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>>
>> > >>>>> # wordsPCol <- inputPCol$parallelDo(
>> > >>>>> # function(line) emit(
>> strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>> > >>>>> # )
>> > >>>>>
>> > >>>>> Josh
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> On Fri, Nov 23, 2012 at 1:59 PM, Dmitriy Lyubimov <
>> [email protected]
>> > >
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>> > ok support for PTable emission (key,value) pairs work in the
>> latest
>> > >>>>> commit.
>> > >>>>> >
>> > >>>>> > My current problem is that composition of doFunctions doesn't
>> work,
>> > >>>>> > probably because of the sequence of cleanup() calls. I have to
>> > >>>>> figure out:
>> > >>>>> >
>> > >>>>> > =============
>> > >>>>> > this composition of 2 functions (PCollection, PTable) is a
>> problem
>> > >>>>> >
>> > >>>>> > # wordsPCol <- inputPCol$parallelDo(
>> > >>>>> > # function(line) emit(
>> > strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>> > >>>>> > # )
>> > >>>>> > #
>> > >>>>> > # wordsPTab <- wordsPCol$parallelDo(function(word) emit(word,1),
>> > >>>>> > # keyType = crunchR.RString$new(),
>> > >>>>> > # valueType = crunchR.RUint32$new())
>> > >>>>> >
>> > >>>>> > but this equivalent works:
>> > >>>>> > wordsPTab <- inputPCol$parallelDo(
>> > >>>>> > function(line) {
>> > >>>>> > words<- strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>> > >>>>> > sapply(words, function(x) emit(x,1))
>> > >>>>> > },
>> > >>>>> > keyType = crunchR.RString$new(),
>> > >>>>> > valueType = crunchR.RUint32$new()
>> > >>>>> > )
>> > >>>>> >
>> > >>>>> >
>> > >>>>> >
>> > >>>>> > On Thu, Nov 22, 2012 at 2:13 PM, Dmitriy Lyubimov <
>> > [email protected]
>> > >>>>> >
>> > >>>>> > wrote:
>> > >>>>> >
>> > >>>>> > > Ok ,  I guess i am going to work on the next milestone which
>> is
>> > >>>>> > PTableType
>> > >>>>> > > serialization support between R and java sides.
>> > >>>>> > >
>> > >>>>> > > once i am done with that, i guess i will be able to add other
>> api
>> > >>>>> and
>> > >>>>> > > complete word count example fairly easily.
>> > >>>>> > >
>> > >>>>> > > Example1.R in its current state works.
>> > >>>>> > >
>> > >>>>> > >
>> > >>>>> > > On Wed, Nov 21, 2012 at 12:11 PM, Josh Wills <
>> > [email protected]>
>> > >>>>> > wrote:
>> > >>>>> > >
>> > >>>>> > >> I'm going to play with this again over the break-- BTW, did
>> you
>> > >>>>> see
>> > >>>>> > >> Renjin?
>> > >>>>> > >> I somehow missed this, but it looks interesting.
>> > >>>>> > >>
>> > >>>>> > >> http://code.google.com/p/renjin/
>> > >>>>> > >>
>> > >>>>> > >>
>> > >>>>> > >> On Sun, Nov 18, 2012 at 11:44 AM, Dmitriy Lyubimov <
>> > >>>>> [email protected]
>> > >>>>> > >> >wrote:
>> > >>>>> > >>
>> > >>>>> > >> > On Sun, Nov 18, 2012 at 9:37 AM, Josh Wills <
>> > >>>>> [email protected]>
>> > >>>>> > >> wrote:
>> > >>>>> > >> >
>> > >>>>> > >> > > Dmitrity,
>> > >>>>> > >> > >
>> > >>>>> > >> > > Just sent you a pull request based on playing with the
>> code
>> > >>>>> on OS X.
>> > >>>>> > >> It
>> > >>>>> > >> > > contains a README about my experience getting things
>> > working.
>> > >>>>> > >> > >
>> > >>>>> > >> >
>> > >>>>> > >> > Are you sure it is doxygen package? I thought it was
>> roxygen2
>> > >>>>> package?
>> > >>>>> > >> >
>> > >>>>> > >> > Actually there seems currently no best practice in
>> existence
>> > >>>>> for R5
>> > >>>>> > >> classes
>> > >>>>> > >> > + roxygen2 (and the guy ignores @import order of files,
>> too).
>> > >>>>> Hence
>> > >>>>> > the
>> > >>>>> > >> > hacks with file names.
>> > >>>>> > >> >
>> > >>>>> > >> >
>> > >>>>> > >> > > Unfortunately, I haven't succeeded in getting crunchR
>> > loaded,
>> > >>>>> I'm
>> > >>>>> > >> running
>> > >>>>> > >> > > into some issues w/RProtoBuf on OS X. I'll give it
>> another
>> > go
>> > >>>>> this
>> > >>>>> > >> week
>> > >>>>> > >> > on
>> > >>>>> > >> > > my Linux machine at work.
>> > >>>>> > >> > >
>> > >>>>> > >> > ok i removed @import RProtoBuf, you should be able to
>> install
>> > >>>>> w/o it.
>> > >>>>> > >> Maven
>> > >>>>> > >> > still compiles protoc  stuff though.
>> > >>>>> > >> >
>> > >>>>> > >> > >
>> > >>>>> > >> > > J
>> > >>>>> > >> > >
>> > >>>>> > >> > >
>> > >>>>> > >> > > On Sat, Nov 17, 2012 at 12:49 PM, Dmitriy Lyubimov <
>> > >>>>> > [email protected]
>> > >>>>> > >> > > >wrote:
>> > >>>>> > >> > >
>> > >>>>> > >> > > > Josh,
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > ok the following commit
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > ==============
>> > >>>>> > >> > > > commit 67605360838f810fa5ddf99abb3ef2962d3f05e3
>> > >>>>> > >> > > > Author: Dmitriy Lyubimov <[email protected]>
>> > >>>>> > >> > > > Date:   Sat Nov 17 12:29:27 2012 -0800
>> > >>>>> > >> > > >
>> > >>>>> > >> > > >     example1 succeeds
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > ====================
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > runs example 1 for me successfully in a fully
>> distributed
>> > >>>>> way
>> > >>>>> > which
>> > >>>>> > >> is
>> > >>>>> > >> > > > first step (map-only thing) for the word count.
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > (I think there's a hickup somewhere here because in the
>> > >>>>> output i
>> > >>>>> > >> also
>> > >>>>> > >> > > seem
>> > >>>>> > >> > > > to see some empty lines, so the strsplit() part is
>> perhaps
>> > >>>>> set up
>> > >>>>> > >> > > somewhat
>> > >>>>> > >> > > > incorrectly here, but it's not the point right now):
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > ====Example1.R===========
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > library(crunchR)
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > pipeline <- crunchR.MRPipeline$new("test-pipeline")
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > inputPCol <-
>> > >>>>> pipeline$readTextFile("/crunchr-examples/input")
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > outputPCol <- inputPCol$parallelDo(
>> > >>>>> > >> > > > function(line) emit(
>> > >>>>> strsplit(tolower(line),"[^[:alnum:]]")[[1]] )
>> > >>>>> > >> > > > )
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > outputPCol$writeTextFile("/crunchr-examples/output")
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > result <- pipeline$run()
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > if ( !result$succeeded() ) stop ("pipeline failed.")
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > ========================================
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > I think R-java communication now should support
>> multiple
>> > >>>>> doFn ok
>> > >>>>> > and
>> > >>>>> > >> > they
>> > >>>>> > >> > > > will be properly shut down and executed and
>> synchronized
>> > >>>>> even if
>> > >>>>> > >> they
>> > >>>>> > >> > > emit
>> > >>>>> > >> > > > in the cleanup phase.
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > This example assumes a lot of defaults (such as RTypes
>> > >>>>> which are
>> > >>>>> > by
>> > >>>>> > >> > > default
>> > >>>>> > >> > > > character vector singleton in and character vector out
>> > for a
>> > >>>>> > DoFn).
>> > >>>>> > >> > Also
>> > >>>>> > >> > > > obviously uses text in-text out at this point only.
>> > >>>>> > >> > > >
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > To run, install the package and upload the test input
>> > >>>>> > (test-prep.sh)
>> > >>>>> > >> > > > Assuming you have compiled the maven part, the R
>> package
>> > >>>>> snapshot
>> > >>>>> > >> could
>> > >>>>> > >> > > be
>> > >>>>> > >> > > > installed by running "install-snapshot-rpkg.sh".
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > You also need to make sure your backend tasks see JRI
>> > >>>>> library.
>> > >>>>> > there
>> > >>>>> > >> > are
>> > >>>>> > >> > > > multiple ways to do it i guess but for the purposes of
>> > >>>>> testing the
>> > >>>>> > >> > > > following just works for me in my mapred-site:
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > <property>
>> > >>>>> > >> > > >    <name>mapred.child.java.opts</name>
>> > >>>>> > >> > > >
>> > >>>>> > >> > > >
>> > >>>>> > >> > > >
>> > >>>>> > >> > >
>> > >>>>> > >> >
>> > >>>>> > >>
>> > >>>>> >
>> > >>>>>
>> >
>>  
>> <value>-Djava.library.path=/home/dmitriy/R/x86_64-pc-linux-gnu-library/2/rJava/jri
>> > >>>>> > >> > > > </value>
>> > >>>>> > >> > > >    <final>false</final>
>> > >>>>> > >> > > > </property>
>> > >>>>> > >> > > >
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > I think at this point you guys might help me by doing
>> > >>>>> review of
>> > >>>>> > that
>> > >>>>> > >> > > stuff,
>> > >>>>> > >> > > > asking questions and making suggestions how to go by
>> > >>>>> incorporating
>> > >>>>> > >> > other
>> > >>>>> > >> > > > types of doFn and perhaps a way to complete the word
>> count
>> > >>>>> > example,
>> > >>>>> > >> > > perhaps
>> > >>>>> > >> > > > running comparative benchmarks with a java-only word
>> > count,
>> > >>>>> how
>> > >>>>> > much
>> > >>>>> > >> > > > overhead we seem to be suffering here.
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > I use StatEt in eclipse. Although it is a huge way
>> > forward,
>> > >>>>> the
>> > >>>>> > >> process
>> > >>>>> > >> > > is
>> > >>>>> > >> > > > still extremely tedious since I don't know unit testing
>> > >>>>> framework
>> > >>>>> > >> in R
>> > >>>>> > >> > > well
>> > >>>>> > >> > > > (so i just scribble some stuff on the side to unit-test
>> > >>>>> this and
>> > >>>>> > >> that)
>> > >>>>> > >> > > and
>> > >>>>> > >> > > > the integration test running cycle is significant
>> enough.
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > Which is why any help and suggestions are very welcome!
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > I will definitely add support for reading/writing
>> sequence
>> > >>>>> files
>> > >>>>> > and
>> > >>>>> > >> > > > Protobufs, as well as Mahout DRM's .
>> > >>>>> > >> > > >
>> > >>>>> > >> > > >
>> > >>>>> > >> > > > Thanks.
>> > >>>>> > >> > > > -Dmitrity
>> > >>>>> > >> > > >
>> > >>>>> > >> > >
>> > >>>>> > >> >
>> > >>>>> > >>
>> > >>>>> > >>
>> > >>>>> > >>
>> > >>>>> > >> --
>> > >>>>> > >> Director of Data Science
>> > >>>>> > >> Cloudera <http://www.cloudera.com>
>> > >>>>> > >> Twitter: @josh_wills <http://twitter.com/josh_wills>
>> > >>>>> > >>
>> > >>>>> > >
>> > >>>>> > >
>> > >>>>> >
>> > >>>>>
>> > >>>>
>> > >>>>
>> > >>>
>> > >>
>> > >
>> >
>>
>
>

Reply via email to