Also, one obvious optimization: if we could somehow extract the DAG of DoFns for a particular task, we could reconnect that DAG on the R side instead of piping the data back and forth between R and the Java DAG of DoFns. But I would need help from somebody with deep knowledge of the Crunch optimizer to extract and materialize such DAGs of functions on the R side.
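A minimal sketch of the R-side half of that idea, assuming each stage is a plain R function(value, emit) that receives its emit callback explicitly. That signature, and the names fuseDoFns, splitWords and toPair, are hypothetical and are not crunchR API; how to get the DAG out of the Crunch planner is not covered here.

```r
# Hypothetical sketch, NOT crunchR API: each "DoFn" is a plain R
# function(value, emit) that calls emit() zero or more times.

# Fuse a list of such functions into a single function(value, emit),
# so intermediate values never leave the R process.
fuseDoFns <- function(fns) {
  Reduce(
    function(f, g) function(value, emit) {
      # whatever f emits is fed straight into g;
      # only g's emits reach the caller-supplied emit
      f(value, function(x) g(x, emit))
    },
    fns
  )
}

# Stage 1: split a line into words, one emit per word.
splitWords <- function(line, emit) {
  for (w in strsplit(tolower(line), "[^[:alnum:]]+")[[1]]) {
    if (nzchar(w)) emit(w)
  }
}

# Stage 2: turn each word into a tab-separated "word 1" record.
toPair <- function(word, emit) emit(paste(word, 1, sep = "\t"))

fused <- fuseDoFns(list(splitWords, toPair))

# Stand-in for the R-to-Java boundary: just collect what gets emitted.
out <- character(0)
fused("Hello crunch hello", function(x) out <<- c(out, x))
print(out)
```

With this shape only the output of the last stage would have to cross the boundary to Java, which is the saving the optimization is after.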
On Sat, Nov 24, 2012 at 11:13 AM, Dmitriy Lyubimov <[email protected]> wrote:

> Another perhaps useful piece of information is that process, initialize
> and cleanup R closures may share the same environment and this is handled
> correctly at the backend, e.g.
>
> createClosures <- function () {
>   x <- 0; y <- 0
>   startup <- function () x <<- 1
>   process <- function(value) y <<- ifelse(x==1,2,0)
>   cleanup <- function() emit(x + y)
>
>   list(process,startup,cleanup)
> }
>
> this function will produce 3 closures that share the same environment
> containing x and y, and each task at the backend should emit value 3.
>
>
> On Sat, Nov 24, 2012 at 10:54 AM, Dmitriy Lyubimov <[email protected]> wrote:
>
>>
>> On Sat, Nov 24, 2012 at 10:29 AM, Josh Wills <[email protected]> wrote:
>>
>>> Hey Dmitriy,
>>>
>>> I'm up and running w/Example1.R on my Linux machine-- very cool! My Mac is
>>> having some sort of issue w/creating /tmp/crunch* directories that I need
>>> to sort out.
>>>
>>> In the example you sent of the broken chaining of DoFns, why didn't the
>>> first line (quoted below) require a PType?
>>
>>
>> Because the implementation assumes a default type which is character
>> vector as below. Also, if it detects that key type was specified
>> explicitly, it returns PTable automatically instead of PCollection.
>>
>> Further on, PTable's emits automatically assume emit(key,value)
>> invocation for conciseness of notation (instead of java's Pair.of(key,value) )
>> and PCollections assume just emit(value).
>>
>> parallelDo = function ( FUN_PROCESS,
>>     FUN_INITIALIZE=NULL,FUN_CLEANUP=NULL,
>>     valueType=crunchR.RStrings$new(), keyType) {
>>   if (missing(keyType)) {
>>     .parallelDo.PCollection(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,valueType)
>>   } else {
>>     .parallelDo.PTable(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,keyType,valueType)
>>   }
>> },
>>
>>
>>> Is there a shortcut for the case
>>> when the PType of the child is the same as the PType of the parent?
>>>
>>
>> er... no.
>> it kind of always assumes RStrings (which assumes PType<String>
>> but corresponding R type is multi-emit, i.e. you can emit a vector once and
>> internally it will translate into bunch of calls of emit(String)). This is a
>> notion that i made specifically for R since R operates with vectors and
>> lists, so i can emit just one vector type and declare it a multi-emit. It
>> is not clear to me if this notion will have a benefit. Obviously, you still
>> can emit R character vector as a single value, too, but you would have to
>> select different RType thing there to imply your intent.
>>
>> Word count is a good example where multi-emit RType serves you well: you
>> output result of split[[1]] which is a character vector as one R call
>> emit(split...) but it translates into bunch of individual emits (the
>> variant i had before this last one with PTable, or the commented-out one
>> here):
>>
>> # wordsPCol <- inputPCol$parallelDo(
>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>> # )
>>
>>
>>>
>>> # wordsPCol <- inputPCol$parallelDo(
>>> #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>> # )
>>>
>>> Josh
>>>
>>>
>>> On Fri, Nov 23, 2012 at 1:59 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>
>>> > ok support for PTable emission (key,value) pairs works in the latest commit.
>>> >
>>> > My current problem is that composition of doFunctions doesn't work,
>>> > probably because of the sequence of cleanup() calls.
>>> > I have to figure out:
>>> >
>>> > =============
>>> > this composition of 2 functions (PCollection, PTable) is a problem
>>> >
>>> > # wordsPCol <- inputPCol$parallelDo(
>>> > #   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>> > # )
>>> > #
>>> > # wordsPTab <- wordsPCol$parallelDo(function(word) emit(word,1),
>>> > #   keyType = crunchR.RString$new(),
>>> > #   valueType = crunchR.RUint32$new())
>>> >
>>> > but this equivalent works:
>>> > wordsPTab <- inputPCol$parallelDo(
>>> >   function(line) {
>>> >     words <- strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>>> >     sapply(words, function(x) emit(x,1))
>>> >   },
>>> >   keyType = crunchR.RString$new(),
>>> >   valueType = crunchR.RUint32$new()
>>> > )
>>> >
>>> >
>>> > On Thu, Nov 22, 2012 at 2:13 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>> >
>>> > > Ok , I guess i am going to work on the next milestone which is PTableType
>>> > > serialization support between R and java sides.
>>> > >
>>> > > once i am done with that, i guess i will be able to add other api and
>>> > > complete word count example fairly easily.
>>> > >
>>> > > Example1.R in its current state works.
>>> > >
>>> > >
>>> > > On Wed, Nov 21, 2012 at 12:11 PM, Josh Wills <[email protected]> wrote:
>>> > >
>>> > >> I'm going to play with this again over the break-- BTW, did you see
>>> > >> Renjin?
>>> > >> I somehow missed this, but it looks interesting.
>>> > >>
>>> > >> http://code.google.com/p/renjin/
>>> > >>
>>> > >>
>>> > >> On Sun, Nov 18, 2012 at 11:44 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>> > >>
>>> > >> > On Sun, Nov 18, 2012 at 9:37 AM, Josh Wills <[email protected]> wrote:
>>> > >> >
>>> > >> > > Dmitriy,
>>> > >> > >
>>> > >> > > Just sent you a pull request based on playing with the code on OS X. It
>>> > >> > > contains a README about my experience getting things working.
>>> > >> > >
>>> > >> >
>>> > >> > Are you sure it is doxygen package? I thought it was roxygen2 package?
>>> > >> >
>>> > >> > Actually there seems currently no best practice in existence for R5 classes
>>> > >> > + roxygen2 (and the guy ignores @import order of files, too). Hence the
>>> > >> > hacks with file names.
>>> > >> >
>>> > >> >
>>> > >> > > Unfortunately, I haven't succeeded in getting crunchR loaded, I'm running
>>> > >> > > into some issues w/RProtoBuf on OS X. I'll give it another go this week on
>>> > >> > > my Linux machine at work.
>>> > >> > >
>>> > >> > ok i removed @import RProtoBuf, you should be able to install w/o it. Maven
>>> > >> > still compiles protoc stuff though.
>>> > >> >
>>> > >> > >
>>> > >> > > J
>>> > >> > >
>>> > >> > >
>>> > >> > > On Sat, Nov 17, 2012 at 12:49 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>> > >> > >
>>> > >> > > > Josh,
>>> > >> > > >
>>> > >> > > > ok the following commit
>>> > >> > > >
>>> > >> > > > ==============
>>> > >> > > > commit 67605360838f810fa5ddf99abb3ef2962d3f05e3
>>> > >> > > > Author: Dmitriy Lyubimov <[email protected]>
>>> > >> > > > Date: Sat Nov 17 12:29:27 2012 -0800
>>> > >> > > >
>>> > >> > > >     example1 succeeds
>>> > >> > > >
>>> > >> > > > ====================
>>> > >> > > >
>>> > >> > > > runs example 1 for me successfully in a fully distributed way which is
>>> > >> > > > first step (map-only thing) for the word count.
>>> > >> > > >
>>> > >> > > > (I think there's a hiccup somewhere here because in the output i also seem
>>> > >> > > > to see some empty lines, so the strsplit() part is perhaps set up somewhat
>>> > >> > > > incorrectly here, but it's not the point right now):
>>> > >> > > >
>>> > >> > > > ====Example1.R===========
>>> > >> > > >
>>> > >> > > > library(crunchR)
>>> > >> > > >
>>> > >> > > > pipeline <- crunchR.MRPipeline$new("test-pipeline")
>>> > >> > > >
>>> > >> > > > inputPCol <- pipeline$readTextFile("/crunchr-examples/input")
>>> > >> > > >
>>> > >> > > > outputPCol <- inputPCol$parallelDo(
>>> > >> > > >   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]")[[1]] )
>>> > >> > > > )
>>> > >> > > >
>>> > >> > > > outputPCol$writeTextFile("/crunchr-examples/output")
>>> > >> > > >
>>> > >> > > > result <- pipeline$run()
>>> > >> > > >
>>> > >> > > > if ( !result$succeeded() ) stop ("pipeline failed.")
>>> > >> > > >
>>> > >> > > > ========================================
>>> > >> > > >
>>> > >> > > > I think R-java communication now should support multiple doFn ok and they
>>> > >> > > > will be properly shut down and executed and synchronized even if they emit
>>> > >> > > > in the cleanup phase.
>>> > >> > > >
>>> > >> > > > This example assumes a lot of defaults (such as RTypes which are by default
>>> > >> > > > character vector singleton in and character vector out for a DoFn). Also
>>> > >> > > > obviously uses text in-text out at this point only.
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > To run, install the package and upload the test input (test-prep.sh).
>>> > >> > > > Assuming you have compiled the maven part, the R package snapshot could be
>>> > >> > > > installed by running "install-snapshot-rpkg.sh".
>>> > >> > > >
>>> > >> > > > You also need to make sure your backend tasks see JRI library. there are
>>> > >> > > > multiple ways to do it i guess but for the purposes of testing the
>>> > >> > > > following just works for me in my mapred-site:
>>> > >> > > >
>>> > >> > > > <property>
>>> > >> > > >   <name>mapred.child.java.opts</name>
>>> > >> > > >   <value>-Djava.library.path=/home/dmitriy/R/x86_64-pc-linux-gnu-library/2/rJava/jri</value>
>>> > >> > > >   <final>false</final>
>>> > >> > > > </property>
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > I think at this point you guys might help me by doing review of that stuff,
>>> > >> > > > asking questions and making suggestions how to go by incorporating other
>>> > >> > > > types of doFn and perhaps a way to complete the word count example, perhaps
>>> > >> > > > running comparative benchmarks with a java-only word count, how much
>>> > >> > > > overhead we seem to be suffering here.
>>> > >> > > >
>>> > >> > > > I use StatET in eclipse. Although it is a huge way forward, the process is
>>> > >> > > > still extremely tedious since I don't know unit testing framework in R well
>>> > >> > > > (so i just scribble some stuff on the side to unit-test this and that) and
>>> > >> > > > the integration test running cycle is significant enough.
>>> > >> > > >
>>> > >> > > > Which is why any help and suggestions are very welcome!
>>> > >> > > >
>>> > >> > > > I will definitely add support for reading/writing sequence files and
>>> > >> > > > Protobufs, as well as Mahout DRM's.
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > Thanks.
>>> > >> > > > -Dmitriy
>>> > >> > > >
>>> > >>
>>> > >> --
>>> > >> Director of Data Science
>>> > >> Cloudera <http://www.cloudera.com>
>>> > >> Twitter: @josh_wills <http://twitter.com/josh_wills>
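P.S. The shared-environment closures quoted above (createClosures) can be tried locally without a cluster. This is only a sketch: the emit() defined here is a local stand-in that records values, whereas in crunchR emit() is supplied by the runtime, and the named list elements and the driver loop are assumptions made for illustration.

```r
# Local stand-in for the backend (assumption): emit() just records values.
emitted <- c()
emit <- function(value) emitted <<- c(emitted, value)

createClosures <- function() {
  x <- 0; y <- 0
  startup <- function() x <<- 1                          # initialize phase
  process <- function(value) y <<- ifelse(x == 1, 2, 0)  # per-record phase
  cleanup <- function() emit(x + y)                      # cleanup phase
  # all three closures share the environment that holds x and y
  list(process = process, startup = startup, cleanup = cleanup)
}

# Drive the three phases the way a single task would.
fns <- createClosures()
fns$startup()
for (v in c("a", "b")) fns$process(v)
fns$cleanup()
print(emitted)  # 3
```

Each call to createClosures() creates a fresh environment, so two tasks built this way do not see each other's x and y.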
