Pipeline visualization sounds like exposure of the planner graph, not the task graph. If the task could be set up via a visitor, crunchR could certainly extend a standard one.

On Tue, Nov 27, 2012 at 9:55 PM, Dmitriy Lyubimov <[email protected]> wrote:

> I certainly haven't understood the entire code yet, but my first pass over the Crunch classes indicates that there are in fact at least two DAGs being built.
>
> One is the entire thing (the whole MR planner), based on Graph etc., and the other is the materialized DAG of RTNodes in a particular MR task.
>
> IMO the R side doesn't need the former; it only needs to know about DoFn fusions, nothing else. Which means it really needs access to the setup mechanism of the RTNodes in the task. Ideally.
>
> Like I said, even that is probably excessive. It really needs an API to set up DoFn fusions only (at this point; there are probably more functions to fuse, though). This API, a sort of third-party SDK, doesn't even need to know that it is crunchR using it, of course.
>
> Of course I am very pragmatically driven and thus favor quick-and-dirty paths to making this thing usable.
>
> On another note, my process is ridiculously slow now. My lack of knowledge of R unit-testing best practices really kills me. There is a concept of package unit tests in R, but they still require package recompilation, which is way too long a cycle to debug stuff. Plus the lack of completion tooling for R5 classes in StatET at the same level as for Java and Scala really wears me down... :) oh well.
>
> I am close to pushing another milestone (complete word count without a combine function).
>
> On Tue, Nov 27, 2012 at 9:09 PM, Josh Wills <[email protected]> wrote:
>
>> On Sat, Nov 24, 2012 at 10:17 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>
>>> It looks like an easy and naive solution might be to detect whenever an IntermediateEmitter is used in a function and serialize one to the R side instead of actually using it on the Java side.
>>>
>>> Thoughts?
>>
>> I can think of a few ways to do this, but none that I'm happy with yet. The overhead of the R-Java bridge is certainly something we would like to do away with when we can; the question is whether determining when to avoid it should live on the Crunch side via the optimizer/runner, or whether we should have the planner expose a data structure that explains the plan it is going to use and allow the R side to use that plan to do the function composition step itself before calling in to Crunch. That step could also be useful in other environments-- I think Gabriel went at least partway down this path already w/the pipeline visualization JIRA he did a few weeks back.
>>
>> The latter approach would mean that we would need to have RCrunch be its own (more R-like) wrapper around the underlying R-Java bridge, which also has some appeal. But we'd need to play around with it a little and see what it looked like.
>>
>>> On Sat, Nov 24, 2012 at 5:05 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>
>>>> Aha. I did a number of bug fixes, so both examples (with 1 doFn and 2 doFns) are working now. Good.
>>>>
>>>> Josh, please read the comment on the second example. The intermediate output of doFn #1 makes a round trip to Java/Crunch and back just to be fed into doFn #2. I would very much like to short-circuit those things on the R side. Otherwise it will be very hard to optimize multi-tenant applications (multiple decoupled models encapsulated into a bunch of doFns, distributed and optimized by Crunch), which is actually my pattern for production.
>>>>
>>>> I'd be eternally grateful if you could give it a thought. It may require some exposure of Crunch optimizer internals, IMO.
>>>>
>>>> Thank you, sir.
>>>>
>>>> On Sat, Nov 24, 2012 at 11:53 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>
>>>>> Also, one obvious optimization: if we could somehow extract the DAG of doFns for a particular task, we could re-connect that DAG on the R side instead of piping the data back and forth between R and the Java DAG of doFns. But I would need help from somebody with deep inner knowledge of the Crunch optimizer to extract and materialize such DAGs of functions on the R side.
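>>>>>
>>>>> Conceptually, the re-connection could be as simple as folding the fused chain into a single R closure, so intermediate values never cross the bridge. A rough sketch only (composeDoFns is a made-up helper, and for the sketch I pretend every process function takes its emitter explicitly):
>>>>>
>>>>> composeDoFns <- function(fn1, fn2) {
>>>>>   function(value, emit) {
>>>>>     buffer <- list()
>>>>>     # fn1 emits into a local buffer instead of going back over the R-Java bridge
>>>>>     fn1(value, function(x) buffer[[length(buffer) + 1L]] <<- x)
>>>>>     # fn2 consumes fn1's output directly and emits to the real downstream emitter
>>>>>     for (v in buffer) fn2(v, emit)
>>>>>   }
>>>>> }
>>>>>
>>>>> A fused chain of n functions would then just be Reduce(composeDoFns, chain).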
>>>>>
>>>>> On Sat, Nov 24, 2012 at 11:13 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>
>>>>>> Another perhaps useful piece of information: the process, initialize and cleanup R closures may share the same environment, and this is handled correctly at the backend, e.g.
>>>>>>
>>>>>> createClosures <- function () {
>>>>>>   # x and y live in the environment shared by all three closures
>>>>>>   x <- 0
>>>>>>   y <- 0
>>>>>>   startup <- function () x <<- 1
>>>>>>   process <- function (value) y <<- ifelse(x == 1, 2, 0)
>>>>>>   cleanup <- function () emit(x + y)
>>>>>>
>>>>>>   list(process, startup, cleanup)
>>>>>> }
>>>>>>
>>>>>> This function will produce 3 closures that share the same environment containing x and y, and each task at the backend should emit the value 3.
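>>>>>>
>>>>>> The same trick should work for per-task aggregation once PTable emission is in. A sketch only (createWordTally is made up; it assumes the emit(key, value) form and follows the list(process, startup, cleanup) convention from above):
>>>>>>
>>>>>> createWordTally <- function () {
>>>>>>   counts <- integer(0)                 # named vector shared by the closures
>>>>>>   process <- function (word) {
>>>>>>     cur <- counts[word]
>>>>>>     counts[word] <<- ifelse(is.na(cur), 1L, cur + 1L)
>>>>>>   }
>>>>>>   # emit one (word, count) pair per distinct word at task cleanup
>>>>>>   cleanup <- function () for (w in names(counts)) emit(w, counts[[w]])
>>>>>>
>>>>>>   list(process, NULL, cleanup)
>>>>>> }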
>>>>>>
>>>>>> On Sat, Nov 24, 2012 at 10:54 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>
>>>>>>> On Sat, Nov 24, 2012 at 10:29 AM, Josh Wills <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Dmitriy,
>>>>>>>>
>>>>>>>> I'm up and running w/Example1.R on my Linux machine-- very cool! My Mac is having some sort of issue w/creating /tmp/crunch* directories that I need to sort out.
>>>>>>>>
>>>>>>>> In the example you sent of the broken chaining of DoFns, why didn't the first line (quoted below) require a PType?
>>>>>>>
>>>>>>> Because the implementation assumes a default type, which is character vector, as below. Also, if it detects that a key type was specified explicitly, it returns a PTable automatically instead of a PCollection.
>>>>>>>
>>>>>>> Further on, a PTable's emits automatically assume the emit(key, value) invocation for conciseness of notation (instead of Java's Pair.of(key, value)), and PCollections assume just emit(value).
>>>>>>>
>>>>>>> parallelDo = function (FUN_PROCESS,
>>>>>>>     FUN_INITIALIZE = NULL, FUN_CLEANUP = NULL,
>>>>>>>     valueType = crunchR.RStrings$new(), keyType) {
>>>>>>>   if (missing(keyType)) {
>>>>>>>     # no key type specified => plain PCollection
>>>>>>>     .parallelDo.PCollection(FUN_PROCESS, FUN_INITIALIZE, FUN_CLEANUP, valueType)
>>>>>>>   } else {
>>>>>>>     # explicit key type => PTable
>>>>>>>     .parallelDo.PTable(FUN_PROCESS, FUN_INITIALIZE, FUN_CLEANUP, keyType, valueType)
>>>>>>>   }
>>>>>>> },
>>>>>>>
>>>>>>>> Is there a shortcut for the case when the PType of the child is the same as the PType of the parent?
>>>>>>>
>>>>>>> Er... no. It kind of always assumes RStrings (which assumes PType<String>, but the corresponding R type is multi-emit, i.e. you can emit a vector once and internally it will translate into a bunch of emit(String) calls). This is a notion I made specifically for R, since R operates with vectors and lists, so I can emit just one vector type and declare it multi-emit. It is not clear to me yet whether this notion will have a benefit. Obviously, you can still emit an R character vector as a single value too, but you would have to select a different RType there to imply your intent.
>>>>>>>
>>>>>>> Word count is a good example where a multi-emit RType serves you well: you output the result of split[[1]], which is a character vector, in one R call, emit(split...), but it translates into a bunch of individual emits (the variant I had before this last one with PTable, or the commented one here):
>>>>>>>
>>>>>>> # wordsPCol <- inputPCol$parallelDo(
>>>>>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>>>> # )
>>>>>>>
>>>>>>>> # wordsPCol <- inputPCol$parallelDo(
>>>>>>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>>>>> # )
>>>>>>>>
>>>>>>>> Josh
>>>>>>>>
>>>>>>>> On Fri, Nov 23, 2012 at 1:59 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> OK, support for PTable emission of (key, value) pairs works in the latest commit.
>>>>>>>>>
>>>>>>>>> My current problem is that composition of doFunctions doesn't work, probably because of the sequence of cleanup() calls. I have to figure it out:
>>>>>>>>>
>>>>>>>>> =============
>>>>>>>>> This composition of 2 functions (PCollection, PTable) is a problem:
>>>>>>>>>
>>>>>>>>> # wordsPCol <- inputPCol$parallelDo(
>>>>>>>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>>>>>> # )
>>>>>>>>> #
>>>>>>>>> # wordsPTab <- wordsPCol$parallelDo(function(word) emit(word,1),
>>>>>>>>> #   keyType = crunchR.RString$new(),
>>>>>>>>> #   valueType = crunchR.RUint32$new())
>>>>>>>>>
>>>>>>>>> but this equivalent works:
>>>>>>>>>
>>>>>>>>> wordsPTab <- inputPCol$parallelDo(
>>>>>>>>>   function(line) {
>>>>>>>>>     words <- strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>>>>>>>>>     sapply(words, function(x) emit(x,1))
>>>>>>>>>   },
>>>>>>>>>   keyType = crunchR.RString$new(),
>>>>>>>>>   valueType = crunchR.RUint32$new()
>>>>>>>>> )
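>>>>>>>>>
>>>>>>>>> Once the composition issue is sorted out, the rest of the word count should amount to roughly the following. A sketch only: groupByKey/combineValues just mirror the Java-side PTable API, and nothing on the R side implements them yet.
>>>>>>>>>
>>>>>>>>> # hypothetical continuation, not implemented:
>>>>>>>>> # countsPTab <- wordsPTab$groupByKey()$combineValues(
>>>>>>>>> #   function(values) emit(sum(values))    # the missing combine function
>>>>>>>>> # )
>>>>>>>>> # countsPTab$writeTextFile("/crunchr-examples/wordcount-output")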
>>>>>>>>>
>>>>>>>>> On Thu, Nov 22, 2012 at 2:13 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> OK, I guess I am going to work on the next milestone, which is PTableType serialization support between the R and Java sides.
>>>>>>>>>>
>>>>>>>>>> Once I am done with that, I guess I will be able to add the other API and complete the word count example fairly easily.
>>>>>>>>>>
>>>>>>>>>> Example1.R in its current state works.
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 21, 2012 at 12:11 PM, Josh Wills <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm going to play with this again over the break-- BTW, did you see Renjin? I somehow missed this, but it looks interesting.
>>>>>>>>>>>
>>>>>>>>>>> http://code.google.com/p/renjin/
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Nov 18, 2012 at 11:44 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Nov 18, 2012 at 9:37 AM, Josh Wills <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Dmitriy,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just sent you a pull request based on playing with the code on OS X. It contains a README about my experience getting things working.
>>>>>>>>>>>>
>>>>>>>>>>>> Are you sure it is the doxygen package? I thought it was the roxygen2 package?
>>>>>>>>>>>>
>>>>>>>>>>>> Actually, there currently seems to be no best practice in existence for R5 classes + roxygen2 (and it ignores the @import order of files, too). Hence the hacks with the file names.
>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately, I haven't succeeded in getting crunchR loaded; I'm running into some issues w/RProtoBuf on OS X. I'll give it another go this week on my Linux machine at work.
>>>>>>>>>>>>
>>>>>>>>>>>> OK, I removed @import RProtoBuf, so you should be able to install w/o it. Maven still compiles the protoc stuff, though.
>>>>>>>>>>>>
>>>>>>>>>>>>> J
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Nov 17, 2012 at 12:49 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Josh,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> OK, the following commit
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ==============
>>>>>>>>>>>>>> commit 67605360838f810fa5ddf99abb3ef2962d3f05e3
>>>>>>>>>>>>>> Author: Dmitriy Lyubimov <[email protected]>
>>>>>>>>>>>>>> Date: Sat Nov 17 12:29:27 2012 -0800
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     example1 succeeds
>>>>>>>>>>>>>> ====================
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> runs example 1 for me successfully in a fully distributed way, which is the first step (the map-only part) of the word count.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> (I think there's a hiccup somewhere here, because in the output I also seem to see some empty lines, so the strsplit() part is perhaps set up somewhat incorrectly, but that's not the point right now.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ====Example1.R===========
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> library(crunchR)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> pipeline <- crunchR.MRPipeline$new("test-pipeline")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> inputPCol <- pipeline$readTextFile("/crunchr-examples/input")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> outputPCol <- inputPCol$parallelDo(
>>>>>>>>>>>>>>   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]")[[1]] )
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> outputPCol$writeTextFile("/crunchr-examples/output")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> result <- pipeline$run()
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> if ( !result$succeeded() ) stop ("pipeline failed.")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ========================================
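>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The empty lines mentioned above most likely come from splitting on every single non-alphanumeric character; adding "+" to the regex and dropping the empty token a leading delimiter leaves behind should fix it. Untested here, but roughly:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> function(line) {
>>>>>>>>>>>>>>   words <- strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>>>>>>>>>>>>>>   # keep only non-empty tokens
>>>>>>>>>>>>>>   emit( words[nchar(words) > 0] )
>>>>>>>>>>>>>> }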
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the R-Java communication should now support multiple doFns fine, and they will be properly executed, synchronized and shut down, even if they emit in the cleanup phase.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This example assumes a lot of defaults (such as the RTypes, which default to character-vector singleton in and character vector out for a DoFn). Also, it obviously only does text in/text out at this point.
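>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Spelled out, the defaults in the parallelDo call above amount to roughly this (a sketch with the same behavior, just making the default value type explicit):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> outputPCol <- inputPCol$parallelDo(
>>>>>>>>>>>>>>   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]")[[1]] ),
>>>>>>>>>>>>>>   # RStrings is the default value RType; with no keyType the result is a PCollection
>>>>>>>>>>>>>>   valueType = crunchR.RStrings$new()
>>>>>>>>>>>>>> )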
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To run it, install the package and upload the test input (test-prep.sh). Assuming you have compiled the Maven part, the R package snapshot can be installed by running "install-snapshot-rpkg.sh".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You also need to make sure your backend tasks can see the JRI library. There are multiple ways to do it, I guess, but for testing purposes the following just works for me in my mapred-site:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> <property>
>>>>>>>>>>>>>>   <name>mapred.child.java.opts</name>
>>>>>>>>>>>>>>   <value>-Djava.library.path=/home/dmitriy/R/x86_64-pc-linux-gnu-library/2/rJava/jri</value>
>>>>>>>>>>>>>>   <final>false</final>
>>>>>>>>>>>>>> </property>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think at this point you guys could help me by reviewing that stuff, asking questions, and making suggestions on how to go about incorporating other types of doFn, on how to complete the word count example, and perhaps on running comparative benchmarks against a Java-only word count to see how much overhead we are suffering here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I use StatET in Eclipse. Although it is a huge step forward, the process is still extremely tedious, since I don't know the unit-testing frameworks in R well (so I just scribble some stuff on the side to unit-test this and that) and the integration-test running cycle is significant enough.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which is why any help and suggestions are very welcome!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I will definitely add support for reading/writing sequence files and Protobufs, as well as Mahout DRMs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>> -Dmitriy
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Director of Data Science
>>>>>>>>>>> Cloudera <http://www.cloudera.com>
>>>>>>>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
