It looks like an easy and naive solution might be to detect whenever IntermediateEmitter is used in a function and serialize it to the R side instead of actually using it on the Java side.
Thoughts?

On Sat, Nov 24, 2012 at 5:05 PM, Dmitriy Lyubimov <[email protected]> wrote:

> Aha. I did a number of bug fixes, so both examples (with 1 doFn and 2
> doFns) are working now. Good.
>
> Josh, please read the comment on the second example. The intermediate
> output of doFn #1 runs to Java/Crunch and back just to be fed into doFn #2.
> I would very much like to short-circuit those things on the R side.
> Otherwise it will be very hard to optimize multi-tenant applications
> (multiple decoupled models encapsulated into a bunch of doFns, distributed
> and optimized by Crunch), which is actually my pattern for production.
>
> I'd be eternally grateful if you could give it a thought. It may require
> exposing some Crunch optimizer internals, IMO.
>
> Thank you, sir.
>
>
> On Sat, Nov 24, 2012 at 11:53 AM, Dmitriy Lyubimov <[email protected]> wrote:
>
>> Also, one obvious optimization: if we could somehow extract the DoFn DAG
>> for a particular task, we could reconnect that DAG on the R side instead
>> of piping the data back and forth between R and the Java DAG of doFns.
>> But I would need help from somebody with deep inner knowledge of the
>> Crunch optimizer to extract and materialize such DAGs of functions on the
>> R side.
>>
>>
>> On Sat, Nov 24, 2012 at 11:13 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>
>>> Another perhaps useful piece of information is that the process,
>>> initialize and cleanup R closures may share the same environment, and
>>> this is handled correctly at the backend, e.g.
>>>
>>> createClosures <- function () {
>>>   x <- 0; y <- 0
>>>   startup <- function () x <<- 1
>>>   process <- function(value) y <<- ifelse(x==1,2,0)
>>>   cleanup <- function() emit(x + y)
>>>
>>>   list(process,startup,cleanup)
>>> }
>>>
>>> This function will produce 3 closures that share the same environment
>>> containing x and y, and each task at the backend should emit the value 3.
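Below is a minimal sketch of the short-circuit idea raised at the top of this thread. It is written in plain Java purely for illustration: `Stage`, `fuse()`, `sharedStateStage()`, `doubler()` and `run()` are hypothetical names, not Crunch, crunchR or JRI APIs.

- **Fusing:** two stages are fused so that the first stage's emitter calls the second stage's `process()` directly. Intermediate values therefore never leave the process.
- **Shared state:** the first stage mirrors `createClosures()`. Its initialize, process and cleanup hooks share `x` and `y`, and cleanup emits `x + y`.
- **Cleanup order:** because that stage emits during cleanup, the fused cleanup runs the upstream cleanup before the downstream one. The thread does not confirm that this ordering is what broke the two-doFn composition in crunchR; that is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: none of these types are Crunch, crunchR or JRI APIs.
public class FusedStagesSketch {

    // Stand-in for a DoFn with initialize/process/cleanup hooks.
    interface Stage<I, O> {
        default void initialize() {}
        void process(I input, Consumer<O> emit);
        default void cleanup(Consumer<O> emit) {}
    }

    // Fuses a -> b: a's emitter feeds b.process() directly, in-process.
    static <A, B, C> Stage<A, C> fuse(Stage<A, B> a, Stage<B, C> b) {
        return new Stage<A, C>() {
            @Override
            public void initialize() {
                a.initialize();
                b.initialize();
            }

            @Override
            public void process(A input, Consumer<C> emit) {
                a.process(input, mid -> b.process(mid, emit));
            }

            @Override
            public void cleanup(Consumer<C> emit) {
                // Upstream cleanup first: it may still emit values into b.
                a.cleanup(mid -> b.process(mid, emit));
                b.cleanup(emit);
            }
        };
    }

    // Mirrors createClosures(): all three hooks share the fields x and y.
    static Stage<String, Integer> sharedStateStage() {
        return new Stage<String, Integer>() {
            int x = 0;
            int y = 0;

            @Override
            public void initialize() { x = 1; }

            @Override
            public void process(String value, Consumer<Integer> emit) { y = (x == 1) ? 2 : 0; }

            @Override
            public void cleanup(Consumer<Integer> emit) { emit.accept(x + y); }
        };
    }

    // Downstream stage: doubles every value it receives.
    static Stage<Integer, Integer> doubler() {
        return (value, emit) -> emit.accept(value * 2);
    }

    // Drives one "task": initialize, process every input, then cleanup.
    static List<Integer> run(Stage<String, Integer> stage, List<String> inputs) {
        List<Integer> out = new ArrayList<>();
        stage.initialize();
        for (String input : inputs) {
            stage.process(input, out::add);
        }
        stage.cleanup(out::add);
        return out;
    }

    public static void main(String[] args) {
        Stage<String, Integer> fused = fuse(sharedStateStage(), doubler());
        System.out.println(run(fused, List.of("a", "b"))); // prints [6]
    }
}
```

With inputs "a" and "b", the shared-state stage emits 3 from cleanup, and the downstream doubler turns that into 6. With no inputs, process never runs and `y` stays 0, so the stage emits 1 (doubled to 2). In other words, "each task should emit 3" holds only for tasks that see at least one record.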
>>>
>>>
>>> On Sat, Nov 24, 2012 at 10:54 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>
>>>> On Sat, Nov 24, 2012 at 10:29 AM, Josh Wills <[email protected]> wrote:
>>>>
>>>>> Hey Dmitriy,
>>>>>
>>>>> I'm up and running w/Example1.R on my Linux machine. Very cool! My Mac
>>>>> is having some sort of issue w/creating /tmp/crunch* directories that I
>>>>> need to sort out.
>>>>>
>>>>> In the example you sent of the broken chaining of DoFns, why didn't the
>>>>> first line (quoted below) require a PType?
>>>>
>>>> Because the implementation assumes a default type, which is a character
>>>> vector, as below. Also, if it detects that the key type was specified
>>>> explicitly, it returns a PTable automatically instead of a PCollection.
>>>>
>>>> Further on, PTable emits automatically assume an emit(key,value)
>>>> invocation for concise notation (instead of Java's Pair.of(key,value)),
>>>> and PCollections assume just emit(value).
>>>>
>>>> parallelDo = function(FUN_PROCESS,
>>>>                       FUN_INITIALIZE=NULL, FUN_CLEANUP=NULL,
>>>>                       valueType=crunchR.RStrings$new(), keyType) {
>>>>   if (missing(keyType)) {
>>>>     .parallelDo.PCollection(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,valueType)
>>>>   } else {
>>>>     .parallelDo.PTable(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,keyType,valueType)
>>>>   }
>>>> },
>>>>
>>>>> Is there a shortcut for the case when the PType of the child is the
>>>>> same as the PType of the parent?
>>>>
>>>> Er... no. It kind of always assumes RStrings, which assumes
>>>> PType<String>, but the corresponding R type is multi-emit, i.e. you can
>>>> emit a vector once and internally it will translate into a bunch of
>>>> emit(String) calls. This is a notion I made specifically for R, since R
>>>> operates with vectors and lists, so I can emit just one vector and
>>>> declare its type multi-emit. It is not clear to me whether this notion
>>>> will have a benefit.
>>>> Obviously, you can still emit an R character vector as a single value,
>>>> too, but you would have to select a different RType to signal that
>>>> intent.
>>>>
>>>> Word count is a good example where the multi-emit RType serves you well:
>>>> you output the result of split[[1]], which is a character vector, with
>>>> one R call, emit(split...), but it translates into a bunch of individual
>>>> emits (the variant I had before this last one with PTable, or the
>>>> commented-out one here):
>>>>
>>>> # wordsPCol <- inputPCol$parallelDo(
>>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>> # )
>>>>
>>>>>
>>>>> # wordsPCol <- inputPCol$parallelDo(
>>>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>> # )
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>> On Fri, Nov 23, 2012 at 1:59 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>
>>>>>> OK, support for PTable emission of (key,value) pairs works in the
>>>>>> latest commit.
>>>>>>
>>>>>> My current problem is that composition of doFns doesn't work,
>>>>>> probably because of the sequence of cleanup() calls.
>>>>>> I have to figure that out:
>>>>>>
>>>>>> =============
>>>>>> This composition of 2 functions (PCollection, PTable) is a problem:
>>>>>>
>>>>>> # wordsPCol <- inputPCol$parallelDo(
>>>>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>>> # )
>>>>>> #
>>>>>> # wordsPTab <- wordsPCol$parallelDo(function(word) emit(word,1),
>>>>>> #   keyType = crunchR.RString$new(),
>>>>>> #   valueType = crunchR.RUint32$new())
>>>>>>
>>>>>> but this equivalent works:
>>>>>>
>>>>>> wordsPTab <- inputPCol$parallelDo(
>>>>>>   function(line) {
>>>>>>     words <- strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>>>>>>     sapply(words, function(x) emit(x,1))
>>>>>>   },
>>>>>>   keyType = crunchR.RString$new(),
>>>>>>   valueType = crunchR.RUint32$new()
>>>>>> )
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 22, 2012 at 2:13 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>
>>>>>>> OK, I guess I am going to work on the next milestone, which is
>>>>>>> PTableType serialization support between the R and Java sides.
>>>>>>>
>>>>>>> Once I am done with that, I guess I will be able to add the other APIs
>>>>>>> and complete the word count example fairly easily.
>>>>>>>
>>>>>>> Example1.R in its current state works.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 21, 2012 at 12:11 PM, Josh Wills <[email protected]> wrote:
>>>>>>>
>>>>>>>> I'm going to play with this again over the break. BTW, did you see
>>>>>>>> Renjin? I somehow missed this, but it looks interesting.
>>>>>>>>
>>>>>>>> http://code.google.com/p/renjin/
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Nov 18, 2012 at 11:44 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> On Sun, Nov 18, 2012 at 9:37 AM, Josh Wills <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Dmitriy,
>>>>>>>>>>
>>>>>>>>>> Just sent you a pull request based on playing with the code on OS X.
>>>>>>>>>> It contains a README about my experience getting things working.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Are you sure it is the doxygen package? I thought it was the roxygen2
>>>>>>>>> package?
>>>>>>>>>
>>>>>>>>> Actually, there currently seems to be no established best practice for
>>>>>>>>> R5 classes + roxygen2 (and it ignores the @import order of files,
>>>>>>>>> too). Hence the hacks with file names.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Unfortunately, I haven't succeeded in getting crunchR loaded; I'm
>>>>>>>>>> running into some issues w/RProtoBuf on OS X. I'll give it another go
>>>>>>>>>> this week on my Linux machine at work.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> OK, I removed @import RProtoBuf, so you should be able to install
>>>>>>>>> without it. Maven still compiles the protoc stuff, though.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> J
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Nov 17, 2012 at 12:49 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Josh,
>>>>>>>>>>>
>>>>>>>>>>> OK, the following commit
>>>>>>>>>>>
>>>>>>>>>>> ==============
>>>>>>>>>>> commit 67605360838f810fa5ddf99abb3ef2962d3f05e3
>>>>>>>>>>> Author: Dmitriy Lyubimov <[email protected]>
>>>>>>>>>>> Date:   Sat Nov 17 12:29:27 2012 -0800
>>>>>>>>>>>
>>>>>>>>>>>     example1 succeeds
>>>>>>>>>>>
>>>>>>>>>>> ====================
>>>>>>>>>>>
>>>>>>>>>>> runs example 1 successfully for me in a fully distributed way, which
>>>>>>>>>>> is the first step (the map-only part) of the word count.
>>>>>>>>>>>
>>>>>>>>>>> (I think there's a hiccup somewhere here, because in the output I
>>>>>>>>>>> also seem to see some empty lines, so the strsplit() part is perhaps
>>>>>>>>>>> set up somewhat incorrectly, but that's not the point right now):
>>>>>>>>>>>
>>>>>>>>>>> ====Example1.R===========
>>>>>>>>>>>
>>>>>>>>>>> library(crunchR)
>>>>>>>>>>>
>>>>>>>>>>> pipeline <- crunchR.MRPipeline$new("test-pipeline")
>>>>>>>>>>>
>>>>>>>>>>> inputPCol <- pipeline$readTextFile("/crunchr-examples/input")
>>>>>>>>>>>
>>>>>>>>>>> outputPCol <- inputPCol$parallelDo(
>>>>>>>>>>>   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]")[[1]] )
>>>>>>>>>>> )
>>>>>>>>>>>
>>>>>>>>>>> outputPCol$writeTextFile("/crunchr-examples/output")
>>>>>>>>>>>
>>>>>>>>>>> result <- pipeline$run()
>>>>>>>>>>>
>>>>>>>>>>> if ( !result$succeeded() ) stop ("pipeline failed.")
>>>>>>>>>>>
>>>>>>>>>>> ========================================
>>>>>>>>>>>
>>>>>>>>>>> I think R-Java communication should now support multiple doFns OK,
>>>>>>>>>>> and they will be properly executed, synchronized and shut down even
>>>>>>>>>>> if they emit in the cleanup phase.
>>>>>>>>>>>
>>>>>>>>>>> This example assumes a lot of defaults (such as RTypes, which by
>>>>>>>>>>> default are a character-vector singleton in and a character vector
>>>>>>>>>>> out for a DoFn). It also obviously uses only text in, text out at
>>>>>>>>>>> this point.
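Example1.R relies on the default multi-emit RStrings type, where one emit() of a character vector becomes one emit per element. The empty lines plausibly come from the pattern "[^[:alnum:]]" having no "+". Without it, strsplit() yields an empty string between any two adjacent separators (for example the ", " in "a, b"). Even with "+", a line that starts with a separator still yields a leading empty string.

Below is a minimal sketch of the tokenize-then-count word count in plain Java, for illustration only:

- `tokenize()`, `multiEmit()` and `wordCount()` are hypothetical helpers, not crunchR or Crunch calls.
- Java's `\p{Alnum}` is ASCII-only by default, unlike R's locale-aware `[:alnum:]`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical sketch of Example1.R's tokenizing step plus the counting step;
// not the crunchR or Crunch API.
public class WordCountSketch {

    // Lower-cases, splits on runs of non-alphanumerics (ASCII \p{Alnum}) and
    // drops the empty token that a leading separator produces.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.toLowerCase(Locale.ROOT).split("[^\\p{Alnum}]+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // "Multi-emit": one call with a whole vector turns into one emit per element.
    static <T> void multiEmit(List<T> values, Consumer<T> emit) {
        for (T value : values) {
            emit.accept(value);
        }
    }

    // Map step (tokenize + multi-emit) followed by a count per word.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            multiEmit(tokenize(line), word -> counts.merge(word, 1, Integer::sum));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("Hello, hello  world!", "  World")));
        // prints {hello=2, world=2}
    }
}
```

In the distributed pipeline, the counting would presumably be a separate grouping step after the map-only stage that Example1.R covers. The in-memory TreeMap just keeps the sketch self-contained.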
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> To run, install the package and upload the test input
>>>>>>>>>>> (test-prep.sh). Assuming you have compiled the Maven part, the R
>>>>>>>>>>> package snapshot can be installed by running
>>>>>>>>>>> "install-snapshot-rpkg.sh".
>>>>>>>>>>>
>>>>>>>>>>> You also need to make sure your backend tasks see the JRI library.
>>>>>>>>>>> There are multiple ways to do it, I guess, but for testing purposes
>>>>>>>>>>> the following in my mapred-site just works for me:
>>>>>>>>>>>
>>>>>>>>>>> <property>
>>>>>>>>>>>   <name>mapred.child.java.opts</name>
>>>>>>>>>>>   <value>-Djava.library.path=/home/dmitriy/R/x86_64-pc-linux-gnu-library/2/rJava/jri</value>
>>>>>>>>>>>   <final>false</final>
>>>>>>>>>>> </property>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I think at this point you guys might help me by reviewing that
>>>>>>>>>>> stuff, asking questions and making suggestions on how to go about
>>>>>>>>>>> incorporating other types of doFn, and perhaps a way to complete the
>>>>>>>>>>> word count example, perhaps running comparative benchmarks against a
>>>>>>>>>>> Java-only word count to see how much overhead we seem to be
>>>>>>>>>>> suffering here.
>>>>>>>>>>>
>>>>>>>>>>> I use StatET in Eclipse. Although it is a huge step forward, the
>>>>>>>>>>> process is still extremely tedious, since I don't know the R unit
>>>>>>>>>>> testing frameworks well (so I just scribble some stuff on the side to
>>>>>>>>>>> unit-test this and that), and the integration test cycle is
>>>>>>>>>>> significant enough.
>>>>>>>>>>>
>>>>>>>>>>> Which is why any help and suggestions are very welcome!
>>>>>>>>>>>
>>>>>>>>>>> I will definitely add support for reading/writing sequence files and
>>>>>>>>>>> Protobufs, as well as Mahout DRMs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>> -Dmitriy
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Director of Data Science
>>>>>>>> Cloudera <http://www.cloudera.com>
>>>>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
