It looks like an easy, if naive, solution might be to detect whenever an
IntermediateEmitter is used in a function and serialize it to the R side
instead of actually using it on the Java side.
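
To make the idea concrete, here is a rough sketch of what the R side could do
once it knows that the output of one doFn feeds only the next one. Note that
fuseDoFns is a hypothetical helper, not part of crunchR, and the sketch assumes
that emit is resolved as a free variable in each closure's environment, which
may not match how crunchR actually binds it.

```r
# Hypothetical helper (not a crunchR API): fuse two process functions so that
# values emitted by `first` are handed to `second` without leaving R.
fuseDoFns <- function(first, second, finalEmit) {
  # `second` gets the real emitter, i.e. the one that would go back to Java.
  environment(second) <- list2env(list(emit = finalEmit),
                                  parent = environment(second))
  # `first` gets an intermediate emitter with multi-emit semantics:
  # one call of `second` per element of the emitted vector.
  intermediateEmit <- function(values) for (v in values) second(v)
  environment(first) <- list2env(list(emit = intermediateEmit),
                                 parent = environment(first))
  function(value) first(value)
}

# The two word count doFns from the second example.
splitLine <- function(line) emit(strsplit(tolower(line), "[^[:alnum:]]+")[[1]])
pairWord  <- function(word) emit(word, 1)

# Stand-in for the Java-side emitter: just collect the keys locally.
collected <- character(0)
fused <- fuseDoFns(splitLine, pairWord,
                   finalEmit = function(key, value) collected <<- c(collected, key))
fused("Hello hello world")
```

This only covers a linear chain of two functions; a real version would need
the DAG of doFns from the Crunch planner to know which emitters are
intermediate.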

thoughts?


On Sat, Nov 24, 2012 at 5:05 PM, Dmitriy Lyubimov <[email protected]> wrote:

> Aha. I did a number of bug fixes, so both examples (with 1 doFn and with 2
> doFns) are working now. Good.
>
> Josh, please read the comment on the second example. The intermediate
> output of doFn #1 goes to Java/Crunch and back just to be fed into doFn #2.
> I would very much like to short-circuit that on the R side. Otherwise
> it will be very hard to optimize multi-tenant applications (multiple
> decoupled models encapsulated in a bunch of doFns, distributed and
> optimized by Crunch), which is actually my pattern for production.
>
> I'd be eternally grateful if you could give it some thought. It may require
> some exposure of Crunch optimizer internals, IMO.
>
> thank you, sir.
>
>
> On Sat, Nov 24, 2012 at 11:53 AM, Dmitriy Lyubimov <[email protected]>wrote:
>
>> Also, one obvious optimization: if we could somehow extract the DAG of
>> DoFns for a particular task, we could reconnect that DAG on the R side
>> instead of piping the data back and forth between R and the Java DAG of
>> doFns. But I would need help from somebody with deep knowledge of the
>> Crunch optimizer to extract and materialize such DAGs of functions on
>> the R side.
>>
>>
>> On Sat, Nov 24, 2012 at 11:13 AM, Dmitriy Lyubimov <[email protected]>wrote:
>>
>>> Another perhaps useful piece of information: the process, initialize
>>> and cleanup R closures may share the same environment, and this is handled
>>> correctly at the backend, e.g.
>>>
>>> createClosures <- function () {
>>>   x <- 0; y <- 0
>>>   startup <- function () x <<- 1
>>>   process <- function(value) y <<- ifelse(x==1,2,0)
>>>   cleanup <- function() emit(x + y)
>>>
>>>   list(process,startup,cleanup)
>>> }
>>>
>>> This function will produce 3 closures that share the same environment
>>> containing x and y, and each task at the backend should emit the value 3.
>>>
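
For anyone who wants to check the shared-environment behaviour locally without
a cluster, something along these lines should do. The emit function here is
only a stand-in that records values in a vector (an assumption for local
testing, not the crunchR emitter), and the closures are named in the returned
list purely for readability.

```r
# Stand-in for the backend emitter (assumption: the real one sends to Java).
emitted <- numeric(0)
emit <- function(value) emitted <<- c(emitted, value)

createClosures <- function() {
  x <- 0
  y <- 0
  startup <- function() x <<- 1
  process <- function(value) y <<- ifelse(x == 1, 2, 0)
  cleanup <- function() emit(x + y)
  list(process = process, startup = startup, cleanup = cleanup)
}

# Simulate one task: initialize once, process a few records, then clean up.
fns <- createClosures()
fns$startup()
for (line in c("a", "b")) fns$process(line)
fns$cleanup()
```

After the run, emitted holds the single value produced in cleanup, because all
three closures modified the same x and y.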
>>>
>>> On Sat, Nov 24, 2012 at 10:54 AM, Dmitriy Lyubimov <[email protected]>wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Sat, Nov 24, 2012 at 10:29 AM, Josh Wills <[email protected]>wrote:
>>>>
>>>>> Hey Dmitriy,
>>>>>
>>>>> I'm up and running w/Example1.R on my Linux machine-- very cool! My Mac is
>>>>> having some sort of issue w/creating /tmp/crunch* directories that I need
>>>>> to sort out.
>>>>>
>>>>> In the example you sent of the broken chaining of DoFns, why didn't the
>>>>> first line (quoted below) require a PType?
>>>>
>>>>
>>>> Because the implementation assumes a default type, which is a character
>>>> vector, as below. Also, if it detects that the key type was specified
>>>> explicitly, it automatically returns a PTable instead of a PCollection.
>>>>
>>>> Furthermore, PTable emits assume an emit(key,value) invocation for
>>>> conciseness of notation (instead of Java's Pair.of(key,value)), and
>>>> PCollections assume just emit(value).
>>>>
>>>> parallelDo = function ( FUN_PROCESS, FUN_INITIALIZE=NULL, FUN_CLEANUP=NULL,
>>>>     valueType=crunchR.RStrings$new(), keyType) {
>>>>   if (missing(keyType)) {
>>>>     .parallelDo.PCollection(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,valueType)
>>>>   } else {
>>>>     .parallelDo.PTable(FUN_PROCESS,FUN_INITIALIZE,FUN_CLEANUP,keyType,valueType)
>>>>   }
>>>> },
>>>>
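
In case the missing(keyType) dispatch is unfamiliar, here is a stripped-down
illustration. dispatchKind is a made-up name, and the strings it returns are
placeholders standing in for the real .parallelDo.PCollection and
.parallelDo.PTable calls.

```r
# Simplified stand-in for the dispatch in parallelDo: keyType has no default,
# so missing(keyType) tells us whether the caller supplied it.
dispatchKind <- function(FUN_PROCESS, valueType = "RStrings", keyType) {
  if (missing(keyType)) "PCollection" else "PTable"
}

kindWithoutKey <- dispatchKind(function(line) NULL)
kindWithKey    <- dispatchKind(function(word) NULL,
                               keyType = "RString", valueType = "RUint32")
```

So the caller never names the collection kind; supplying a key type is what
selects the PTable path.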
>>>>
>>>>
>>>>> Is there a shortcut for the case when the PType of the child is the same
>>>>> as the PType of the parent?
>>>>>
>>>>
>>>> Er... no. It always assumes RStrings, which assumes PType<String>, but
>>>> the corresponding R type is multi-emit, i.e. you can emit a vector once
>>>> and internally it will translate into a bunch of emit(String) calls.
>>>> This is a notion that I made specifically for R: since R operates on
>>>> vectors and lists, I can emit just one vector and declare its type
>>>> multi-emit. It is not clear to me yet whether this notion will have a
>>>> benefit. Obviously, you can still emit an R character vector as a single
>>>> value, too, but you would have to select a different RType to signal
>>>> your intent.
>>>>
>>>> Word count is a good example where a multi-emit RType serves you well:
>>>> you output the result of split[[1]], which is a character vector, in one
>>>> R call, emit(split...), but it translates into a bunch of individual
>>>> emits (the variant I had before this last one with PTable, i.e. the
>>>> commented-out one here):
>>>>
>>>> # wordsPCol <- inputPCol$parallelDo(
>>>> # function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>> # )
>>>>
>>>>
>>>>
>>>>>
>>>>> # wordsPCol <- inputPCol$parallelDo(
>>>>> # function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>> # )
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 23, 2012 at 1:59 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>>
>>>>> > OK, support for PTable emission of (key,value) pairs works in the latest
>>>>> > commit.
>>>>> >
>>>>> > My current problem is that composition of doFunctions doesn't work,
>>>>> > probably because of the sequence of cleanup() calls. I have to figure
>>>>> > it out:
>>>>> >
>>>>> > =============
>>>>> > this composition of 2 functions (PCollection, PTable) is a problem
>>>>> >
>>>>> > # wordsPCol <- inputPCol$parallelDo(
>>>>> > # function(line) emit( strsplit(tolower(line),"[^[:alnum:]]+")[[1]] )
>>>>> > # )
>>>>> > #
>>>>> > # wordsPTab <- wordsPCol$parallelDo(function(word) emit(word,1),
>>>>> > # keyType = crunchR.RString$new(),
>>>>> > # valueType = crunchR.RUint32$new())
>>>>> >
>>>>> > but this equivalent works:
>>>>> > wordsPTab <- inputPCol$parallelDo(
>>>>> > function(line) {
>>>>> > words<- strsplit(tolower(line),"[^[:alnum:]]+")[[1]]
>>>>> > sapply(words, function(x) emit(x,1))
>>>>> > },
>>>>> > keyType = crunchR.RString$new(),
>>>>> > valueType = crunchR.RUint32$new()
>>>>> > )
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Thu, Nov 22, 2012 at 2:13 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>> >
>>>>> > > OK, I guess I am going to work on the next milestone, which is PTableType
>>>>> > > serialization support between the R and Java sides.
>>>>> > >
>>>>> > > Once I am done with that, I guess I will be able to add the other APIs
>>>>> > > and complete the word count example fairly easily.
>>>>> > >
>>>>> > > Example1.R in its current state works.
>>>>> > >
>>>>> > >
>>>>> > > On Wed, Nov 21, 2012 at 12:11 PM, Josh Wills <[email protected]> wrote:
>>>>> > >
>>>>> > >> I'm going to play with this again over the break-- BTW, did you see
>>>>> > >> Renjin? I somehow missed this, but it looks interesting.
>>>>> > >>
>>>>> > >> http://code.google.com/p/renjin/
>>>>> > >>
>>>>> > >>
>>>>> > >> On Sun, Nov 18, 2012 at 11:44 AM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>> > >>
>>>>> > >> > On Sun, Nov 18, 2012 at 9:37 AM, Josh Wills <[email protected]> wrote:
>>>>> > >> >
>>>>> > >> > > Dmitriy,
>>>>> > >> > >
>>>>> > >> > > Just sent you a pull request based on playing with the code on OS X.
>>>>> > >> > > It contains a README about my experience getting things working.
>>>>> > >> > >
>>>>> > >> >
>>>>> > >> > Are you sure it is the doxygen package? I thought it was the roxygen2
>>>>> > >> > package.
>>>>> > >> >
>>>>> > >> > Actually, there currently seems to be no best practice for R5 classes
>>>>> > >> > + roxygen2 (and it ignores the @import order of files, too). Hence
>>>>> > >> > the hacks with file names.
>>>>> > >> >
>>>>> > >> >
>>>>> > >> > > Unfortunately, I haven't succeeded in getting crunchR loaded; I'm
>>>>> > >> > > running into some issues w/RProtoBuf on OS X. I'll give it another go
>>>>> > >> > > this week on my Linux machine at work.
>>>>> > >> > >
>>>>> > >> > OK, I removed @import RProtoBuf, so you should be able to install w/o it.
>>>>> > >> > Maven still compiles the protoc stuff, though.
>>>>> > >> >
>>>>> > >> > >
>>>>> > >> > > J
>>>>> > >> > >
>>>>> > >> > >
>>>>> > >> > > On Sat, Nov 17, 2012 at 12:49 PM, Dmitriy Lyubimov <[email protected]> wrote:
>>>>> > >> > >
>>>>> > >> > > > Josh,
>>>>> > >> > > >
>>>>> > >> > > > ok the following commit
>>>>> > >> > > >
>>>>> > >> > > > ==============
>>>>> > >> > > > commit 67605360838f810fa5ddf99abb3ef2962d3f05e3
>>>>> > >> > > > Author: Dmitriy Lyubimov <[email protected]>
>>>>> > >> > > > Date:   Sat Nov 17 12:29:27 2012 -0800
>>>>> > >> > > >
>>>>> > >> > > >     example1 succeeds
>>>>> > >> > > >
>>>>> > >> > > > ====================
>>>>> > >> > > >
>>>>> > >> > > > runs example 1 for me successfully in a fully distributed way, which
>>>>> > >> > > > is the first step (the map-only part) of the word count.
>>>>> > >> > > >
>>>>> > >> > > > (I think there's a hiccup somewhere here, because in the output I also
>>>>> > >> > > > seem to see some empty lines, so the strsplit() part is perhaps set up
>>>>> > >> > > > somewhat incorrectly, but that's not the point right now):
>>>>> > >> > > >
>>>>> > >> > > > ====Example1.R===========
>>>>> > >> > > >
>>>>> > >> > > > library(crunchR)
>>>>> > >> > > >
>>>>> > >> > > > pipeline <- crunchR.MRPipeline$new("test-pipeline")
>>>>> > >> > > >
>>>>> > >> > > > inputPCol <- pipeline$readTextFile("/crunchr-examples/input")
>>>>> > >> > > >
>>>>> > >> > > > outputPCol <- inputPCol$parallelDo(
>>>>> > >> > > >   function(line) emit( strsplit(tolower(line),"[^[:alnum:]]")[[1]] )
>>>>> > >> > > > )
>>>>> > >> > > >
>>>>> > >> > > > outputPCol$writeTextFile("/crunchr-examples/output")
>>>>> > >> > > >
>>>>> > >> > > > result <- pipeline$run()
>>>>> > >> > > >
>>>>> > >> > > > if ( !result$succeeded() ) stop ("pipeline failed.")
>>>>> > >> > > >
>>>>> > >> > > > ========================================
>>>>> > >> > > >
>>>>> > >> > > > I think R-Java communication should now support multiple doFns, and
>>>>> > >> > > > they will be properly executed, synchronized and shut down even if
>>>>> > >> > > > they emit in the cleanup phase.
>>>>> > >> > > >
>>>>> > >> > > > This example assumes a lot of defaults (such as RTypes, which by
>>>>> > >> > > > default are character vector singleton in and character vector out
>>>>> > >> > > > for a DoFn). It also obviously uses only text in, text out at this
>>>>> > >> > > > point.
>>>>> > >> > > >
>>>>> > >> > > >
>>>>> > >> > > > To run, install the package and upload the test input (test-prep.sh).
>>>>> > >> > > > Assuming you have compiled the Maven part, the R package snapshot can
>>>>> > >> > > > be installed by running "install-snapshot-rpkg.sh".
>>>>> > >> > > >
>>>>> > >> > > > You also need to make sure your backend tasks see the JRI library.
>>>>> > >> > > > There are multiple ways to do it, I guess, but for the purposes of
>>>>> > >> > > > testing the following just works for me in my mapred-site:
>>>>> > >> > > >
>>>>> > >> > > > <property>
>>>>> > >> > > >    <name>mapred.child.java.opts</name>
>>>>> > >> > > >    <value>-Djava.library.path=/home/dmitriy/R/x86_64-pc-linux-gnu-library/2/rJava/jri</value>
>>>>> > >> > > >    <final>false</final>
>>>>> > >> > > > </property>
>>>>> > >> > > >
>>>>> > >> > > >
>>>>> > >> > > > I think at this point you guys might help me by reviewing that stuff,
>>>>> > >> > > > asking questions and making suggestions on how to go about
>>>>> > >> > > > incorporating other types of doFn, and perhaps on a way to complete
>>>>> > >> > > > the word count example, perhaps running comparative benchmarks against
>>>>> > >> > > > a Java-only word count to see how much overhead we seem to be
>>>>> > >> > > > suffering here.
>>>>> > >> > > >
>>>>> > >> > > > I use StatET in Eclipse. Although it is a huge step forward, the
>>>>> > >> > > > process is still extremely tedious, since I don't know the unit
>>>>> > >> > > > testing frameworks in R well (so I just scribble some stuff on the
>>>>> > >> > > > side to unit-test this and that) and the integration test cycle is
>>>>> > >> > > > long enough.
>>>>> > >> > > >
>>>>> > >> > > > Which is why any help and suggestions are very welcome!
>>>>> > >> > > >
>>>>> > >> > > > I will definitely add support for reading/writing sequence files and
>>>>> > >> > > > Protobufs, as well as Mahout DRMs.
>>>>> > >> > > >
>>>>> > >> > > >
>>>>> > >> > > > Thanks.
>>>>> > >> > > > -Dmitriy
>>>>> > >> > > >
>>>>> > >> > >
>>>>> > >> >
>>>>> > >>
>>>>> > >>
>>>>> > >>
>>>>> > >> --
>>>>> > >> Director of Data Science
>>>>> > >> Cloudera <http://www.cloudera.com>
>>>>> > >> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>> > >>
>>>>> > >
>>>>> > >
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>
